This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch TUBEMQ-570
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/TUBEMQ-570 by this push:
new 06539e8 [TUBEMQ-578] Build implementation classes based on BDB storage
06539e8 is described below
commit 06539e8582fdea782039943b087804f87409e12a
Author: gosonzhang <[email protected]>
AuthorDate: Fri Mar 19 10:18:01 2021 +0800
[TUBEMQ-578] Build implementation classes based on BDB storage
---
.../master/bdbstore/DefaultBdbStoreService.java | 28 ++---
.../dao/entity/GroupFilterCtrlEntity.java | 2 +-
.../metastore/dao/mapper/ClusterConfigMapper.java | 2 +-
.../metastore/dao/mapper/GroupBlackListMapper.java | 5 +-
.../metastore/dao/mapper/GroupConfigMapper.java | 2 +-
.../dao/mapper/GroupFilterCtrlMapper.java | 2 +-
.../metastore/dao/mapper/TopicConfigMapper.java | 4 +-
.../metastore/dao/mapper/TopicCtrlMapper.java | 2 +-
.../impl/bdbimpl/BdbBrokerConfigMapperImpl.java | 136 ++++++++++++++++++++
.../impl/bdbimpl/BdbClusterConfigMapperImpl.java | 132 ++++++++++++++++++++
.../impl/bdbimpl/BdbGroupBlackListMapperImpl.java | 137 +++++++++++++++++++++
.../impl/bdbimpl/BdbGroupConfigMapperImpl.java | 137 +++++++++++++++++++++
.../impl/bdbimpl/BdbGroupFilterCtrlMapperImpl.java | 137 +++++++++++++++++++++
.../impl/bdbimpl/BdbTopicConfigMapperImpl.java | 137 +++++++++++++++++++++
.../impl/bdbimpl/BdbTopicCtrlMapperImpl.java | 137 +++++++++++++++++++++
.../metastore/impl/bdbimpl/TBDBStoreTables.java | 33 +++++
16 files changed, 1005 insertions(+), 28 deletions(-)
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/DefaultBdbStoreService.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/DefaultBdbStoreService.java
index b4a86cd..e14ff2b 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/DefaultBdbStoreService.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/DefaultBdbStoreService.java
@@ -68,6 +68,7 @@ import
org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFlowCtrlEntit
import
org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicAuthControlEntity;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicConfEntity;
import org.apache.tubemq.server.master.metastore.TStoreConstants;
+import org.apache.tubemq.server.master.metastore.impl.bdbimpl.TBDBStoreTables;
import org.apache.tubemq.server.master.utils.BdbStoreSamplePrint;
import org.apache.tubemq.server.master.web.model.ClusterGroupVO;
import org.apache.tubemq.server.master.web.model.ClusterNodeVO;
@@ -83,15 +84,6 @@ import org.slf4j.LoggerFactory;
public class DefaultBdbStoreService implements BdbStoreService, Server {
private static final Logger logger =
LoggerFactory.getLogger(DefaultBdbStoreService.class);
- private static final String BDB_CLUSTER_SETTING_STORE_NAME =
"bdbClusterSetting";
- private static final String BDB_TOPIC_CONFIG_STORE_NAME = "bdbTopicConfig";
- private static final String BDB_BROKER_CONFIG_STORE_NAME =
"bdbBrokerConfig";
- private static final String BDB_CONSUMER_GROUP_STORE_NAME =
"bdbConsumerGroup";
- private static final String BDB_TOPIC_AUTH_CONTROL_STORE_NAME =
"bdbTopicAuthControl";
- private static final String BDB_BLACK_GROUP_STORE_NAME = "bdbBlackGroup";
- private static final String BDB_GROUP_FILTER_COND_STORE_NAME =
"bdbGroupFilterCond";
- private static final String BDB_GROUP_FLOW_CONTROL_STORE_NAME =
"bdbGroupFlowCtrlCfg";
- private static final String BDB_CONSUME_GROUP_SETTING_STORE_NAME =
"bdbConsumeGroupSetting";
private static final int REP_HANDLE_RETRY_MAX = 1;
private final TMaster tMaster;
// simple log print
@@ -1001,39 +993,39 @@ public class DefaultBdbStoreService implements
BdbStoreService, Server {
/* initial metadata */
private void initMetaStore() {
brokerConfStore =
- new EntityStore(repEnv, BDB_BROKER_CONFIG_STORE_NAME,
storeConfig);
+ new EntityStore(repEnv,
TBDBStoreTables.BDB_BROKER_CONFIG_STORE_NAME, storeConfig);
brokerConfIndex =
brokerConfStore.getPrimaryIndex(Integer.class,
BdbBrokerConfEntity.class);
topicConfStore =
- new EntityStore(repEnv, BDB_TOPIC_CONFIG_STORE_NAME,
storeConfig);
+ new EntityStore(repEnv,
TBDBStoreTables.BDB_TOPIC_CONFIG_STORE_NAME, storeConfig);
topicConfIndex =
topicConfStore.getPrimaryIndex(String.class,
BdbTopicConfEntity.class);
consumerGroupStore =
- new EntityStore(repEnv, BDB_CONSUMER_GROUP_STORE_NAME,
storeConfig);
+ new EntityStore(repEnv,
TBDBStoreTables.BDB_CONSUMER_GROUP_STORE_NAME, storeConfig);
consumerGroupIndex =
consumerGroupStore.getPrimaryIndex(String.class,
BdbConsumerGroupEntity.class);
topicAuthControlStore =
- new EntityStore(repEnv, BDB_TOPIC_AUTH_CONTROL_STORE_NAME,
storeConfig);
+ new EntityStore(repEnv,
TBDBStoreTables.BDB_TOPIC_AUTH_CONTROL_STORE_NAME, storeConfig);
topicAuthControlIndex =
topicAuthControlStore.getPrimaryIndex(String.class,
BdbTopicAuthControlEntity.class);
blackGroupStore =
- new EntityStore(repEnv, BDB_BLACK_GROUP_STORE_NAME,
storeConfig);
+ new EntityStore(repEnv,
TBDBStoreTables.BDB_BLACK_GROUP_STORE_NAME, storeConfig);
blackGroupIndex =
blackGroupStore.getPrimaryIndex(String.class,
BdbBlackGroupEntity.class);
groupFilterCondStore =
- new EntityStore(repEnv, BDB_GROUP_FILTER_COND_STORE_NAME,
storeConfig);
+ new EntityStore(repEnv,
TBDBStoreTables.BDB_GROUP_FILTER_COND_STORE_NAME, storeConfig);
groupFilterCondIndex =
groupFilterCondStore.getPrimaryIndex(String.class,
BdbGroupFilterCondEntity.class);
groupFlowCtrlStore =
- new EntityStore(repEnv, BDB_GROUP_FLOW_CONTROL_STORE_NAME,
storeConfig);
+ new EntityStore(repEnv,
TBDBStoreTables.BDB_GROUP_FLOW_CONTROL_STORE_NAME, storeConfig);
groupFlowCtrlIndex =
groupFlowCtrlStore.getPrimaryIndex(String.class,
BdbGroupFlowCtrlEntity.class);
consumeGroupSettingStore =
- new EntityStore(repEnv, BDB_CONSUME_GROUP_SETTING_STORE_NAME,
storeConfig);
+ new EntityStore(repEnv,
TBDBStoreTables.BDB_CONSUME_GROUP_SETTING_STORE_NAME, storeConfig);
consumeGroupSettingIndex =
consumeGroupSettingStore.getPrimaryIndex(String.class,
BdbConsumeGroupSettingEntity.class);
clusterDefSettingStore =
- new EntityStore(repEnv, BDB_CLUSTER_SETTING_STORE_NAME,
storeConfig);
+ new EntityStore(repEnv,
TBDBStoreTables.BDB_CLUSTER_SETTING_STORE_NAME, storeConfig);
clusterDefSettingIndex =
clusterDefSettingStore.getPrimaryIndex(String.class,
BdbClusterSettingEntity.class);
}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/GroupFilterCtrlEntity.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/GroupFilterCtrlEntity.java
index 59966bd..f81c7c4 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/GroupFilterCtrlEntity.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/GroupFilterCtrlEntity.java
@@ -64,7 +64,7 @@ public class GroupFilterCtrlEntity extends BaseEntity {
this.filterConsumeStatus = EnableStatus.STATUS_DISABLE;
}
this.filterCondStr = bdbEntity.getFilterCondStr();
- this.setAttributes(bdbEntity.getFilterCondStr());
+ this.setAttributes(bdbEntity.getAttributes());
}
public BdbGroupFilterCondEntity buildBdbGroupFilterCondEntity() {
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/ClusterConfigMapper.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/ClusterConfigMapper.java
index 3fcd45d..6fcb5e4 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/ClusterConfigMapper.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/ClusterConfigMapper.java
@@ -23,7 +23,7 @@ import
org.apache.tubemq.server.master.metastore.dao.entity.ClusterSettingEntity
public interface ClusterConfigMapper extends AbstractMapper {
- boolean putClusterConfig(ClusterSettingEntity entity, ProcessResult
result);
+ boolean putClusterConfig(ClusterSettingEntity memEntity, ProcessResult
result);
boolean delClusterConfig(String key);
}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupBlackListMapper.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupBlackListMapper.java
index fed52e6..e9f49c2 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupBlackListMapper.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupBlackListMapper.java
@@ -18,13 +18,12 @@
package org.apache.tubemq.server.master.metastore.dao.mapper;
import org.apache.tubemq.server.common.utils.ProcessResult;
-import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBlackGroupEntity;
-
+import
org.apache.tubemq.server.master.metastore.dao.entity.GroupBlackListEntity;
public interface GroupBlackListMapper extends AbstractMapper {
- boolean putGroupBlackListConfig(BdbBlackGroupEntity entity, ProcessResult
result);
+ boolean putGroupBlackListConfig(GroupBlackListEntity memEntity,
ProcessResult result);
boolean delGroupBlackListConfig(String key);
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupConfigMapper.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupConfigMapper.java
index 04904ba..e01775c 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupConfigMapper.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupConfigMapper.java
@@ -24,7 +24,7 @@ import
org.apache.tubemq.server.master.metastore.dao.entity.GroupConfigEntity;
public interface GroupConfigMapper extends AbstractMapper {
- boolean putGroupConfigConfig(GroupConfigEntity entity, ProcessResult
result);
+ boolean putGroupConfigConfig(GroupConfigEntity memEntity, ProcessResult
result);
boolean delGroupConfigConfig(String key);
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupFilterCtrlMapper.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupFilterCtrlMapper.java
index 5a83a74..1925b41 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupFilterCtrlMapper.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupFilterCtrlMapper.java
@@ -23,7 +23,7 @@ import
org.apache.tubemq.server.master.metastore.dao.entity.GroupFilterCtrlEntit
public interface GroupFilterCtrlMapper extends AbstractMapper {
- boolean putGroupFilterCtrlConfig(GroupFilterCtrlEntity entity,
ProcessResult result);
+ boolean putGroupFilterCtrlConfig(GroupFilterCtrlEntity memEntity,
ProcessResult result);
boolean delGroupFilterCtrlConfig(String key);
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/TopicConfigMapper.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/TopicConfigMapper.java
index a7d3164..fda4f6c 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/TopicConfigMapper.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/TopicConfigMapper.java
@@ -18,12 +18,12 @@
package org.apache.tubemq.server.master.metastore.dao.mapper;
import org.apache.tubemq.server.common.utils.ProcessResult;
-import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicConfEntity;
+import org.apache.tubemq.server.master.metastore.dao.entity.TopicConfEntity;
public interface TopicConfigMapper extends AbstractMapper {
- boolean putTopicConfig(BdbTopicConfEntity entity, ProcessResult result);
+ boolean putTopicConfig(TopicConfEntity memEntity, ProcessResult result);
boolean delTopicConfig(String key);
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/TopicCtrlMapper.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/TopicCtrlMapper.java
index a3c4d99..c42ea45 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/TopicCtrlMapper.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/TopicCtrlMapper.java
@@ -24,7 +24,7 @@ import
org.apache.tubemq.server.master.metastore.dao.entity.TopicCtrlEntity;
public interface TopicCtrlMapper extends AbstractMapper {
- boolean putTopicCtrlConfig(TopicCtrlEntity entity, ProcessResult result);
+ boolean putTopicCtrlConfig(TopicCtrlEntity memEntity, ProcessResult
result);
boolean delTopicCtrlConfig(String key);
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java
new file mode 100644
index 0000000..8d1c693
--- /dev/null
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.master.metastore.impl.bdbimpl;
+
+
+import com.sleepycat.je.rep.ReplicatedEnvironment;
+import com.sleepycat.persist.EntityCursor;
+import com.sleepycat.persist.EntityStore;
+import com.sleepycat.persist.PrimaryIndex;
+import com.sleepycat.persist.StoreConfig;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.server.common.exception.LoadMetaException;
+import org.apache.tubemq.server.common.utils.ProcessResult;
+import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBrokerConfEntity;
+import org.apache.tubemq.server.master.metastore.dao.entity.BrokerConfEntity;
+import org.apache.tubemq.server.master.metastore.dao.mapper.BrokerConfigMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+
+public class BdbBrokerConfigMapperImpl implements BrokerConfigMapper {
+
+ private static final Logger logger =
+ LoggerFactory.getLogger(BdbBrokerConfigMapperImpl.class);
+
+
+ // broker config store
+ private EntityStore brokerConfStore;
+ private PrimaryIndex<Integer/* brokerId */, BdbBrokerConfEntity>
brokerConfIndex;
+
+
+ public BdbBrokerConfigMapperImpl(ReplicatedEnvironment repEnv, StoreConfig
storeConfig) {
+ brokerConfStore = new EntityStore(repEnv,
+ TBDBStoreTables.BDB_BROKER_CONFIG_STORE_NAME, storeConfig);
+ brokerConfIndex =
+ brokerConfStore.getPrimaryIndex(Integer.class,
BdbBrokerConfEntity.class);
+ }
+
+ @Override
+ public void close() {
+ if (brokerConfStore != null) {
+ try {
+ brokerConfStore.close();
+ brokerConfStore = null;
+ } catch (Throwable e) {
+ logger.error("[BDB Impl] close broker configure failure ", e);
+ }
+ }
+ }
+
+ @Override
+ public void loadConfig(ProcessResult result) throws LoadMetaException {
+ long count = 0L;
+ Map<Integer, BrokerConfEntity> metaDataMap = new HashMap<>();
+ EntityCursor<BdbBrokerConfEntity> cursor = null;
+ logger.info("[BDB Impl] load broker configure start...");
+ try {
+ cursor = brokerConfIndex.entities();
+ for (BdbBrokerConfEntity bdbEntity : cursor) {
+ if (bdbEntity == null) {
+ logger.warn("[BDB Impl] found Null data while loading
broker configure!");
+ continue;
+ }
+ BrokerConfEntity memEntity =
+ new BrokerConfEntity(bdbEntity);
+ metaDataMap.put(memEntity.getBrokerId(), memEntity);
+ count++;
+ }
+ logger.info("[BDB Impl] total broker configure records are {}",
count);
+ result.setSuccResult(metaDataMap);
+ } catch (Exception e) {
+ logger.error("[BDB Impl] load broker configure failure ", e);
+ throw new LoadMetaException(e.getMessage());
+ } finally {
+ if (cursor != null) {
+ cursor.close();
+ }
+ }
+ logger.info("[BDB Impl] load broker configure successfully...");
+ }
+
+ /**
+ * Put cluster setting info into bdb store
+ *
+ * @param memEntity need add record
+ * @param result process result with old value
+ * @return
+ */
+ @Override
+ public boolean putBrokerConfig(BrokerConfEntity memEntity, ProcessResult
result) {
+ BdbBrokerConfEntity retData = null;
+ BdbBrokerConfEntity bdbEntity =
+ memEntity.buildBdbBrokerConfEntity();
+ try {
+ retData = brokerConfIndex.put(bdbEntity);
+ } catch (Throwable e) {
+ logger.error("[BDB Impl] put broker configure failure ", e);
+ result.setFailResult(new
StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+ .append("Put broker configure failure: ")
+ .append(e.getMessage()).toString());
+ return result.isSuccess();
+ }
+ result.setSuccResult(retData == null);
+ return result.isSuccess();
+ }
+
+ @Override
+ public boolean delBrokerConfig(int brokerId) {
+ try {
+ brokerConfIndex.delete(brokerId);
+ } catch (Throwable e) {
+ logger.error("[BDB Impl] delete broker configure failure ", e);
+ return false;
+ }
+ return true;
+ }
+
+}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbClusterConfigMapperImpl.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbClusterConfigMapperImpl.java
new file mode 100644
index 0000000..b9fb67e
--- /dev/null
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbClusterConfigMapperImpl.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.master.metastore.impl.bdbimpl;
+
+import com.sleepycat.je.rep.ReplicatedEnvironment;
+import com.sleepycat.persist.EntityCursor;
+import com.sleepycat.persist.EntityStore;
+import com.sleepycat.persist.PrimaryIndex;
+import com.sleepycat.persist.StoreConfig;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.server.common.exception.LoadMetaException;
+import org.apache.tubemq.server.common.utils.ProcessResult;
+import
org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbClusterSettingEntity;
+import
org.apache.tubemq.server.master.metastore.dao.entity.ClusterSettingEntity;
+import
org.apache.tubemq.server.master.metastore.dao.mapper.ClusterConfigMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class BdbClusterConfigMapperImpl implements ClusterConfigMapper {
+
+ private static final Logger logger =
+ LoggerFactory.getLogger(BdbClusterConfigMapperImpl.class);
+
+ private EntityStore clsDefSettingStore;
+ private PrimaryIndex<String, BdbClusterSettingEntity> clsDefSettingIndex;
+
+
+ public BdbClusterConfigMapperImpl(ReplicatedEnvironment repEnv,
StoreConfig storeConfig) {
+ clsDefSettingStore = new EntityStore(repEnv,
+ TBDBStoreTables.BDB_CLUSTER_SETTING_STORE_NAME, storeConfig);
+ clsDefSettingIndex =
+ clsDefSettingStore.getPrimaryIndex(String.class,
BdbClusterSettingEntity.class);
+ }
+
+ @Override
+ public void close() {
+ if (clsDefSettingStore != null) {
+ try {
+ clsDefSettingStore.close();
+ clsDefSettingStore = null;
+ } catch (Throwable e) {
+ logger.error("[BDB Impl] close cluster configure failure ", e);
+ }
+ }
+ }
+
+ @Override
+ public void loadConfig(ProcessResult result) throws LoadMetaException {
+ long count = 0L;
+ Map<String, ClusterSettingEntity> metaDataMap = new HashMap<>();
+ EntityCursor<BdbClusterSettingEntity> cursor = null;
+ logger.info("[BDB Impl] load cluster configure start...");
+ try {
+ cursor = clsDefSettingIndex.entities();
+ for (BdbClusterSettingEntity bdbEntity : cursor) {
+ if (bdbEntity == null) {
+ logger.warn("[BDB Impl] found Null data while loading
cluster configure!");
+ continue;
+ }
+ ClusterSettingEntity memEntity =
+ new ClusterSettingEntity(bdbEntity);
+ metaDataMap.put(memEntity.getRecordKey(), memEntity);
+ count++;
+ }
+ logger.info("[BDB Impl] total cluster configure records are {}",
count);
+ result.setSuccResult(metaDataMap);
+ } catch (Exception e) {
+ logger.error("[BDB Impl] load cluster configure failure ", e);
+ throw new LoadMetaException(e.getMessage());
+ } finally {
+ if (cursor != null) {
+ cursor.close();
+ }
+ }
+ logger.info("[BDB Impl] load cluster configure successfully...");
+ }
+
+ /**
+ * Put cluster setting info into bdb store
+ *
+ * @param memEntity need add record
+ * @param result process result with old value
+ * @return
+ */
+ @Override
+ public boolean putClusterConfig(ClusterSettingEntity memEntity,
ProcessResult result) {
+ BdbClusterSettingEntity retData = null;
+ BdbClusterSettingEntity bdbEntity =
+ memEntity.buildBdbClsDefSettingEntity();
+ try {
+ retData = clsDefSettingIndex.put(bdbEntity);
+ } catch (Throwable e) {
+ logger.error("[BDB Impl] put cluster configure failure ", e);
+ result.setFailResult(new
StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+ .append("Put cluster configure failure: ")
+ .append(e.getMessage()).toString());
+ return result.isSuccess();
+ }
+ result.setSuccResult(retData == null);
+ return result.isSuccess();
+ }
+
+ @Override
+ public boolean delClusterConfig(String key) {
+ try {
+ clsDefSettingIndex.delete(key);
+ } catch (Throwable e) {
+ logger.error("[BDB Impl] delete cluster configure failure ", e);
+ return false;
+ }
+ return true;
+ }
+
+}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbGroupBlackListMapperImpl.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbGroupBlackListMapperImpl.java
new file mode 100644
index 0000000..7dcb9b0
--- /dev/null
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbGroupBlackListMapperImpl.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.master.metastore.impl.bdbimpl;
+
+
+import com.sleepycat.je.rep.ReplicatedEnvironment;
+import com.sleepycat.persist.EntityCursor;
+import com.sleepycat.persist.EntityStore;
+import com.sleepycat.persist.PrimaryIndex;
+import com.sleepycat.persist.StoreConfig;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.server.common.exception.LoadMetaException;
+import org.apache.tubemq.server.common.utils.ProcessResult;
+import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBlackGroupEntity;
+import
org.apache.tubemq.server.master.metastore.dao.entity.GroupBlackListEntity;
+import
org.apache.tubemq.server.master.metastore.dao.mapper.GroupBlackListMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+
+public class BdbGroupBlackListMapperImpl implements GroupBlackListMapper {
+
+
+ private static final Logger logger =
+ LoggerFactory.getLogger(BdbGroupBlackListMapperImpl.class);
+
+
+ // consumer group black list store
+ private EntityStore blackGroupStore;
+ private PrimaryIndex<String/* recordKey */, BdbBlackGroupEntity>
blackGroupIndex;
+
+
+ public BdbGroupBlackListMapperImpl(ReplicatedEnvironment repEnv,
StoreConfig storeConfig) {
+ blackGroupStore = new EntityStore(repEnv,
+ TBDBStoreTables.BDB_BLACK_GROUP_STORE_NAME, storeConfig);
+ blackGroupIndex =
+ blackGroupStore.getPrimaryIndex(String.class,
BdbBlackGroupEntity.class);
+ }
+
+ @Override
+ public void close() {
+ if (blackGroupStore != null) {
+ try {
+ blackGroupStore.close();
+ blackGroupStore = null;
+ } catch (Throwable e) {
+ logger.error("[BDB Impl] close blacklist configure failure ",
e);
+ }
+ }
+ }
+
+ @Override
+ public void loadConfig(ProcessResult result) throws LoadMetaException {
+ long count = 0L;
+ Map<String, GroupBlackListEntity> metaDataMap = new HashMap<>();
+ EntityCursor<BdbBlackGroupEntity> cursor = null;
+ logger.info("[BDB Impl] load blacklist configure start...");
+ try {
+ cursor = blackGroupIndex.entities();
+ for (BdbBlackGroupEntity bdbEntity : cursor) {
+ if (bdbEntity == null) {
+ logger.warn("[BDB Impl] found Null data while loading
blacklist configure!");
+ continue;
+ }
+ GroupBlackListEntity memEntity =
+ new GroupBlackListEntity(bdbEntity);
+ metaDataMap.put(memEntity.getRecordKey(), memEntity);
+ count++;
+ }
+ logger.info("[BDB Impl] total blacklist configure records are {}",
count);
+ result.setSuccResult(metaDataMap);
+ } catch (Exception e) {
+ logger.error("[BDB Impl] load blacklist configure failure ", e);
+ throw new LoadMetaException(e.getMessage());
+ } finally {
+ if (cursor != null) {
+ cursor.close();
+ }
+ }
+ logger.info("[BDB Impl] load blacklist configure successfully...");
+ }
+
+ /**
+ * Put blacklist configure info into bdb store
+ *
+ * @param memEntity need add record
+ * @param result process result with old value
+ * @return
+ */
+ @Override
+ public boolean putGroupBlackListConfig(GroupBlackListEntity memEntity,
ProcessResult result) {
+ BdbBlackGroupEntity retData = null;
+ BdbBlackGroupEntity bdbEntity =
+ memEntity.buildBdbBlackListEntity();
+ try {
+ retData = blackGroupIndex.put(bdbEntity);
+ } catch (Throwable e) {
+ logger.error("[BDB Impl] put blacklist configure failure ", e);
+ result.setFailResult(new
StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+ .append("Put blacklist configure failure: ")
+ .append(e.getMessage()).toString());
+ return result.isSuccess();
+ }
+ result.setSuccResult(retData == null);
+ return result.isSuccess();
+ }
+
+ @Override
+ public boolean delGroupBlackListConfig(String recordKey) {
+ try {
+ blackGroupIndex.delete(recordKey);
+ } catch (Throwable e) {
+ logger.error("[BDB Impl] delete blacklist configure failure ", e);
+ return false;
+ }
+ return true;
+ }
+
+}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbGroupConfigMapperImpl.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbGroupConfigMapperImpl.java
new file mode 100644
index 0000000..bb76154
--- /dev/null
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbGroupConfigMapperImpl.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.master.metastore.impl.bdbimpl;
+
+
+import com.sleepycat.je.rep.ReplicatedEnvironment;
+import com.sleepycat.persist.EntityCursor;
+import com.sleepycat.persist.EntityStore;
+import com.sleepycat.persist.PrimaryIndex;
+import com.sleepycat.persist.StoreConfig;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.server.common.exception.LoadMetaException;
+import org.apache.tubemq.server.common.utils.ProcessResult;
+import
org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFlowCtrlEntity;
+import org.apache.tubemq.server.master.metastore.dao.entity.GroupConfigEntity;
+import org.apache.tubemq.server.master.metastore.dao.mapper.GroupConfigMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+
+public class BdbGroupConfigMapperImpl implements GroupConfigMapper {
+
+
+ private static final Logger logger =
+ LoggerFactory.getLogger(BdbGroupConfigMapperImpl.class);
+
+
+ // consumer group configure store
+ private EntityStore groupConfStore;
+ private PrimaryIndex<String/* groupName */, BdbGroupFlowCtrlEntity>
groupConfIndex;
+
+
+ public BdbGroupConfigMapperImpl(ReplicatedEnvironment repEnv, StoreConfig
storeConfig) {
+ groupConfStore = new EntityStore(repEnv,
+ TBDBStoreTables.BDB_GROUP_FLOW_CONTROL_STORE_NAME,
storeConfig);
+ groupConfIndex =
+ groupConfStore.getPrimaryIndex(String.class,
BdbGroupFlowCtrlEntity.class);
+ }
+
+ @Override
+ public void close() {
+ if (groupConfStore != null) {
+ try {
+ groupConfStore.close();
+ groupConfStore = null;
+ } catch (Throwable e) {
+ logger.error("[BDB Impl] close group configure failure ", e);
+ }
+ }
+ }
+
+ @Override
+ public void loadConfig(ProcessResult result) throws LoadMetaException {
+ long count = 0L;
+ Map<String, GroupConfigEntity> metaDataMap = new HashMap<>();
+ EntityCursor<BdbGroupFlowCtrlEntity> cursor = null;
+ logger.info("[BDB Impl] load group configure start...");
+ try {
+ cursor = groupConfIndex.entities();
+ for (BdbGroupFlowCtrlEntity bdbEntity : cursor) {
+ if (bdbEntity == null) {
+ logger.warn("[BDB Impl] found Null data while loading
group configure!");
+ continue;
+ }
+ GroupConfigEntity memEntity =
+ new GroupConfigEntity(bdbEntity);
+ metaDataMap.put(memEntity.getGroupName(), memEntity);
+ count++;
+ }
+ logger.info("[BDB Impl] total group configure records are {}",
count);
+ result.setSuccResult(metaDataMap);
+ } catch (Exception e) {
+ logger.error("[BDB Impl] load group configure failure ", e);
+ throw new LoadMetaException(e.getMessage());
+ } finally {
+ if (cursor != null) {
+ cursor.close();
+ }
+ }
+ logger.info("[BDB Impl] load group configure successfully...");
+ }
+
+ /**
+ * Put Group configure info into bdb store
+ *
+ * @param memEntity need add record
+ * @param result process result with old value
+ * @return
+ */
+ @Override
+ public boolean putGroupConfigConfig(GroupConfigEntity memEntity,
ProcessResult result) {
+ BdbGroupFlowCtrlEntity retData = null;
+ BdbGroupFlowCtrlEntity bdbEntity =
+ memEntity.buildBdbGroupFlowCtrlEntity();
+ try {
+ retData = groupConfIndex.put(bdbEntity);
+ } catch (Throwable e) {
+ logger.error("[BDB Impl] put group configure failure ", e);
+ result.setFailResult(new
StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+ .append("Put group configure failure: ")
+ .append(e.getMessage()).toString());
+ return result.isSuccess();
+ }
+ result.setSuccResult(retData == null);
+ return result.isSuccess();
+ }
+
+ @Override
+ public boolean delGroupConfigConfig(String recordKey) {
+ try {
+ groupConfIndex.delete(recordKey);
+ } catch (Throwable e) {
+ logger.error("[BDB Impl] delete group configure failure ", e);
+ return false;
+ }
+ return true;
+ }
+
+}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbGroupFilterCtrlMapperImpl.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbGroupFilterCtrlMapperImpl.java
new file mode 100644
index 0000000..5280053
--- /dev/null
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbGroupFilterCtrlMapperImpl.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.master.metastore.impl.bdbimpl;
+
+
+import com.sleepycat.je.rep.ReplicatedEnvironment;
+import com.sleepycat.persist.EntityCursor;
+import com.sleepycat.persist.EntityStore;
+import com.sleepycat.persist.PrimaryIndex;
+import com.sleepycat.persist.StoreConfig;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.server.common.exception.LoadMetaException;
+import org.apache.tubemq.server.common.utils.ProcessResult;
+import
org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFilterCondEntity;
+import
org.apache.tubemq.server.master.metastore.dao.entity.GroupFilterCtrlEntity;
+import
org.apache.tubemq.server.master.metastore.dao.mapper.GroupFilterCtrlMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+
+public class BdbGroupFilterCtrlMapperImpl implements GroupFilterCtrlMapper {
+
+
+ private static final Logger logger =
+ LoggerFactory.getLogger(BdbGroupFilterCtrlMapperImpl.class);
+
+
+ // consumer group filter control store
+ private EntityStore groupFilterStore;
+ private PrimaryIndex<String/* recordKey */, BdbGroupFilterCondEntity>
groupFilterIndex;
+
+
+ public BdbGroupFilterCtrlMapperImpl(ReplicatedEnvironment repEnv,
StoreConfig storeConfig) {
+ groupFilterStore = new EntityStore(repEnv,
+ TBDBStoreTables.BDB_GROUP_FILTER_COND_STORE_NAME, storeConfig);
+ groupFilterIndex =
+ groupFilterStore.getPrimaryIndex(String.class,
BdbGroupFilterCondEntity.class);
+ }
+
+ @Override
+ public void close() {
+ if (groupFilterStore != null) {
+ try {
+ groupFilterStore.close();
+ groupFilterStore = null;
+ } catch (Throwable e) {
+ logger.error("[BDB Impl] close filter configure failure ", e);
+ }
+ }
+ }
+
+ @Override
+ public void loadConfig(ProcessResult result) throws LoadMetaException {
+ long count = 0L;
+ Map<String, GroupFilterCtrlEntity> metaDataMap = new HashMap<>();
+ EntityCursor<BdbGroupFilterCondEntity> cursor = null;
+ logger.info("[BDB Impl] load filter configure start...");
+ try {
+ cursor = groupFilterIndex.entities();
+ for (BdbGroupFilterCondEntity bdbEntity : cursor) {
+ if (bdbEntity == null) {
+ logger.warn("[BDB Impl] found Null data while loading
filter configure!");
+ continue;
+ }
+ GroupFilterCtrlEntity memEntity =
+ new GroupFilterCtrlEntity(bdbEntity);
+ metaDataMap.put(memEntity.getRecordKey(), memEntity);
+ count++;
+ }
+ logger.info("[BDB Impl] total filter configure records are {}",
count);
+ result.setSuccResult(metaDataMap);
+ } catch (Exception e) {
+ logger.error("[BDB Impl] load filter configure failure ", e);
+ throw new LoadMetaException(e.getMessage());
+ } finally {
+ if (cursor != null) {
+ cursor.close();
+ }
+ }
+ logger.info("[BDB Impl] load filter configure successfully...");
+ }
+
+ /**
+ * Put Group filter configure info into bdb store
+ *
+ * @param memEntity need add record
+ * @param result process result with old value
+ * @return
+ */
+ @Override
+ public boolean putGroupFilterCtrlConfig(GroupFilterCtrlEntity memEntity,
ProcessResult result) {
+ BdbGroupFilterCondEntity retData = null;
+ BdbGroupFilterCondEntity bdbEntity =
+ memEntity.buildBdbGroupFilterCondEntity();
+ try {
+ retData = groupFilterIndex.put(bdbEntity);
+ } catch (Throwable e) {
+ logger.error("[BDB Impl] put filter configure failure ", e);
+ result.setFailResult(new
StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+ .append("Put filter configure failure: ")
+ .append(e.getMessage()).toString());
+ return result.isSuccess();
+ }
+ result.setSuccResult(retData == null);
+ return result.isSuccess();
+ }
+
+ @Override
+ public boolean delGroupFilterCtrlConfig(String recordKey) {
+ try {
+ groupFilterIndex.delete(recordKey);
+ } catch (Throwable e) {
+ logger.error("[BDB Impl] delete filter configure failure ", e);
+ return false;
+ }
+ return true;
+ }
+
+}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbTopicConfigMapperImpl.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbTopicConfigMapperImpl.java
new file mode 100644
index 0000000..334c423
--- /dev/null
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbTopicConfigMapperImpl.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.master.metastore.impl.bdbimpl;
+
+
+import com.sleepycat.je.rep.ReplicatedEnvironment;
+import com.sleepycat.persist.EntityCursor;
+import com.sleepycat.persist.EntityStore;
+import com.sleepycat.persist.PrimaryIndex;
+import com.sleepycat.persist.StoreConfig;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.server.common.exception.LoadMetaException;
+import org.apache.tubemq.server.common.utils.ProcessResult;
+import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicConfEntity;
+import org.apache.tubemq.server.master.metastore.dao.entity.TopicConfEntity;
+import org.apache.tubemq.server.master.metastore.dao.mapper.TopicConfigMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+
+public class BdbTopicConfigMapperImpl implements TopicConfigMapper {
+
+
+ private static final Logger logger =
+ LoggerFactory.getLogger(BdbTopicConfigMapperImpl.class);
+
+
+ // Topic configure store
+ private EntityStore topicConfStore;
+ private PrimaryIndex<String/* recordKey */, BdbTopicConfEntity>
topicConfIndex;
+
+
+ public BdbTopicConfigMapperImpl(ReplicatedEnvironment repEnv, StoreConfig
storeConfig) {
+ topicConfStore = new EntityStore(repEnv,
+ TBDBStoreTables.BDB_TOPIC_CONFIG_STORE_NAME, storeConfig);
+ topicConfIndex =
+ topicConfStore.getPrimaryIndex(String.class,
BdbTopicConfEntity.class);
+ }
+
+ @Override
+ public void close() {
+ if (topicConfStore != null) {
+ try {
+ topicConfStore.close();
+ topicConfStore = null;
+ } catch (Throwable e) {
+ logger.error("[BDB Impl] close topic configure failure ", e);
+ }
+ }
+ }
+
+ @Override
+ public void loadConfig(ProcessResult result) throws LoadMetaException {
+ long count = 0L;
+ Map<String, TopicConfEntity> metaDataMap = new HashMap<>();
+ EntityCursor<BdbTopicConfEntity> cursor = null;
+ logger.info("[BDB Impl] load topic configure start...");
+ try {
+ cursor = topicConfIndex.entities();
+ for (BdbTopicConfEntity bdbEntity : cursor) {
+ if (bdbEntity == null) {
+ logger.warn("[BDB Impl] found Null data while loading
topic configure!");
+ continue;
+ }
+ TopicConfEntity memEntity =
+ new TopicConfEntity(bdbEntity);
+ metaDataMap.put(memEntity.getRecordKey(), memEntity);
+ count++;
+ }
+ logger.info("[BDB Impl] total topic configure records are {}",
count);
+ result.setSuccResult(metaDataMap);
+ } catch (Exception e) {
+ logger.error("[BDB Impl] load topic configure failure ", e);
+ throw new LoadMetaException(e.getMessage());
+ } finally {
+ if (cursor != null) {
+ cursor.close();
+ }
+ }
+ logger.info("[BDB Impl] load topic configure successfully...");
+ }
+
+ /**
+ * Put topic configure info into bdb store
+ *
+ * @param memEntity need add record
+ * @param result process result with old value
+ * @return
+ */
+ @Override
+ public boolean putTopicConfig(TopicConfEntity memEntity, ProcessResult
result) {
+ BdbTopicConfEntity retData = null;
+ BdbTopicConfEntity bdbEntity =
+ memEntity.buildBdbTopicConfEntity();
+ try {
+ retData = topicConfIndex.put(bdbEntity);
+ } catch (Throwable e) {
+ logger.error("[BDB Impl] put topic configure failure ", e);
+ result.setFailResult(new
StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+ .append("Put topic configure failure: ")
+ .append(e.getMessage()).toString());
+ return result.isSuccess();
+ }
+ result.setSuccResult(retData == null);
+ return result.isSuccess();
+ }
+
+ @Override
+ public boolean delTopicConfig(String recordKey) {
+ try {
+ topicConfIndex.delete(recordKey);
+ } catch (Throwable e) {
+ logger.error("[BDB Impl] delete topic configure failure ", e);
+ return false;
+ }
+ return true;
+ }
+
+}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbTopicCtrlMapperImpl.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbTopicCtrlMapperImpl.java
new file mode 100644
index 0000000..9050938
--- /dev/null
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbTopicCtrlMapperImpl.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.master.metastore.impl.bdbimpl;
+
+
+import com.sleepycat.je.rep.ReplicatedEnvironment;
+import com.sleepycat.persist.EntityCursor;
+import com.sleepycat.persist.EntityStore;
+import com.sleepycat.persist.PrimaryIndex;
+import com.sleepycat.persist.StoreConfig;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.server.common.exception.LoadMetaException;
+import org.apache.tubemq.server.common.utils.ProcessResult;
+import
org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicAuthControlEntity;
+import org.apache.tubemq.server.master.metastore.dao.entity.TopicCtrlEntity;
+import org.apache.tubemq.server.master.metastore.dao.mapper.TopicCtrlMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+
+public class BdbTopicCtrlMapperImpl implements TopicCtrlMapper {
+
+
+ private static final Logger logger =
+ LoggerFactory.getLogger(BdbTopicCtrlMapperImpl.class);
+
+
+ // Topic control store
+ private EntityStore topicCtrlStore;
+ private PrimaryIndex<String/* recordKey */, BdbTopicAuthControlEntity>
topicCtrlIndex;
+
+
+ public BdbTopicCtrlMapperImpl(ReplicatedEnvironment repEnv, StoreConfig
storeConfig) {
+ topicCtrlStore = new EntityStore(repEnv,
+ TBDBStoreTables.BDB_TOPIC_AUTH_CONTROL_STORE_NAME,
storeConfig);
+ topicCtrlIndex =
+ topicCtrlStore.getPrimaryIndex(String.class,
BdbTopicAuthControlEntity.class);
+ }
+
+ @Override
+ public void close() {
+ if (topicCtrlStore != null) {
+ try {
+ topicCtrlStore.close();
+ topicCtrlStore = null;
+ } catch (Throwable e) {
+ logger.error("[BDB Impl] close topic control failure ", e);
+ }
+ }
+ }
+
+ @Override
+ public void loadConfig(ProcessResult result) throws LoadMetaException {
+ long count = 0L;
+ Map<String, TopicCtrlEntity> metaDataMap = new HashMap<>();
+ EntityCursor<BdbTopicAuthControlEntity> cursor = null;
+ logger.info("[BDB Impl] load topic configure start...");
+ try {
+ cursor = topicCtrlIndex.entities();
+ for (BdbTopicAuthControlEntity bdbEntity : cursor) {
+ if (bdbEntity == null) {
+ logger.warn("[BDB Impl] found Null data while loading
topic control!");
+ continue;
+ }
+ TopicCtrlEntity memEntity =
+ new TopicCtrlEntity(bdbEntity);
+ metaDataMap.put(memEntity.getTopicName(), memEntity);
+ count++;
+ }
+ logger.info("[BDB Impl] total topic control records are {}",
count);
+ result.setSuccResult(metaDataMap);
+ } catch (Exception e) {
+ logger.error("[BDB Impl] load topic control failure ", e);
+ throw new LoadMetaException(e.getMessage());
+ } finally {
+ if (cursor != null) {
+ cursor.close();
+ }
+ }
+ logger.info("[BDB Impl] load topic control successfully...");
+ }
+
+ /**
+ * Put topic control configure info into bdb store
+ *
+ * @param memEntity need add record
+ * @param result process result with old value
+ * @return
+ */
+ @Override
+ public boolean putTopicCtrlConfig(TopicCtrlEntity memEntity, ProcessResult
result) {
+ BdbTopicAuthControlEntity retData = null;
+ BdbTopicAuthControlEntity bdbEntity =
+ memEntity.buildBdbTopicAuthControlEntity();
+ try {
+ retData = topicCtrlIndex.put(bdbEntity);
+ } catch (Throwable e) {
+ logger.error("[BDB Impl] put topic control failure ", e);
+ result.setFailResult(new
StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+ .append("Put topic control failure: ")
+ .append(e.getMessage()).toString());
+ return result.isSuccess();
+ }
+ result.setSuccResult(retData == null);
+ return result.isSuccess();
+ }
+
+ @Override
+ public boolean delTopicCtrlConfig(String recordKey) {
+ try {
+ topicCtrlIndex.delete(recordKey);
+ } catch (Throwable e) {
+ logger.error("[BDB Impl] delete topic control failure ", e);
+ return false;
+ }
+ return true;
+ }
+
+}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/TBDBStoreTables.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/TBDBStoreTables.java
new file mode 100644
index 0000000..eb4b482
--- /dev/null
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/TBDBStoreTables.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.master.metastore.impl.bdbimpl;
+
+
+public final class TBDBStoreTables {
+
+
+ public static final String BDB_CLUSTER_SETTING_STORE_NAME =
"bdbClusterSetting";
+ public static final String BDB_TOPIC_CONFIG_STORE_NAME = "bdbTopicConfig";
+ public static final String BDB_BROKER_CONFIG_STORE_NAME =
"bdbBrokerConfig";
+ public static final String BDB_CONSUMER_GROUP_STORE_NAME =
"bdbConsumerGroup";
+ public static final String BDB_TOPIC_AUTH_CONTROL_STORE_NAME =
"bdbTopicAuthControl";
+ public static final String BDB_BLACK_GROUP_STORE_NAME = "bdbBlackGroup";
+ public static final String BDB_GROUP_FILTER_COND_STORE_NAME =
"bdbGroupFilterCond";
+ public static final String BDB_GROUP_FLOW_CONTROL_STORE_NAME =
"bdbGroupFlowCtrlCfg";
+ public static final String BDB_CONSUME_GROUP_SETTING_STORE_NAME =
"bdbConsumeGroupSetting";
+}