This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch TUBEMQ-570
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/TUBEMQ-570 by this push:
     new 06539e8  [TUBEMQ-578] Build implementation classes based on BDB storage
06539e8 is described below

commit 06539e8582fdea782039943b087804f87409e12a
Author: gosonzhang <[email protected]>
AuthorDate: Fri Mar 19 10:18:01 2021 +0800

    [TUBEMQ-578] Build implementation classes based on BDB storage
---
 .../master/bdbstore/DefaultBdbStoreService.java    |  28 ++---
 .../dao/entity/GroupFilterCtrlEntity.java          |   2 +-
 .../metastore/dao/mapper/ClusterConfigMapper.java  |   2 +-
 .../metastore/dao/mapper/GroupBlackListMapper.java |   5 +-
 .../metastore/dao/mapper/GroupConfigMapper.java    |   2 +-
 .../dao/mapper/GroupFilterCtrlMapper.java          |   2 +-
 .../metastore/dao/mapper/TopicConfigMapper.java    |   4 +-
 .../metastore/dao/mapper/TopicCtrlMapper.java      |   2 +-
 .../impl/bdbimpl/BdbBrokerConfigMapperImpl.java    | 136 ++++++++++++++++++++
 .../impl/bdbimpl/BdbClusterConfigMapperImpl.java   | 132 ++++++++++++++++++++
 .../impl/bdbimpl/BdbGroupBlackListMapperImpl.java  | 137 +++++++++++++++++++++
 .../impl/bdbimpl/BdbGroupConfigMapperImpl.java     | 137 +++++++++++++++++++++
 .../impl/bdbimpl/BdbGroupFilterCtrlMapperImpl.java | 137 +++++++++++++++++++++
 .../impl/bdbimpl/BdbTopicConfigMapperImpl.java     | 137 +++++++++++++++++++++
 .../impl/bdbimpl/BdbTopicCtrlMapperImpl.java       | 137 +++++++++++++++++++++
 .../metastore/impl/bdbimpl/TBDBStoreTables.java    |  33 +++++
 16 files changed, 1005 insertions(+), 28 deletions(-)

diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/DefaultBdbStoreService.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/DefaultBdbStoreService.java
index b4a86cd..e14ff2b 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/DefaultBdbStoreService.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/DefaultBdbStoreService.java
@@ -68,6 +68,7 @@ import 
org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFlowCtrlEntit
 import 
org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicAuthControlEntity;
 import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicConfEntity;
 import org.apache.tubemq.server.master.metastore.TStoreConstants;
+import org.apache.tubemq.server.master.metastore.impl.bdbimpl.TBDBStoreTables;
 import org.apache.tubemq.server.master.utils.BdbStoreSamplePrint;
 import org.apache.tubemq.server.master.web.model.ClusterGroupVO;
 import org.apache.tubemq.server.master.web.model.ClusterNodeVO;
@@ -83,15 +84,6 @@ import org.slf4j.LoggerFactory;
 public class DefaultBdbStoreService implements BdbStoreService, Server {
     private static final Logger logger = 
LoggerFactory.getLogger(DefaultBdbStoreService.class);
 
-    private static final String BDB_CLUSTER_SETTING_STORE_NAME = 
"bdbClusterSetting";
-    private static final String BDB_TOPIC_CONFIG_STORE_NAME = "bdbTopicConfig";
-    private static final String BDB_BROKER_CONFIG_STORE_NAME = 
"bdbBrokerConfig";
-    private static final String BDB_CONSUMER_GROUP_STORE_NAME = 
"bdbConsumerGroup";
-    private static final String BDB_TOPIC_AUTH_CONTROL_STORE_NAME = 
"bdbTopicAuthControl";
-    private static final String BDB_BLACK_GROUP_STORE_NAME = "bdbBlackGroup";
-    private static final String BDB_GROUP_FILTER_COND_STORE_NAME = 
"bdbGroupFilterCond";
-    private static final String BDB_GROUP_FLOW_CONTROL_STORE_NAME = 
"bdbGroupFlowCtrlCfg";
-    private static final String BDB_CONSUME_GROUP_SETTING_STORE_NAME = 
"bdbConsumeGroupSetting";
     private static final int REP_HANDLE_RETRY_MAX = 1;
     private final TMaster tMaster;
     // simple log print
@@ -1001,39 +993,39 @@ public class DefaultBdbStoreService implements 
BdbStoreService, Server {
     /* initial metadata */
     private void initMetaStore() {
         brokerConfStore =
-                new EntityStore(repEnv, BDB_BROKER_CONFIG_STORE_NAME, 
storeConfig);
+                new EntityStore(repEnv, 
TBDBStoreTables.BDB_BROKER_CONFIG_STORE_NAME, storeConfig);
         brokerConfIndex =
                 brokerConfStore.getPrimaryIndex(Integer.class, 
BdbBrokerConfEntity.class);
         topicConfStore =
-                new EntityStore(repEnv, BDB_TOPIC_CONFIG_STORE_NAME, 
storeConfig);
+                new EntityStore(repEnv, 
TBDBStoreTables.BDB_TOPIC_CONFIG_STORE_NAME, storeConfig);
         topicConfIndex =
                 topicConfStore.getPrimaryIndex(String.class, 
BdbTopicConfEntity.class);
         consumerGroupStore =
-                new EntityStore(repEnv, BDB_CONSUMER_GROUP_STORE_NAME, 
storeConfig);
+                new EntityStore(repEnv, 
TBDBStoreTables.BDB_CONSUMER_GROUP_STORE_NAME, storeConfig);
         consumerGroupIndex =
                 consumerGroupStore.getPrimaryIndex(String.class, 
BdbConsumerGroupEntity.class);
         topicAuthControlStore =
-                new EntityStore(repEnv, BDB_TOPIC_AUTH_CONTROL_STORE_NAME, 
storeConfig);
+                new EntityStore(repEnv, 
TBDBStoreTables.BDB_TOPIC_AUTH_CONTROL_STORE_NAME, storeConfig);
         topicAuthControlIndex =
                 topicAuthControlStore.getPrimaryIndex(String.class, 
BdbTopicAuthControlEntity.class);
         blackGroupStore =
-                new EntityStore(repEnv, BDB_BLACK_GROUP_STORE_NAME, 
storeConfig);
+                new EntityStore(repEnv, 
TBDBStoreTables.BDB_BLACK_GROUP_STORE_NAME, storeConfig);
         blackGroupIndex =
                 blackGroupStore.getPrimaryIndex(String.class, 
BdbBlackGroupEntity.class);
         groupFilterCondStore =
-                new EntityStore(repEnv, BDB_GROUP_FILTER_COND_STORE_NAME, 
storeConfig);
+                new EntityStore(repEnv, 
TBDBStoreTables.BDB_GROUP_FILTER_COND_STORE_NAME, storeConfig);
         groupFilterCondIndex =
                 groupFilterCondStore.getPrimaryIndex(String.class, 
BdbGroupFilterCondEntity.class);
         groupFlowCtrlStore =
-                new EntityStore(repEnv, BDB_GROUP_FLOW_CONTROL_STORE_NAME, 
storeConfig);
+                new EntityStore(repEnv, 
TBDBStoreTables.BDB_GROUP_FLOW_CONTROL_STORE_NAME, storeConfig);
         groupFlowCtrlIndex =
                 groupFlowCtrlStore.getPrimaryIndex(String.class, 
BdbGroupFlowCtrlEntity.class);
         consumeGroupSettingStore =
-                new EntityStore(repEnv, BDB_CONSUME_GROUP_SETTING_STORE_NAME, 
storeConfig);
+                new EntityStore(repEnv, 
TBDBStoreTables.BDB_CONSUME_GROUP_SETTING_STORE_NAME, storeConfig);
         consumeGroupSettingIndex =
                 consumeGroupSettingStore.getPrimaryIndex(String.class, 
BdbConsumeGroupSettingEntity.class);
         clusterDefSettingStore =
-                new EntityStore(repEnv, BDB_CLUSTER_SETTING_STORE_NAME, 
storeConfig);
+                new EntityStore(repEnv, 
TBDBStoreTables.BDB_CLUSTER_SETTING_STORE_NAME, storeConfig);
         clusterDefSettingIndex =
                 clusterDefSettingStore.getPrimaryIndex(String.class, 
BdbClusterSettingEntity.class);
     }
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/GroupFilterCtrlEntity.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/GroupFilterCtrlEntity.java
index 59966bd..f81c7c4 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/GroupFilterCtrlEntity.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/GroupFilterCtrlEntity.java
@@ -64,7 +64,7 @@ public class GroupFilterCtrlEntity extends BaseEntity {
             this.filterConsumeStatus = EnableStatus.STATUS_DISABLE;
         }
         this.filterCondStr = bdbEntity.getFilterCondStr();
-        this.setAttributes(bdbEntity.getFilterCondStr());
+        this.setAttributes(bdbEntity.getAttributes());
     }
 
     public BdbGroupFilterCondEntity buildBdbGroupFilterCondEntity() {
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/ClusterConfigMapper.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/ClusterConfigMapper.java
index 3fcd45d..6fcb5e4 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/ClusterConfigMapper.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/ClusterConfigMapper.java
@@ -23,7 +23,7 @@ import 
org.apache.tubemq.server.master.metastore.dao.entity.ClusterSettingEntity
 
 public interface ClusterConfigMapper extends AbstractMapper {
 
-    boolean putClusterConfig(ClusterSettingEntity entity, ProcessResult 
result);
+    boolean putClusterConfig(ClusterSettingEntity memEntity, ProcessResult 
result);
 
     boolean delClusterConfig(String key);
 }
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupBlackListMapper.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupBlackListMapper.java
index fed52e6..e9f49c2 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupBlackListMapper.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupBlackListMapper.java
@@ -18,13 +18,12 @@
 package org.apache.tubemq.server.master.metastore.dao.mapper;
 
 import org.apache.tubemq.server.common.utils.ProcessResult;
-import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBlackGroupEntity;
-
+import 
org.apache.tubemq.server.master.metastore.dao.entity.GroupBlackListEntity;
 
 
 public interface GroupBlackListMapper extends AbstractMapper {
 
-    boolean putGroupBlackListConfig(BdbBlackGroupEntity entity, ProcessResult 
result);
+    boolean putGroupBlackListConfig(GroupBlackListEntity memEntity, 
ProcessResult result);
 
     boolean delGroupBlackListConfig(String key);
 
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupConfigMapper.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupConfigMapper.java
index 04904ba..e01775c 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupConfigMapper.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupConfigMapper.java
@@ -24,7 +24,7 @@ import 
org.apache.tubemq.server.master.metastore.dao.entity.GroupConfigEntity;
 
 public interface GroupConfigMapper extends AbstractMapper {
 
-    boolean putGroupConfigConfig(GroupConfigEntity entity, ProcessResult 
result);
+    boolean putGroupConfigConfig(GroupConfigEntity memEntity, ProcessResult 
result);
 
     boolean delGroupConfigConfig(String key);
 
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupFilterCtrlMapper.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupFilterCtrlMapper.java
index 5a83a74..1925b41 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupFilterCtrlMapper.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupFilterCtrlMapper.java
@@ -23,7 +23,7 @@ import 
org.apache.tubemq.server.master.metastore.dao.entity.GroupFilterCtrlEntit
 
 public interface GroupFilterCtrlMapper extends AbstractMapper {
 
-    boolean putGroupFilterCtrlConfig(GroupFilterCtrlEntity entity, 
ProcessResult result);
+    boolean putGroupFilterCtrlConfig(GroupFilterCtrlEntity memEntity, 
ProcessResult result);
 
     boolean delGroupFilterCtrlConfig(String key);
 
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/TopicConfigMapper.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/TopicConfigMapper.java
index a7d3164..fda4f6c 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/TopicConfigMapper.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/TopicConfigMapper.java
@@ -18,12 +18,12 @@
 package org.apache.tubemq.server.master.metastore.dao.mapper;
 
 import org.apache.tubemq.server.common.utils.ProcessResult;
-import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicConfEntity;
+import org.apache.tubemq.server.master.metastore.dao.entity.TopicConfEntity;
 
 
 public interface TopicConfigMapper extends AbstractMapper {
 
-    boolean putTopicConfig(BdbTopicConfEntity entity, ProcessResult result);
+    boolean putTopicConfig(TopicConfEntity memEntity, ProcessResult result);
 
     boolean delTopicConfig(String key);
 
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/TopicCtrlMapper.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/TopicCtrlMapper.java
index a3c4d99..c42ea45 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/TopicCtrlMapper.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/TopicCtrlMapper.java
@@ -24,7 +24,7 @@ import 
org.apache.tubemq.server.master.metastore.dao.entity.TopicCtrlEntity;
 
 public interface TopicCtrlMapper extends AbstractMapper {
 
-    boolean putTopicCtrlConfig(TopicCtrlEntity entity, ProcessResult result);
+    boolean putTopicCtrlConfig(TopicCtrlEntity memEntity, ProcessResult 
result);
 
     boolean delTopicCtrlConfig(String key);
 
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java
new file mode 100644
index 0000000..8d1c693
--- /dev/null
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.master.metastore.impl.bdbimpl;
+
+
+import com.sleepycat.je.rep.ReplicatedEnvironment;
+import com.sleepycat.persist.EntityCursor;
+import com.sleepycat.persist.EntityStore;
+import com.sleepycat.persist.PrimaryIndex;
+import com.sleepycat.persist.StoreConfig;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.server.common.exception.LoadMetaException;
+import org.apache.tubemq.server.common.utils.ProcessResult;
+import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBrokerConfEntity;
+import org.apache.tubemq.server.master.metastore.dao.entity.BrokerConfEntity;
+import org.apache.tubemq.server.master.metastore.dao.mapper.BrokerConfigMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+
+public class BdbBrokerConfigMapperImpl implements BrokerConfigMapper {
+
+    private static final Logger logger =
+            LoggerFactory.getLogger(BdbBrokerConfigMapperImpl.class);
+
+
+    // broker config store
+    private EntityStore brokerConfStore;
+    private PrimaryIndex<Integer/* brokerId */, BdbBrokerConfEntity> 
brokerConfIndex;
+
+
+    public BdbBrokerConfigMapperImpl(ReplicatedEnvironment repEnv, StoreConfig 
storeConfig) {
+        brokerConfStore = new EntityStore(repEnv,
+                TBDBStoreTables.BDB_BROKER_CONFIG_STORE_NAME, storeConfig);
+        brokerConfIndex =
+                brokerConfStore.getPrimaryIndex(Integer.class, 
BdbBrokerConfEntity.class);
+    }
+
+    @Override
+    public void close() {
+        if (brokerConfStore != null) {
+            try {
+                brokerConfStore.close();
+                brokerConfStore = null;
+            } catch (Throwable e) {
+                logger.error("[BDB Impl] close broker configure failure ", e);
+            }
+        }
+    }
+
+    @Override
+    public void loadConfig(ProcessResult result) throws LoadMetaException {
+        long count = 0L;
+        Map<Integer, BrokerConfEntity> metaDataMap = new HashMap<>();
+        EntityCursor<BdbBrokerConfEntity> cursor = null;
+        logger.info("[BDB Impl] load broker configure start...");
+        try {
+            cursor = brokerConfIndex.entities();
+            for (BdbBrokerConfEntity bdbEntity : cursor) {
+                if (bdbEntity == null) {
+                    logger.warn("[BDB Impl] found Null data while loading 
broker configure!");
+                    continue;
+                }
+                BrokerConfEntity memEntity =
+                        new BrokerConfEntity(bdbEntity);
+                metaDataMap.put(memEntity.getBrokerId(), memEntity);
+                count++;
+            }
+            logger.info("[BDB Impl] total broker configure records are {}", 
count);
+            result.setSuccResult(metaDataMap);
+        } catch (Exception e) {
+            logger.error("[BDB Impl] load broker configure failure ", e);
+            throw new LoadMetaException(e.getMessage());
+        } finally {
+            if (cursor != null) {
+                cursor.close();
+            }
+        }
+        logger.info("[BDB Impl] load broker configure successfully...");
+    }
+
+    /**
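+     * Put broker configure info into bdb store
+     *
+     * @param memEntity the record to be stored
+     * @param result process result; on success its value is true when put() returned no previous record
+     * @return the success status recorded in result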
+     */
+    @Override
+    public boolean putBrokerConfig(BrokerConfEntity memEntity, ProcessResult 
result) {
+        BdbBrokerConfEntity retData = null;
+        BdbBrokerConfEntity bdbEntity =
+                memEntity.buildBdbBrokerConfEntity();
+        try {
+            retData = brokerConfIndex.put(bdbEntity);
+        } catch (Throwable e) {
+            logger.error("[BDB Impl] put broker configure failure ", e);
+            result.setFailResult(new 
StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                    .append("Put broker configure failure: ")
+                    .append(e.getMessage()).toString());
+            return result.isSuccess();
+        }
+        result.setSuccResult(retData == null);
+        return result.isSuccess();
+    }
+
+    @Override
+    public boolean delBrokerConfig(int brokerId) {
+        try {
+            brokerConfIndex.delete(brokerId);
+        } catch (Throwable e) {
+            logger.error("[BDB Impl] delete broker configure failure ", e);
+            return false;
+        }
+        return true;
+    }
+
+}
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbClusterConfigMapperImpl.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbClusterConfigMapperImpl.java
new file mode 100644
index 0000000..b9fb67e
--- /dev/null
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbClusterConfigMapperImpl.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.master.metastore.impl.bdbimpl;
+
+import com.sleepycat.je.rep.ReplicatedEnvironment;
+import com.sleepycat.persist.EntityCursor;
+import com.sleepycat.persist.EntityStore;
+import com.sleepycat.persist.PrimaryIndex;
+import com.sleepycat.persist.StoreConfig;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.server.common.exception.LoadMetaException;
+import org.apache.tubemq.server.common.utils.ProcessResult;
+import 
org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbClusterSettingEntity;
+import 
org.apache.tubemq.server.master.metastore.dao.entity.ClusterSettingEntity;
+import 
org.apache.tubemq.server.master.metastore.dao.mapper.ClusterConfigMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class BdbClusterConfigMapperImpl implements ClusterConfigMapper {
+
+    private static final Logger logger =
+            LoggerFactory.getLogger(BdbClusterConfigMapperImpl.class);
+
+    private EntityStore clsDefSettingStore;
+    private PrimaryIndex<String, BdbClusterSettingEntity> clsDefSettingIndex;
+
+
+    public BdbClusterConfigMapperImpl(ReplicatedEnvironment repEnv, 
StoreConfig storeConfig) {
+        clsDefSettingStore = new EntityStore(repEnv,
+                TBDBStoreTables.BDB_CLUSTER_SETTING_STORE_NAME, storeConfig);
+        clsDefSettingIndex =
+                clsDefSettingStore.getPrimaryIndex(String.class, 
BdbClusterSettingEntity.class);
+    }
+
+    @Override
+    public void close() {
+        if (clsDefSettingStore != null) {
+            try {
+                clsDefSettingStore.close();
+                clsDefSettingStore = null;
+            } catch (Throwable e) {
+                logger.error("[BDB Impl] close cluster configure failure ", e);
+            }
+        }
+    }
+
+    @Override
+    public void loadConfig(ProcessResult result) throws LoadMetaException {
+        long count = 0L;
+        Map<String, ClusterSettingEntity> metaDataMap = new HashMap<>();
+        EntityCursor<BdbClusterSettingEntity> cursor = null;
+        logger.info("[BDB Impl] load cluster configure start...");
+        try {
+            cursor = clsDefSettingIndex.entities();
+            for (BdbClusterSettingEntity bdbEntity : cursor) {
+                if (bdbEntity == null) {
+                    logger.warn("[BDB Impl] found Null data while loading 
cluster configure!");
+                    continue;
+                }
+                ClusterSettingEntity memEntity =
+                        new ClusterSettingEntity(bdbEntity);
+                metaDataMap.put(memEntity.getRecordKey(), memEntity);
+                count++;
+            }
+            logger.info("[BDB Impl] total cluster configure records are {}", 
count);
+            result.setSuccResult(metaDataMap);
+        } catch (Exception e) {
+            logger.error("[BDB Impl] load cluster configure failure ", e);
+            throw new LoadMetaException(e.getMessage());
+        } finally {
+            if (cursor != null) {
+                cursor.close();
+            }
+        }
+        logger.info("[BDB Impl] load cluster configure successfully...");
+    }
+
+    /**
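+     * Put cluster setting info into bdb store
+     *
+     * @param memEntity the record to be stored
+     * @param result process result; on success its value is true when put() returned no previous record
+     * @return the success status recorded in result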
+     */
+    @Override
+    public boolean putClusterConfig(ClusterSettingEntity memEntity, 
ProcessResult result) {
+        BdbClusterSettingEntity retData = null;
+        BdbClusterSettingEntity bdbEntity =
+                memEntity.buildBdbClsDefSettingEntity();
+        try {
+            retData = clsDefSettingIndex.put(bdbEntity);
+        } catch (Throwable e) {
+            logger.error("[BDB Impl] put cluster configure failure ", e);
+            result.setFailResult(new 
StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                    .append("Put cluster configure failure: ")
+                    .append(e.getMessage()).toString());
+            return result.isSuccess();
+        }
+        result.setSuccResult(retData == null);
+        return result.isSuccess();
+    }
+
+    @Override
+    public boolean delClusterConfig(String key) {
+        try {
+            clsDefSettingIndex.delete(key);
+        } catch (Throwable e) {
+            logger.error("[BDB Impl] delete cluster configure failure ", e);
+            return false;
+        }
+        return true;
+    }
+
+}
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbGroupBlackListMapperImpl.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbGroupBlackListMapperImpl.java
new file mode 100644
index 0000000..7dcb9b0
--- /dev/null
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbGroupBlackListMapperImpl.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.master.metastore.impl.bdbimpl;
+
+
+import com.sleepycat.je.rep.ReplicatedEnvironment;
+import com.sleepycat.persist.EntityCursor;
+import com.sleepycat.persist.EntityStore;
+import com.sleepycat.persist.PrimaryIndex;
+import com.sleepycat.persist.StoreConfig;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.server.common.exception.LoadMetaException;
+import org.apache.tubemq.server.common.utils.ProcessResult;
+import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBlackGroupEntity;
+import 
org.apache.tubemq.server.master.metastore.dao.entity.GroupBlackListEntity;
+import 
org.apache.tubemq.server.master.metastore.dao.mapper.GroupBlackListMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+
+public class BdbGroupBlackListMapperImpl implements GroupBlackListMapper {
+
+
+    private static final Logger logger =
+            LoggerFactory.getLogger(BdbGroupBlackListMapperImpl.class);
+
+
+    // consumer group black list store
+    private EntityStore blackGroupStore;
+    private PrimaryIndex<String/* recordKey */, BdbBlackGroupEntity> 
blackGroupIndex;
+
+
+    public BdbGroupBlackListMapperImpl(ReplicatedEnvironment repEnv, 
StoreConfig storeConfig) {
+        blackGroupStore = new EntityStore(repEnv,
+                TBDBStoreTables.BDB_BLACK_GROUP_STORE_NAME, storeConfig);
+        blackGroupIndex =
+                blackGroupStore.getPrimaryIndex(String.class, 
BdbBlackGroupEntity.class);
+    }
+
+    @Override
+    public void close() {
+        if (blackGroupStore != null) {
+            try {
+                blackGroupStore.close();
+                blackGroupStore = null;
+            } catch (Throwable e) {
+                logger.error("[BDB Impl] close blacklist configure failure ", 
e);
+            }
+        }
+    }
+
+    @Override
+    public void loadConfig(ProcessResult result) throws LoadMetaException {
+        long count = 0L;
+        Map<String, GroupBlackListEntity> metaDataMap = new HashMap<>();
+        EntityCursor<BdbBlackGroupEntity> cursor = null;
+        logger.info("[BDB Impl] load blacklist configure start...");
+        try {
+            cursor = blackGroupIndex.entities();
+            for (BdbBlackGroupEntity bdbEntity : cursor) {
+                if (bdbEntity == null) {
+                    logger.warn("[BDB Impl] found Null data while loading 
blacklist configure!");
+                    continue;
+                }
+                GroupBlackListEntity memEntity =
+                        new GroupBlackListEntity(bdbEntity);
+                metaDataMap.put(memEntity.getRecordKey(), memEntity);
+                count++;
+            }
+            logger.info("[BDB Impl] total blacklist configure records are {}", 
count);
+            result.setSuccResult(metaDataMap);
+        } catch (Exception e) {
+            logger.error("[BDB Impl] load blacklist configure failure ", e);
+            throw new LoadMetaException(e.getMessage());
+        } finally {
+            if (cursor != null) {
+                cursor.close();
+            }
+        }
+        logger.info("[BDB Impl] load blacklist configure successfully...");
+    }
+
+    /**
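+     * Put blacklist configure info into bdb store
+     *
+     * @param memEntity the record to be stored
+     * @param result process result; on success its value is true when put() returned no previous record
+     * @return the success status recorded in result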
+     */
+    @Override
+    public boolean putGroupBlackListConfig(GroupBlackListEntity memEntity, 
ProcessResult result) {
+        BdbBlackGroupEntity retData = null;
+        BdbBlackGroupEntity bdbEntity =
+                memEntity.buildBdbBlackListEntity();
+        try {
+            retData = blackGroupIndex.put(bdbEntity);
+        } catch (Throwable e) {
+            logger.error("[BDB Impl] put blacklist configure failure ", e);
+            result.setFailResult(new 
StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                    .append("Put blacklist configure failure: ")
+                    .append(e.getMessage()).toString());
+            return result.isSuccess();
+        }
+        result.setSuccResult(retData == null);
+        return result.isSuccess();
+    }
+
+    @Override
+    public boolean delGroupBlackListConfig(String recordKey) {
+        try {
+            blackGroupIndex.delete(recordKey);
+        } catch (Throwable e) {
+            logger.error("[BDB Impl] delete blacklist configure failure ", e);
+            return false;
+        }
+        return true;
+    }
+
+}
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbGroupConfigMapperImpl.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbGroupConfigMapperImpl.java
new file mode 100644
index 0000000..bb76154
--- /dev/null
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbGroupConfigMapperImpl.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.master.metastore.impl.bdbimpl;
+
+
+import com.sleepycat.je.rep.ReplicatedEnvironment;
+import com.sleepycat.persist.EntityCursor;
+import com.sleepycat.persist.EntityStore;
+import com.sleepycat.persist.PrimaryIndex;
+import com.sleepycat.persist.StoreConfig;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.server.common.exception.LoadMetaException;
+import org.apache.tubemq.server.common.utils.ProcessResult;
+import 
org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFlowCtrlEntity;
+import org.apache.tubemq.server.master.metastore.dao.entity.GroupConfigEntity;
+import org.apache.tubemq.server.master.metastore.dao.mapper.GroupConfigMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+
+/**
+ * {@link GroupConfigMapper} implementation backed by a BDB entity store.
+ *
+ * <p>Records are persisted as {@link BdbGroupFlowCtrlEntity} objects in the store named
+ * {@link TBDBStoreTables#BDB_GROUP_FLOW_CONTROL_STORE_NAME} and are exposed to callers
+ * as {@link GroupConfigEntity} objects.
+ */
+public class BdbGroupConfigMapperImpl implements GroupConfigMapper {
+
+    private static final Logger logger =
+            LoggerFactory.getLogger(BdbGroupConfigMapperImpl.class);
+
+    // consumer group configure store; set to null once closed successfully
+    private EntityStore groupConfStore;
+    // primary index of the store, keyed by group name
+    private PrimaryIndex<String, BdbGroupFlowCtrlEntity> groupConfIndex;
+
+    /**
+     * Opens the group configure entity store and its primary index.
+     *
+     * @param repEnv      the replicated BDB environment that hosts the store
+     * @param storeConfig the configuration used to open the store
+     */
+    public BdbGroupConfigMapperImpl(ReplicatedEnvironment repEnv, StoreConfig storeConfig) {
+        this.groupConfStore = new EntityStore(
+                repEnv, TBDBStoreTables.BDB_GROUP_FLOW_CONTROL_STORE_NAME, storeConfig);
+        this.groupConfIndex =
+                groupConfStore.getPrimaryIndex(String.class, BdbGroupFlowCtrlEntity.class);
+    }
+
+    /**
+     * Closes the entity store if it has not been closed yet.
+     *
+     * <p>A failure while closing is logged and not rethrown; in that case the store
+     * reference is kept, so a later call tries to close it again.
+     */
+    @Override
+    public void close() {
+        if (groupConfStore == null) {
+            return;
+        }
+        try {
+            groupConfStore.close();
+            groupConfStore = null;
+        } catch (Throwable e) {
+            logger.error("[BDB Impl] close group configure failure ", e);
+        }
+    }
+
+    /**
+     * Loads every group configure record from the store.
+     *
+     * <p>Each stored entity is converted to a {@link GroupConfigEntity} and collected in
+     * a map keyed by group name; null entities are logged and skipped. The map is handed
+     * to {@code result.setSuccResult()}. The cursor is always closed before returning.
+     *
+     * @param result receives the loaded map on success
+     * @throws LoadMetaException if an exception occurs while reading; it carries only
+     *                           the message of the original exception
+     */
+    @Override
+    public void loadConfig(ProcessResult result) throws LoadMetaException {
+        logger.info("[BDB Impl] load group configure start...");
+        Map<String, GroupConfigEntity> loaded = new HashMap<>();
+        long total = 0L;
+        EntityCursor<BdbGroupFlowCtrlEntity> records = null;
+        try {
+            records = groupConfIndex.entities();
+            for (BdbGroupFlowCtrlEntity stored : records) {
+                if (stored != null) {
+                    GroupConfigEntity entity = new GroupConfigEntity(stored);
+                    loaded.put(entity.getGroupName(), entity);
+                    total++;
+                } else {
+                    logger.warn("[BDB Impl] found Null data while loading group configure!");
+                }
+            }
+            logger.info("[BDB Impl] total group configure records are {}", total);
+            result.setSuccResult(loaded);
+        } catch (Exception e) {
+            logger.error("[BDB Impl] load group configure failure ", e);
+            throw new LoadMetaException(e.getMessage());
+        } finally {
+            if (records != null) {
+                records.close();
+            }
+        }
+        logger.info("[BDB Impl] load group configure successfully...");
+    }
+
+    /**
+     * Stores a group configure record in the BDB primary index.
+     *
+     * <p>When the put throws, the error is logged and a failure message containing the
+     * exception message is set on {@code result}. Otherwise
+     * {@code result.setSuccResult()} receives {@code true} when the put returned no
+     * entity and {@code false} when it returned one.
+     *
+     * @param memEntity the record to store
+     * @param result    receives the failure message or the success value described above
+     * @return the value of {@code result.isSuccess()} after {@code result} was updated
+     */
+    @Override
+    public boolean putGroupConfigConfig(GroupConfigEntity memEntity, ProcessResult result) {
+        BdbGroupFlowCtrlEntity record = memEntity.buildBdbGroupFlowCtrlEntity();
+        BdbGroupFlowCtrlEntity previous;
+        try {
+            previous = groupConfIndex.put(record);
+        } catch (Throwable e) {
+            logger.error("[BDB Impl] put group configure failure ", e);
+            String failMsg = new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                    .append("Put group configure failure: ")
+                    .append(e.getMessage())
+                    .toString();
+            result.setFailResult(failMsg);
+            return result.isSuccess();
+        }
+        result.setSuccResult(previous == null);
+        return result.isSuccess();
+    }
+
+    /**
+     * Deletes the group configure record stored under the given key.
+     *
+     * <p>Whatever the index delete call returns is ignored; a thrown error is logged,
+     * not rethrown.
+     *
+     * @param recordKey primary key of the record to delete
+     * @return {@code true} when the delete call completed without throwing,
+     *         {@code false} otherwise
+     */
+    @Override
+    public boolean delGroupConfigConfig(String recordKey) {
+        try {
+            groupConfIndex.delete(recordKey);
+            return true;
+        } catch (Throwable e) {
+            logger.error("[BDB Impl] delete group configure failure ", e);
+            return false;
+        }
+    }
+
+}
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbGroupFilterCtrlMapperImpl.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbGroupFilterCtrlMapperImpl.java
new file mode 100644
index 0000000..5280053
--- /dev/null
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbGroupFilterCtrlMapperImpl.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.master.metastore.impl.bdbimpl;
+
+
+import com.sleepycat.je.rep.ReplicatedEnvironment;
+import com.sleepycat.persist.EntityCursor;
+import com.sleepycat.persist.EntityStore;
+import com.sleepycat.persist.PrimaryIndex;
+import com.sleepycat.persist.StoreConfig;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.server.common.exception.LoadMetaException;
+import org.apache.tubemq.server.common.utils.ProcessResult;
+import 
org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFilterCondEntity;
+import 
org.apache.tubemq.server.master.metastore.dao.entity.GroupFilterCtrlEntity;
+import 
org.apache.tubemq.server.master.metastore.dao.mapper.GroupFilterCtrlMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+
+/**
+ * {@link GroupFilterCtrlMapper} implementation backed by a BDB entity store.
+ *
+ * <p>Records are persisted as {@link BdbGroupFilterCondEntity} objects in the store
+ * named {@link TBDBStoreTables#BDB_GROUP_FILTER_COND_STORE_NAME} and are exposed to
+ * callers as {@link GroupFilterCtrlEntity} objects.
+ */
+public class BdbGroupFilterCtrlMapperImpl implements GroupFilterCtrlMapper {
+
+    private static final Logger logger =
+            LoggerFactory.getLogger(BdbGroupFilterCtrlMapperImpl.class);
+
+    // consumer group filter control store; set to null once closed successfully
+    private EntityStore groupFilterStore;
+    // primary index of the store, keyed by record key
+    private PrimaryIndex<String, BdbGroupFilterCondEntity> groupFilterIndex;
+
+    /**
+     * Opens the group filter control entity store and its primary index.
+     *
+     * @param repEnv      the replicated BDB environment that hosts the store
+     * @param storeConfig the configuration used to open the store
+     */
+    public BdbGroupFilterCtrlMapperImpl(ReplicatedEnvironment repEnv,
+                                        StoreConfig storeConfig) {
+        this.groupFilterStore = new EntityStore(
+                repEnv, TBDBStoreTables.BDB_GROUP_FILTER_COND_STORE_NAME, storeConfig);
+        this.groupFilterIndex =
+                groupFilterStore.getPrimaryIndex(String.class, BdbGroupFilterCondEntity.class);
+    }
+
+    /**
+     * Closes the entity store if it has not been closed yet.
+     *
+     * <p>A failure while closing is logged and not rethrown; in that case the store
+     * reference is kept, so a later call tries to close it again.
+     */
+    @Override
+    public void close() {
+        if (groupFilterStore == null) {
+            return;
+        }
+        try {
+            groupFilterStore.close();
+            groupFilterStore = null;
+        } catch (Throwable e) {
+            logger.error("[BDB Impl] close filter configure failure ", e);
+        }
+    }
+
+    /**
+     * Loads every group filter control record from the store.
+     *
+     * <p>Each stored entity is converted to a {@link GroupFilterCtrlEntity} and collected
+     * in a map keyed by record key; null entities are logged and skipped. The map is
+     * handed to {@code result.setSuccResult()}. The cursor is always closed before
+     * returning.
+     *
+     * @param result receives the loaded map on success
+     * @throws LoadMetaException if an exception occurs while reading; it carries only
+     *                           the message of the original exception
+     */
+    @Override
+    public void loadConfig(ProcessResult result) throws LoadMetaException {
+        logger.info("[BDB Impl] load filter configure start...");
+        Map<String, GroupFilterCtrlEntity> loaded = new HashMap<>();
+        long total = 0L;
+        EntityCursor<BdbGroupFilterCondEntity> records = null;
+        try {
+            records = groupFilterIndex.entities();
+            for (BdbGroupFilterCondEntity stored : records) {
+                if (stored != null) {
+                    GroupFilterCtrlEntity entity = new GroupFilterCtrlEntity(stored);
+                    loaded.put(entity.getRecordKey(), entity);
+                    total++;
+                } else {
+                    logger.warn("[BDB Impl] found Null data while loading filter configure!");
+                }
+            }
+            logger.info("[BDB Impl] total filter configure records are {}", total);
+            result.setSuccResult(loaded);
+        } catch (Exception e) {
+            logger.error("[BDB Impl] load filter configure failure ", e);
+            throw new LoadMetaException(e.getMessage());
+        } finally {
+            if (records != null) {
+                records.close();
+            }
+        }
+        logger.info("[BDB Impl] load filter configure successfully...");
+    }
+
+    /**
+     * Stores a group filter control record in the BDB primary index.
+     *
+     * <p>When the put throws, the error is logged and a failure message containing the
+     * exception message is set on {@code result}. Otherwise
+     * {@code result.setSuccResult()} receives {@code true} when the put returned no
+     * entity and {@code false} when it returned one.
+     *
+     * @param memEntity the record to store
+     * @param result    receives the failure message or the success value described above
+     * @return the value of {@code result.isSuccess()} after {@code result} was updated
+     */
+    @Override
+    public boolean putGroupFilterCtrlConfig(GroupFilterCtrlEntity memEntity,
+                                            ProcessResult result) {
+        BdbGroupFilterCondEntity record = memEntity.buildBdbGroupFilterCondEntity();
+        BdbGroupFilterCondEntity previous;
+        try {
+            previous = groupFilterIndex.put(record);
+        } catch (Throwable e) {
+            logger.error("[BDB Impl] put filter configure failure ", e);
+            String failMsg = new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                    .append("Put filter configure failure: ")
+                    .append(e.getMessage())
+                    .toString();
+            result.setFailResult(failMsg);
+            return result.isSuccess();
+        }
+        result.setSuccResult(previous == null);
+        return result.isSuccess();
+    }
+
+    /**
+     * Deletes the group filter control record stored under the given key.
+     *
+     * <p>Whatever the index delete call returns is ignored; a thrown error is logged,
+     * not rethrown.
+     *
+     * @param recordKey primary key of the record to delete
+     * @return {@code true} when the delete call completed without throwing,
+     *         {@code false} otherwise
+     */
+    @Override
+    public boolean delGroupFilterCtrlConfig(String recordKey) {
+        try {
+            groupFilterIndex.delete(recordKey);
+            return true;
+        } catch (Throwable e) {
+            logger.error("[BDB Impl] delete filter configure failure ", e);
+            return false;
+        }
+    }
+
+}
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbTopicConfigMapperImpl.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbTopicConfigMapperImpl.java
new file mode 100644
index 0000000..334c423
--- /dev/null
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbTopicConfigMapperImpl.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.master.metastore.impl.bdbimpl;
+
+
+import com.sleepycat.je.rep.ReplicatedEnvironment;
+import com.sleepycat.persist.EntityCursor;
+import com.sleepycat.persist.EntityStore;
+import com.sleepycat.persist.PrimaryIndex;
+import com.sleepycat.persist.StoreConfig;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.server.common.exception.LoadMetaException;
+import org.apache.tubemq.server.common.utils.ProcessResult;
+import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicConfEntity;
+import org.apache.tubemq.server.master.metastore.dao.entity.TopicConfEntity;
+import org.apache.tubemq.server.master.metastore.dao.mapper.TopicConfigMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+
+/**
+ * {@link TopicConfigMapper} implementation backed by a BDB entity store.
+ *
+ * <p>Records are persisted as {@link BdbTopicConfEntity} objects in the store named
+ * {@link TBDBStoreTables#BDB_TOPIC_CONFIG_STORE_NAME} and are exposed to callers as
+ * {@link TopicConfEntity} objects.
+ */
+public class BdbTopicConfigMapperImpl implements TopicConfigMapper {
+
+    private static final Logger logger =
+            LoggerFactory.getLogger(BdbTopicConfigMapperImpl.class);
+
+    // topic configure store; set to null once closed successfully
+    private EntityStore topicConfStore;
+    // primary index of the store, keyed by record key
+    private PrimaryIndex<String, BdbTopicConfEntity> topicConfIndex;
+
+    /**
+     * Opens the topic configure entity store and its primary index.
+     *
+     * @param repEnv      the replicated BDB environment that hosts the store
+     * @param storeConfig the configuration used to open the store
+     */
+    public BdbTopicConfigMapperImpl(ReplicatedEnvironment repEnv, StoreConfig storeConfig) {
+        this.topicConfStore = new EntityStore(
+                repEnv, TBDBStoreTables.BDB_TOPIC_CONFIG_STORE_NAME, storeConfig);
+        this.topicConfIndex =
+                topicConfStore.getPrimaryIndex(String.class, BdbTopicConfEntity.class);
+    }
+
+    /**
+     * Closes the entity store if it has not been closed yet.
+     *
+     * <p>A failure while closing is logged and not rethrown; in that case the store
+     * reference is kept, so a later call tries to close it again.
+     */
+    @Override
+    public void close() {
+        if (topicConfStore == null) {
+            return;
+        }
+        try {
+            topicConfStore.close();
+            topicConfStore = null;
+        } catch (Throwable e) {
+            logger.error("[BDB Impl] close topic configure failure ", e);
+        }
+    }
+
+    /**
+     * Loads every topic configure record from the store.
+     *
+     * <p>Each stored entity is converted to a {@link TopicConfEntity} and collected in a
+     * map keyed by record key; null entities are logged and skipped. The map is handed
+     * to {@code result.setSuccResult()}. The cursor is always closed before returning.
+     *
+     * @param result receives the loaded map on success
+     * @throws LoadMetaException if an exception occurs while reading; it carries only
+     *                           the message of the original exception
+     */
+    @Override
+    public void loadConfig(ProcessResult result) throws LoadMetaException {
+        logger.info("[BDB Impl] load topic configure start...");
+        Map<String, TopicConfEntity> loaded = new HashMap<>();
+        long total = 0L;
+        EntityCursor<BdbTopicConfEntity> records = null;
+        try {
+            records = topicConfIndex.entities();
+            for (BdbTopicConfEntity stored : records) {
+                if (stored != null) {
+                    TopicConfEntity entity = new TopicConfEntity(stored);
+                    loaded.put(entity.getRecordKey(), entity);
+                    total++;
+                } else {
+                    logger.warn("[BDB Impl] found Null data while loading topic configure!");
+                }
+            }
+            logger.info("[BDB Impl] total topic configure records are {}", total);
+            result.setSuccResult(loaded);
+        } catch (Exception e) {
+            logger.error("[BDB Impl] load topic configure failure ", e);
+            throw new LoadMetaException(e.getMessage());
+        } finally {
+            if (records != null) {
+                records.close();
+            }
+        }
+        logger.info("[BDB Impl] load topic configure successfully...");
+    }
+
+    /**
+     * Stores a topic configure record in the BDB primary index.
+     *
+     * <p>When the put throws, the error is logged and a failure message containing the
+     * exception message is set on {@code result}. Otherwise
+     * {@code result.setSuccResult()} receives {@code true} when the put returned no
+     * entity and {@code false} when it returned one.
+     *
+     * @param memEntity the record to store
+     * @param result    receives the failure message or the success value described above
+     * @return the value of {@code result.isSuccess()} after {@code result} was updated
+     */
+    @Override
+    public boolean putTopicConfig(TopicConfEntity memEntity, ProcessResult result) {
+        BdbTopicConfEntity record = memEntity.buildBdbTopicConfEntity();
+        BdbTopicConfEntity previous;
+        try {
+            previous = topicConfIndex.put(record);
+        } catch (Throwable e) {
+            logger.error("[BDB Impl] put topic configure failure ", e);
+            String failMsg = new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                    .append("Put topic configure failure: ")
+                    .append(e.getMessage())
+                    .toString();
+            result.setFailResult(failMsg);
+            return result.isSuccess();
+        }
+        result.setSuccResult(previous == null);
+        return result.isSuccess();
+    }
+
+    /**
+     * Deletes the topic configure record stored under the given key.
+     *
+     * <p>Whatever the index delete call returns is ignored; a thrown error is logged,
+     * not rethrown.
+     *
+     * @param recordKey primary key of the record to delete
+     * @return {@code true} when the delete call completed without throwing,
+     *         {@code false} otherwise
+     */
+    @Override
+    public boolean delTopicConfig(String recordKey) {
+        try {
+            topicConfIndex.delete(recordKey);
+            return true;
+        } catch (Throwable e) {
+            logger.error("[BDB Impl] delete topic configure failure ", e);
+            return false;
+        }
+    }
+
+}
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbTopicCtrlMapperImpl.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbTopicCtrlMapperImpl.java
new file mode 100644
index 0000000..9050938
--- /dev/null
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbTopicCtrlMapperImpl.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.master.metastore.impl.bdbimpl;
+
+
+import com.sleepycat.je.rep.ReplicatedEnvironment;
+import com.sleepycat.persist.EntityCursor;
+import com.sleepycat.persist.EntityStore;
+import com.sleepycat.persist.PrimaryIndex;
+import com.sleepycat.persist.StoreConfig;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.server.common.exception.LoadMetaException;
+import org.apache.tubemq.server.common.utils.ProcessResult;
+import 
org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicAuthControlEntity;
+import org.apache.tubemq.server.master.metastore.dao.entity.TopicCtrlEntity;
+import org.apache.tubemq.server.master.metastore.dao.mapper.TopicCtrlMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+
+/**
+ * {@link TopicCtrlMapper} implementation backed by a BDB entity store.
+ *
+ * <p>Records are persisted as {@link BdbTopicAuthControlEntity} objects in the store
+ * named {@link TBDBStoreTables#BDB_TOPIC_AUTH_CONTROL_STORE_NAME} and are exposed to
+ * callers as {@link TopicCtrlEntity} objects.
+ */
+public class BdbTopicCtrlMapperImpl implements TopicCtrlMapper {
+
+    private static final Logger logger =
+            LoggerFactory.getLogger(BdbTopicCtrlMapperImpl.class);
+
+    // topic control store; set to null once closed successfully
+    private EntityStore topicCtrlStore;
+    // primary index of the store, keyed by record key
+    private PrimaryIndex<String, BdbTopicAuthControlEntity> topicCtrlIndex;
+
+    /**
+     * Opens the topic control entity store and its primary index.
+     *
+     * @param repEnv      the replicated BDB environment that hosts the store
+     * @param storeConfig the configuration used to open the store
+     */
+    public BdbTopicCtrlMapperImpl(ReplicatedEnvironment repEnv, StoreConfig storeConfig) {
+        this.topicCtrlStore = new EntityStore(
+                repEnv, TBDBStoreTables.BDB_TOPIC_AUTH_CONTROL_STORE_NAME, storeConfig);
+        this.topicCtrlIndex =
+                topicCtrlStore.getPrimaryIndex(String.class, BdbTopicAuthControlEntity.class);
+    }
+
+    /**
+     * Closes the entity store if it has not been closed yet.
+     *
+     * <p>A failure while closing is logged and not rethrown; in that case the store
+     * reference is kept, so a later call tries to close it again.
+     */
+    @Override
+    public void close() {
+        if (topicCtrlStore == null) {
+            return;
+        }
+        try {
+            topicCtrlStore.close();
+            topicCtrlStore = null;
+        } catch (Throwable e) {
+            logger.error("[BDB Impl] close topic control failure ", e);
+        }
+    }
+
+    /**
+     * Loads every topic control record from the store.
+     *
+     * <p>Each stored entity is converted to a {@link TopicCtrlEntity} and collected in a
+     * map keyed by topic name; null entities are logged and skipped. The map is handed
+     * to {@code result.setSuccResult()}. The cursor is always closed before returning.
+     *
+     * @param result receives the loaded map on success
+     * @throws LoadMetaException if an exception occurs while reading; it carries only
+     *                           the message of the original exception
+     */
+    @Override
+    public void loadConfig(ProcessResult result) throws LoadMetaException {
+        // use the "topic control" wording so this loader is distinguishable in the logs
+        // from the topic configure loader
+        logger.info("[BDB Impl] load topic control start...");
+        Map<String, TopicCtrlEntity> loaded = new HashMap<>();
+        long total = 0L;
+        EntityCursor<BdbTopicAuthControlEntity> records = null;
+        try {
+            records = topicCtrlIndex.entities();
+            for (BdbTopicAuthControlEntity stored : records) {
+                if (stored != null) {
+                    TopicCtrlEntity entity = new TopicCtrlEntity(stored);
+                    loaded.put(entity.getTopicName(), entity);
+                    total++;
+                } else {
+                    logger.warn("[BDB Impl] found Null data while loading topic control!");
+                }
+            }
+            logger.info("[BDB Impl] total topic control records are {}", total);
+            result.setSuccResult(loaded);
+        } catch (Exception e) {
+            logger.error("[BDB Impl] load topic control failure ", e);
+            throw new LoadMetaException(e.getMessage());
+        } finally {
+            if (records != null) {
+                records.close();
+            }
+        }
+        logger.info("[BDB Impl] load topic control successfully...");
+    }
+
+    /**
+     * Stores a topic control record in the BDB primary index.
+     *
+     * <p>When the put throws, the error is logged and a failure message containing the
+     * exception message is set on {@code result}. Otherwise
+     * {@code result.setSuccResult()} receives {@code true} when the put returned no
+     * entity and {@code false} when it returned one.
+     *
+     * @param memEntity the record to store
+     * @param result    receives the failure message or the success value described above
+     * @return the value of {@code result.isSuccess()} after {@code result} was updated
+     */
+    @Override
+    public boolean putTopicCtrlConfig(TopicCtrlEntity memEntity, ProcessResult result) {
+        BdbTopicAuthControlEntity record = memEntity.buildBdbTopicAuthControlEntity();
+        BdbTopicAuthControlEntity previous;
+        try {
+            previous = topicCtrlIndex.put(record);
+        } catch (Throwable e) {
+            logger.error("[BDB Impl] put topic control failure ", e);
+            String failMsg = new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                    .append("Put topic control failure: ")
+                    .append(e.getMessage())
+                    .toString();
+            result.setFailResult(failMsg);
+            return result.isSuccess();
+        }
+        result.setSuccResult(previous == null);
+        return result.isSuccess();
+    }
+
+    /**
+     * Deletes the topic control record stored under the given key.
+     *
+     * <p>Whatever the index delete call returns is ignored; a thrown error is logged,
+     * not rethrown.
+     *
+     * @param recordKey primary key of the record to delete
+     * @return {@code true} when the delete call completed without throwing,
+     *         {@code false} otherwise
+     */
+    @Override
+    public boolean delTopicCtrlConfig(String recordKey) {
+        try {
+            topicCtrlIndex.delete(recordKey);
+            return true;
+        } catch (Throwable e) {
+            logger.error("[BDB Impl] delete topic control failure ", e);
+            return false;
+        }
+    }
+
+}
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/TBDBStoreTables.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/TBDBStoreTables.java
new file mode 100644
index 0000000..eb4b482
--- /dev/null
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/TBDBStoreTables.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.master.metastore.impl.bdbimpl;
+
+
+/**
+ * Names of the BDB entity stores used by the master's metadata storage.
+ *
+ * <p>Mapper implementations in this package pass these names to the
+ * {@code EntityStore} constructor when opening their store. The class only holds
+ * constants and cannot be instantiated.
+ */
+public final class TBDBStoreTables {
+
+    public static final String BDB_CLUSTER_SETTING_STORE_NAME = "bdbClusterSetting";
+    public static final String BDB_TOPIC_CONFIG_STORE_NAME = "bdbTopicConfig";
+    public static final String BDB_BROKER_CONFIG_STORE_NAME = "bdbBrokerConfig";
+    public static final String BDB_CONSUMER_GROUP_STORE_NAME = "bdbConsumerGroup";
+    public static final String BDB_TOPIC_AUTH_CONTROL_STORE_NAME = "bdbTopicAuthControl";
+    public static final String BDB_BLACK_GROUP_STORE_NAME = "bdbBlackGroup";
+    public static final String BDB_GROUP_FILTER_COND_STORE_NAME = "bdbGroupFilterCond";
+    public static final String BDB_GROUP_FLOW_CONTROL_STORE_NAME = "bdbGroupFlowCtrlCfg";
+    public static final String BDB_CONSUME_GROUP_SETTING_STORE_NAME = "bdbConsumeGroupSetting";
+
+    // constants holder: a private constructor keeps callers from creating instances
+    private TBDBStoreTables() {
+    }
+}

Reply via email to