This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch TUBEMQ-570
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/TUBEMQ-570 by this push:
     new 36c9cdc  [TUBEMQ-580] Build active and standby keep-alive services
36c9cdc is described below

commit 36c9cdcd278bcc1a8b7ffe3b16f9524a323e814b
Author: gosonzhang <[email protected]>
AuthorDate: Thu Mar 25 09:59:10 2021 +0800

    [TUBEMQ-580] Build active and standby keep-alive services
---
 .../server/master/metastore/DataOpErrCode.java     |   4 +-
 .../server/master/metastore/MetaStoreService.java  | 113 +++
 .../impl/bdbimpl/BdbMetaStoreServiceImpl.java      | 983 +++++++++++++++++++++
 3 files changed, 1098 insertions(+), 2 deletions(-)

diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/DataOpErrCode.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/DataOpErrCode.java
index 7f1d6c3..8334167 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/DataOpErrCode.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/DataOpErrCode.java
@@ -24,8 +24,8 @@ public enum DataOpErrCode {
     DERR_EXISTED(402, "Record has existed."),
     DERR_UNCHANGED(403, "Record not changed."),
     DERR_STORE_ABNORMAL(501, "Store layer throw exception."),
-
-    STATUS_DISABLE(0, "Disable.");
+    DERR_STORE_STOPPED(510, "Store stopped."),
+    DERR_STORE_NOT_MASTER(511, "Store not active master.");
 
     private int code;
     private String description;
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/MetaStoreService.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/MetaStoreService.java
new file mode 100644
index 0000000..60ec666
--- /dev/null
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/MetaStoreService.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.master.metastore;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.tubemq.server.Server;
+import org.apache.tubemq.server.common.utils.ProcessResult;
+import org.apache.tubemq.server.master.metastore.dao.entity.BrokerConfEntity;
+import 
org.apache.tubemq.server.master.metastore.dao.entity.ClusterSettingEntity;
+import 
org.apache.tubemq.server.master.metastore.dao.entity.GroupBlackListEntity;
+import org.apache.tubemq.server.master.metastore.dao.entity.GroupConfigEntity;
+import 
org.apache.tubemq.server.master.metastore.dao.entity.GroupFilterCtrlEntity;
+import org.apache.tubemq.server.master.metastore.dao.entity.TopicConfEntity;
+import org.apache.tubemq.server.master.metastore.dao.entity.TopicCtrlEntity;
+import org.apache.tubemq.server.master.metastore.keepalive.KeepAlive;
+
+
+
+public interface MetaStoreService extends KeepAlive, Server {
+
+    // cluster default configure api
+    boolean addClusterConfig(ClusterSettingEntity memEntity, ProcessResult 
result);
+
+    boolean updClusterConfig(ClusterSettingEntity memEntity, ProcessResult 
result);
+
+    ClusterSettingEntity getClusterConfig();
+
+    boolean delClusterConfig(ProcessResult result);
+
+    // broker configure api
+    boolean addBrokerConf(BrokerConfEntity memEntity, ProcessResult result);
+
+    boolean updBrokerConf(BrokerConfEntity memEntity, ProcessResult result);
+
+    boolean delBrokerConf(int brokerId, ProcessResult result);
+
+    Map<Integer, BrokerConfEntity> getBrokerConfByBrokerId(BrokerConfEntity 
qryEntity);
+
+    BrokerConfEntity getBrokerConfByBrokerId(int brokerId);
+
+    // topic configure api
+    boolean addTopicConf(TopicConfEntity entity, ProcessResult result);
+
+    boolean updTopicConf(TopicConfEntity entity, ProcessResult result);
+
+    boolean delTopicConf(String recordKey, ProcessResult result);
+
+    List<TopicConfEntity> getTopicConf(TopicConfEntity qryEntity);
+
+    // topic control api
+    boolean addTopicCtrlConf(TopicCtrlEntity entity, ProcessResult result);
+
+    boolean updTopicCtrlConf(TopicCtrlEntity entity, ProcessResult result);
+
+    boolean delTopicCtrlConf(String recordKey, ProcessResult result);
+
+    TopicCtrlEntity getTopicCtrlConf(String topicName);
+
+    List<TopicCtrlEntity> getTopicCtrlConf(TopicConfEntity qryEntity);
+
+    // group configure api
+    boolean addGroupConf(GroupConfigEntity entity, ProcessResult result);
+
+    boolean updGroupConf(GroupConfigEntity entity, ProcessResult result);
+
+    boolean delGroupConf(String groupName, ProcessResult result);
+
+    GroupConfigEntity getGroupConf(String groupName);
+
+    Map<String, GroupConfigEntity> getGroupConf(GroupConfigEntity qryEntity);
+
+    // group blacklist api
+    boolean addGroupBlackListConf(GroupBlackListEntity entity, ProcessResult 
result);
+
+    boolean updGroupBlackListConf(GroupBlackListEntity entity, ProcessResult 
result);
+
+    boolean delGroupBlackListConf(String recordKey, ProcessResult result);
+
+    boolean delGroupBlackListConfByGroupName(String groupName, ProcessResult 
result);
+
+    List<GroupBlackListEntity> getGrpBlkLstConfByGroupName(String groupName);
+
+    List<GroupBlackListEntity> getGrpBlkLstConfByTopicName(String topicName);
+
+    List<GroupBlackListEntity> getGroupBlackListConf(GroupBlackListEntity 
qryEntity);
+
+    // group filter control api
+    boolean addGroupFilterCtrlConf(GroupFilterCtrlEntity entity, ProcessResult 
result);
+
+    boolean updGroupFilterCtrlConf(GroupFilterCtrlEntity entity, ProcessResult 
result);
+
+    boolean delGroupFilterCtrlConf(String recordKey, ProcessResult result);
+
+    List<GroupFilterCtrlEntity> getGroupFilterCtrlConf(String groupName);
+
+    List<GroupFilterCtrlEntity> getGroupFilterCtrlConf(GroupFilterCtrlEntity 
qryEntity);
+}
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbMetaStoreServiceImpl.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbMetaStoreServiceImpl.java
new file mode 100644
index 0000000..37784f9
--- /dev/null
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/impl/bdbimpl/BdbMetaStoreServiceImpl.java
@@ -0,0 +1,983 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.master.metastore.impl.bdbimpl;
+
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Durability;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.EnvironmentFailureException;
+import com.sleepycat.je.rep.InsufficientLogException;
+import com.sleepycat.je.rep.NetworkRestore;
+import com.sleepycat.je.rep.NetworkRestoreConfig;
+import com.sleepycat.je.rep.NodeState;
+import com.sleepycat.je.rep.ReplicatedEnvironment;
+import com.sleepycat.je.rep.ReplicationConfig;
+import com.sleepycat.je.rep.ReplicationGroup;
+import com.sleepycat.je.rep.ReplicationMutableConfig;
+import com.sleepycat.je.rep.ReplicationNode;
+import com.sleepycat.je.rep.StateChangeEvent;
+import com.sleepycat.je.rep.StateChangeListener;
+import com.sleepycat.je.rep.TimeConsistencyPolicy;
+import com.sleepycat.je.rep.UnknownMasterException;
+import com.sleepycat.je.rep.util.ReplicationGroupAdmin;
+import com.sleepycat.je.rep.utilint.ServiceDispatcher;
+import com.sleepycat.persist.StoreConfig;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.corebase.TokenConstants;
+import org.apache.tubemq.corebase.utils.TStringUtils;
+import org.apache.tubemq.corebase.utils.Tuple2;
+import org.apache.tubemq.server.common.fileconfig.MasterReplicationConfig;
+import org.apache.tubemq.server.common.utils.ProcessResult;
+import org.apache.tubemq.server.master.bdbstore.MasterGroupStatus;
+import org.apache.tubemq.server.master.bdbstore.MasterNodeInfo;
+import org.apache.tubemq.server.master.metastore.DataOpErrCode;
+import org.apache.tubemq.server.master.metastore.MetaStoreService;
+import org.apache.tubemq.server.master.metastore.dao.entity.BrokerConfEntity;
+import 
org.apache.tubemq.server.master.metastore.dao.entity.ClusterSettingEntity;
+import 
org.apache.tubemq.server.master.metastore.dao.entity.GroupBlackListEntity;
+import org.apache.tubemq.server.master.metastore.dao.entity.GroupConfigEntity;
+import 
org.apache.tubemq.server.master.metastore.dao.entity.GroupFilterCtrlEntity;
+import org.apache.tubemq.server.master.metastore.dao.entity.TopicConfEntity;
+import org.apache.tubemq.server.master.metastore.dao.entity.TopicCtrlEntity;
+import org.apache.tubemq.server.master.metastore.dao.mapper.BrokerConfigMapper;
+import 
org.apache.tubemq.server.master.metastore.dao.mapper.ClusterConfigMapper;
+import 
org.apache.tubemq.server.master.metastore.dao.mapper.GroupBlackListMapper;
+import org.apache.tubemq.server.master.metastore.dao.mapper.GroupConfigMapper;
+import 
org.apache.tubemq.server.master.metastore.dao.mapper.GroupFilterCtrlMapper;
+import org.apache.tubemq.server.master.metastore.dao.mapper.TopicConfigMapper;
+import org.apache.tubemq.server.master.metastore.dao.mapper.TopicCtrlMapper;
+import org.apache.tubemq.server.master.metastore.keepalive.AliveObserver;
+import org.apache.tubemq.server.master.utils.BdbStoreSamplePrint;
+import org.apache.tubemq.server.master.web.model.ClusterGroupVO;
+import org.apache.tubemq.server.master.web.model.ClusterNodeVO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class BdbMetaStoreServiceImpl implements MetaStoreService {
+
+    private static final int REP_HANDLE_RETRY_MAX = 1;
+
+    private static final Logger logger =
+            LoggerFactory.getLogger(BdbMetaStoreServiceImpl.class);
+    private final BdbStoreSamplePrint bdbStoreSamplePrint =
+            new BdbStoreSamplePrint(logger);
+    // parameters need input
+    // local host name
+    private final String nodeHost;
+    // meta data store path
+    private final String metaDataPath;
+    // bdb replication configure
+    private final MasterReplicationConfig replicationConfig;
+    private Listener listener = new Listener();
+    private ExecutorService executorService = null;
+    private List<AliveObserver> eventObservers = new ArrayList<>();
+    // service status
+    // 0 stopped, 1 starting, 2 started, 3 stopping
+    private AtomicInteger srvStatus = new AtomicInteger(0);
+    // master role flag
+    private volatile boolean isMaster = false;
+    // time since node become active
+    private AtomicLong masterSinceTime = new AtomicLong(Long.MAX_VALUE);
+    // master node name
+    private String masterNodeName;
+    // node connect failure count
+    private int connectNodeFailCount = 0;
+    // replication nodes
+    private Set<String> replicas4Transfer = new HashSet<>();
+
+    // meta data store file
+    private File envHome;
+    // bdb replication configure
+    private ReplicationConfig repConfig;
+    // bdb environment configure
+    private EnvironmentConfig envConfig;
+    // bdb replicated environment
+    private ReplicatedEnvironment repEnv;
+    // bdb data store configure
+    private StoreConfig storeConfig = new StoreConfig();
+    // bdb replication group admin info
+    private ReplicationGroupAdmin replicationGroupAdmin;
+
+    // cluster default setting
+    private ClusterConfigMapper clusterConfigMapper;
+    // broker configure
+    private BrokerConfigMapper brokerConfigMapper;
+    // topic configure
+    private TopicConfigMapper topicConfigMapper;
+    // topic control configure
+    private TopicCtrlMapper topicCtrlMapper;
+    // group configure
+    private GroupConfigMapper groupConfigMapper;
+    // group filter configure
+    private GroupFilterCtrlMapper groupFilterCtrlMapper;
+    // group blackList configure
+    private GroupBlackListMapper groupBlackListMapper;
+
+
+
+
+    public BdbMetaStoreServiceImpl(String nodeHost, String metaDataPath,
+                                   MasterReplicationConfig replicationConfig) {
+        this.nodeHost = nodeHost;
+        this.metaDataPath = metaDataPath;
+        this.replicationConfig = replicationConfig;
+        // build replicationGroupAdmin info
+        Set<InetSocketAddress> helpers = new HashSet<>();
+        for (int i = 1; i <= 3; i++) {
+            InetSocketAddress helper = new InetSocketAddress(
+                    this.nodeHost, replicationConfig.getRepNodePort() + i);
+            helpers.add(helper);
+        }
+        this.replicationGroupAdmin =
+                new 
ReplicationGroupAdmin(this.replicationConfig.getRepGroupName(), helpers);
+    }
+
+
+    @Override
+    public void start() throws Exception {
+        if (!srvStatus.compareAndSet(0, 1)) {
+            return;
+        }
+        try {
+            if (executorService != null) {
+                executorService.shutdownNow();
+                executorService = null;
+            }
+            executorService = Executors.newSingleThreadExecutor();
+            initEnvConfig();
+            repEnv = getEnvironment();
+            initMetaStore();
+            repEnv.setStateChangeListener(listener);
+            srvStatus.compareAndSet(1, 2);
+        } catch (Throwable ee) {
+            srvStatus.compareAndSet(1, 0);
+            logger.error("[BDB Impl] start StoreManagerService failure, 
error", ee);
+            return;
+        }
+        logger.info("[BDB Impl] start StoreManagerService success");
+    }
+
+    @Override
+    public void stop() throws Exception {
+        if (!srvStatus.compareAndSet(2, 3)) {
+            return;
+        }
+        logger.info("[BDB Impl] Stopping StoreManagerService...");
+        // close bdb configure
+        brokerConfigMapper.close();
+        topicConfigMapper.close();
+        groupConfigMapper.close();
+        topicCtrlMapper.close();
+        groupBlackListMapper.close();
+        groupFilterCtrlMapper.close();
+        clusterConfigMapper.close();
+        /* evn close */
+        if (repEnv != null) {
+            try {
+                repEnv.close();
+                repEnv = null;
+            } catch (Throwable ee) {
+                logger.error("[BDB Impl] Close repEnv throw error ", ee);
+            }
+        }
+        if (executorService != null) {
+            executorService.shutdownNow();
+            executorService = null;
+        }
+        srvStatus.set(0);
+        logger.info("[BDB Impl] stopping StoreManagerService successfully...");
+    }
+
+    // cluster default configure api
+    @Override
+    public boolean addClusterConfig(ClusterSettingEntity memEntity, 
ProcessResult result) {
+        // check current status
+        if (!checkStoreStatus(true, result)) {
+            return result.isSuccess();
+        }
+        return clusterConfigMapper.addClusterConfig(memEntity, result);
+    }
+
+    @Override
+    public boolean updClusterConfig(ClusterSettingEntity memEntity, 
ProcessResult result) {
+        // check current status
+        if (!checkStoreStatus(true, result)) {
+            return result.isSuccess();
+        }
+        return clusterConfigMapper.updClusterConfig(memEntity, result);
+    }
+
+    @Override
+    public ClusterSettingEntity getClusterConfig() {
+        return clusterConfigMapper.getClusterConfig();
+    }
+
+    @Override
+    public boolean delClusterConfig(ProcessResult result) {
+        if (!checkStoreStatus(true, result)) {
+            return result.isSuccess();
+        }
+        return clusterConfigMapper.delClusterConfig();
+    }
+
+    // broker configure api
+    @Override
+    public boolean addBrokerConf(BrokerConfEntity memEntity, ProcessResult 
result) {
+        // check current status
+        if (!checkStoreStatus(true, result)) {
+            return result.isSuccess();
+        }
+        return brokerConfigMapper.addBrokerConf(memEntity, result);
+    }
+
+    @Override
+    public boolean updBrokerConf(BrokerConfEntity memEntity, ProcessResult 
result) {
+        // check current status
+        if (!checkStoreStatus(true, result)) {
+            return result.isSuccess();
+        }
+        return brokerConfigMapper.updBrokerConf(memEntity, result);
+    }
+
+    @Override
+    public boolean delBrokerConf(int brokerId, ProcessResult result) {
+        // check current status
+        if (!checkStoreStatus(true, result)) {
+            return result.isSuccess();
+        }
+        return brokerConfigMapper.delBrokerConf(brokerId);
+    }
+
+    @Override
+    public Map<Integer, BrokerConfEntity> 
getBrokerConfByBrokerId(BrokerConfEntity qryEntity) {
+        return brokerConfigMapper.getBrokerConfByBrokerId(qryEntity);
+    }
+
+    @Override
+    public BrokerConfEntity getBrokerConfByBrokerId(int brokerId) {
+        return brokerConfigMapper.getBrokerConfByBrokerId(brokerId);
+    }
+
+    // topic configure api
+    @Override
+    public boolean addTopicConf(TopicConfEntity entity, ProcessResult result) {
+        // check current status
+        if (!checkStoreStatus(true, result)) {
+            return result.isSuccess();
+        }
+        return topicConfigMapper.addTopicConf(entity, result);
+    }
+
+    @Override
+    public boolean updTopicConf(TopicConfEntity entity, ProcessResult result) {
+        // check current status
+        if (!checkStoreStatus(true, result)) {
+            return result.isSuccess();
+        }
+        return topicConfigMapper.updTopicConf(entity, result);
+    }
+
+    @Override
+    public boolean delTopicConf(String recordKey, ProcessResult result) {
+        // check current status
+        if (!checkStoreStatus(true, result)) {
+            return result.isSuccess();
+        }
+        return topicConfigMapper.delTopicConf(recordKey);
+
+    }
+
+    @Override
+    public List<TopicConfEntity> getTopicConf(TopicConfEntity qryEntity) {
+        return topicConfigMapper.getTopicConf(qryEntity);
+    }
+
+    // topic control api
+    @Override
+    public boolean addTopicCtrlConf(TopicCtrlEntity entity, ProcessResult 
result) {
+        // check current status
+        if (!checkStoreStatus(true, result)) {
+            return result.isSuccess();
+        }
+        return topicCtrlMapper.addTopicCtrlConf(entity, result);
+    }
+
+    @Override
+    public boolean updTopicCtrlConf(TopicCtrlEntity entity, ProcessResult 
result) {
+        // check current status
+        if (!checkStoreStatus(true, result)) {
+            return result.isSuccess();
+        }
+        return topicCtrlMapper.updTopicCtrlConf(entity, result);
+    }
+
+    @Override
+    public boolean delTopicCtrlConf(String recordKey, ProcessResult result) {
+        // check current status
+        if (!checkStoreStatus(true, result)) {
+            return result.isSuccess();
+        }
+        return topicCtrlMapper.delTopicCtrlConf(recordKey);
+    }
+
+    @Override
+    public TopicCtrlEntity getTopicCtrlConf(String topicName) {
+        return topicCtrlMapper.getTopicCtrlConf(topicName);
+    }
+
+    @Override
+    public List<TopicCtrlEntity> getTopicCtrlConf(TopicConfEntity qryEntity) {
+        return topicCtrlMapper.getTopicCtrlConf(qryEntity);
+    }
+
+    // group configure api
+    @Override
+    public boolean addGroupConf(GroupConfigEntity entity, ProcessResult 
result) {
+        // check current status
+        if (!checkStoreStatus(true, result)) {
+            return result.isSuccess();
+        }
+        return groupConfigMapper.addGroupConf(entity, result);
+    }
+
+    @Override
+    public boolean updGroupConf(GroupConfigEntity entity, ProcessResult 
result) {
+        // check current status
+        if (!checkStoreStatus(true, result)) {
+            return result.isSuccess();
+        }
+        return groupConfigMapper.updGroupConf(entity, result);
+    }
+
+    @Override
+    public boolean delGroupConf(String groupName, ProcessResult result) {
+        // check current status
+        if (!checkStoreStatus(true, result)) {
+            return result.isSuccess();
+        }
+        return groupConfigMapper.delGroupConf(groupName);
+    }
+
+    @Override
+    public GroupConfigEntity getGroupConf(String groupName) {
+        return groupConfigMapper.getGroupConf(groupName);
+    }
+
+    @Override
+    public Map<String, GroupConfigEntity> getGroupConf(GroupConfigEntity 
qryEntity) {
+        return groupConfigMapper.getGroupConf(qryEntity);
+    }
+
+    // group blacklist api
+    @Override
+    public boolean addGroupBlackListConf(GroupBlackListEntity entity, 
ProcessResult result) {
+        // check current status
+        if (!checkStoreStatus(true, result)) {
+            return result.isSuccess();
+        }
+        return groupBlackListMapper.addGroupBlackListConf(entity, result);
+    }
+
+    @Override
+    public boolean updGroupBlackListConf(GroupBlackListEntity entity, 
ProcessResult result) {
+        // check current status
+        if (!checkStoreStatus(true, result)) {
+            return result.isSuccess();
+        }
+        return groupBlackListMapper.updGroupBlackListConf(entity, result);
+    }
+
+    @Override
+    public boolean delGroupBlackListConf(String recordKey, ProcessResult 
result) {
+        // check current status
+        if (!checkStoreStatus(true, result)) {
+            return result.isSuccess();
+        }
+        return groupBlackListMapper.delGroupBlackListConf(recordKey);
+    }
+
+    @Override
+    public boolean delGroupBlackListConfByGroupName(String groupName, 
ProcessResult result) {
+        // check current status
+        if (!checkStoreStatus(true, result)) {
+            return result.isSuccess();
+        }
+        return groupBlackListMapper.delGroupBlackListConf(groupName);
+    }
+
+    @Override
+    public  List<GroupBlackListEntity> getGrpBlkLstConfByGroupName(String 
groupName) {
+        return groupBlackListMapper.getGrpBlkLstConfByGroupName(groupName);
+    }
+
+    @Override
+    public List<GroupBlackListEntity> getGrpBlkLstConfByTopicName(String 
topicName) {
+        return groupBlackListMapper.getGrpBlkLstConfByTopicName(topicName);
+    }
+
+    @Override
+    public List<GroupBlackListEntity> 
getGroupBlackListConf(GroupBlackListEntity qryEntity) {
+        return groupBlackListMapper.getGroupBlackListConf(qryEntity);
+    }
+
+    // group filter control api
+    @Override
+    public boolean addGroupFilterCtrlConf(GroupFilterCtrlEntity entity, 
ProcessResult result) {
+        // check current status
+        if (!checkStoreStatus(true, result)) {
+            return result.isSuccess();
+        }
+        return groupFilterCtrlMapper.addGroupFilterCtrlConf(entity, result);
+    }
+
+    @Override
+    public boolean updGroupFilterCtrlConf(GroupFilterCtrlEntity entity, 
ProcessResult result) {
+        // check current status
+        if (!checkStoreStatus(true, result)) {
+            return result.isSuccess();
+        }
+        return groupFilterCtrlMapper.updGroupFilterCtrlConf(entity, result);
+    }
+
+    @Override
+    public boolean delGroupFilterCtrlConf(String recordKey, ProcessResult 
result) {
+        // check current status
+        if (!checkStoreStatus(true, result)) {
+            return result.isSuccess();
+        }
+        return groupFilterCtrlMapper.delGroupFilterCtrlConf(recordKey);
+    }
+
+    @Override
+    public List<GroupFilterCtrlEntity> getGroupFilterCtrlConf(String 
groupName) {
+        return groupFilterCtrlMapper.getGroupFilterCtrlConf(groupName);
+    }
+
+    @Override
+    public List<GroupFilterCtrlEntity> 
getGroupFilterCtrlConf(GroupFilterCtrlEntity qryEntity) {
+        return groupFilterCtrlMapper.getGroupFilterCtrlConf(qryEntity);
+    }
+
+    @Override
+    public void registerObserver(AliveObserver eventObserver) {
+        if (eventObserver != null) {
+            eventObservers.add(eventObserver);
+        }
+    }
+
+    @Override
+    public boolean isMasterNow() {
+        return isMaster;
+    }
+
+    /**
+     * Get master start time
+     *
+     * @return
+     */
+    @Override
+    public long getMasterSinceTime() {
+        return this.masterSinceTime.get();
+    }
+
+    /**
+     * Check if primary node is active
+     *
+     * @return
+     */
+    @Override
+    public boolean isPrimaryNodeActive() {
+        if (repEnv == null) {
+            return false;
+        }
+        ReplicationMutableConfig tmpConfig = repEnv.getRepMutableConfig();
+        return tmpConfig != null && tmpConfig.getDesignatedPrimary();
+    }
+
+    /**
+     * Transfer master role to other replica node
+     *
+     * @throws Exception
+     */
+    @Override
+    public void transferMaster() throws Exception {
+        if (!isStarted()) {
+            throw new Exception("The BDB store StoreService is reboot now!");
+        }
+        if (isMasterNow()) {
+            if (!isPrimaryNodeActive()) {
+                if ((replicas4Transfer != null) && 
(!replicas4Transfer.isEmpty())) {
+                    logger.info(new 
StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                            .append("[BDB Impl] start transferMaster to 
replicas: ")
+                            .append(replicas4Transfer).toString());
+                    repEnv.transferMaster(replicas4Transfer, 5, 
TimeUnit.MINUTES);
+                    logger.info("[BDB Impl] transferMaster end...");
+                } else {
+                    throw new Exception("The replicate nodes is empty!");
+                }
+            } else {
+                throw new Exception("DesignatedPrimary happened...please check 
if the other member is down!");
+            }
+        } else {
+            throw new Exception("Please send your request to the master 
Node!");
+        }
+    }
+
+    /**
+     * Get current master address
+     *
+     * @return
+     */
+    @Override
+    public InetSocketAddress getMasterAddress() {
+        ReplicationGroup replicationGroup = getCurrReplicationGroup();
+        if (replicationGroup == null) {
+            logger.info("[BDB Impl] ReplicationGroup is null...please check 
the group status!");
+            return null;
+        }
+        for (ReplicationNode node : replicationGroup.getNodes()) {
+            try {
+                NodeState nodeState =
+                        replicationGroupAdmin.getNodeState(node, 2000);
+                if (nodeState != null) {
+                    if (nodeState.getNodeState().isMaster()) {
+                        return node.getSocketAddress();
+                    }
+                }
+            } catch (Throwable e) {
+                logger.error("[BDB Impl] Get nodeState Throwable error", e);
+                continue;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Get group address info
+     *
+     * @return
+     */
+    public ClusterGroupVO getGroupAddressStrInfo() {
+        ClusterGroupVO clusterGroupVO = new ClusterGroupVO();
+        clusterGroupVO.setGroupStatus("Abnormal");
+        clusterGroupVO.setGroupName(replicationGroupAdmin.getGroupName());
+        // query current replication group info
+        ReplicationGroup replicationGroup = getCurrReplicationGroup();
+        if (replicationGroup == null) {
+            return clusterGroupVO;
+        }
+        // translate replication group info to ClusterGroupVO structure
+        Tuple2<Boolean, List<ClusterNodeVO>>  transReult =
+                transReplicateNodes(replicationGroup);
+        clusterGroupVO.setNodeData(transReult.getF1());
+        clusterGroupVO.setPrimaryNodeActive(isPrimaryNodeActive());
+        if (transReult.getF0()) {
+            if (isPrimaryNodeActive()) {
+                clusterGroupVO.setGroupStatus("Running-ReadOnly");
+            } else {
+                clusterGroupVO.setGroupStatus("Running-ReadWrite");
+            }
+        }
+        return clusterGroupVO;
+    }
+
+    /**
+     * Get master group status
+     *
+     * @param isFromHeartbeat
+     * @return
+     */
+    @Override
+    public MasterGroupStatus getMasterGroupStatus(boolean isFromHeartbeat) {
+        // #lizard forgives
+        if (repEnv == null) {
+            return null;
+        }
+        ReplicationGroup replicationGroup = null;
+        try {
+            replicationGroup = repEnv.getGroup();
+        } catch (DatabaseException e) {
+            if (e instanceof EnvironmentFailureException) {
+                if (isFromHeartbeat) {
+                    logger.error("[BDB Error] Check found 
EnvironmentFailureException", e);
+                    try {
+                        stop();
+                        start();
+                        replicationGroup = repEnv.getGroup();
+                    } catch (Throwable e1) {
+                        logger.error("[BDB Error] close and reopen 
storeManager error", e1);
+                    }
+                } else {
+                    logger.error(
+                            "[BDB Error] Get EnvironmentFailureException error 
while non heartBeat request", e);
+                }
+            } else {
+                logger.error("[BDB Error] Get replication group info error", 
e);
+            }
+        } catch (Throwable ee) {
+            logger.error("[BDB Error] Get replication group throw error", ee);
+        }
+        if (replicationGroup == null) {
+            logger.error(
+                    "[BDB Error] ReplicationGroup is null...please check the 
status of the group!");
+            return null;
+        }
+        int activeNodes = 0;
+        boolean isMasterActive = false;
+        Set<String> tmp = new HashSet<>();
+        for (ReplicationNode node : replicationGroup.getNodes()) {
+            MasterNodeInfo masterNodeInfo =
+                    new MasterNodeInfo(replicationGroup.getName(),
+                            node.getName(), node.getHostName(), 
node.getPort());
+            try {
+                NodeState nodeState = replicationGroupAdmin.getNodeState(node, 
2000);
+                if (nodeState != null) {
+                    if (nodeState.getNodeState().isActive()) {
+                        activeNodes++;
+                        if (nodeState.getNodeName().equals(masterNodeName)) {
+                            isMasterActive = true;
+                            masterNodeInfo.setNodeStatus(1);
+                        }
+                    }
+                    if (nodeState.getNodeState().isReplica()) {
+                        tmp.add(nodeState.getNodeName());
+                        replicas4Transfer = tmp;
+                        masterNodeInfo.setNodeStatus(0);
+                    }
+                }
+            } catch (IOException e) {
+                connectNodeFailCount++;
+                masterNodeInfo.setNodeStatus(-1);
+                bdbStoreSamplePrint.printExceptionCaught(e, 
node.getHostName(), node.getName());
+                continue;
+            } catch (ServiceDispatcher.ServiceConnectFailedException e) {
+                masterNodeInfo.setNodeStatus(-2);
+                bdbStoreSamplePrint.printExceptionCaught(e, 
node.getHostName(), node.getName());
+                continue;
+            } catch (Throwable ee) {
+                masterNodeInfo.setNodeStatus(-3);
+                bdbStoreSamplePrint.printExceptionCaught(ee, 
node.getHostName(), node.getName());
+                continue;
+            }
+        }
+        MasterGroupStatus masterGroupStatus = new 
MasterGroupStatus(isMasterActive);
+        int groupSize = replicationGroup.getElectableNodes().size();
+        int majoritySize = groupSize / 2 + 1;
+        if ((activeNodes >= majoritySize) && isMasterActive) {
+            masterGroupStatus.setMasterGroupStatus(true, true, true);
+            connectNodeFailCount = 0;
+            if (isPrimaryNodeActive()) {
+                
repEnv.setRepMutableConfig(repEnv.getRepMutableConfig().setDesignatedPrimary(false));
+            }
+        }
+        if (groupSize == 2 && connectNodeFailCount >= 3) {
+            masterGroupStatus.setMasterGroupStatus(true, false, true);
+            if (connectNodeFailCount > 1000) {
+                connectNodeFailCount = 3;
+            }
+            if (!isPrimaryNodeActive()) {
+                logger.error("[BDB Error] DesignatedPrimary happened...please 
check if the other member is down");
+                
repEnv.setRepMutableConfig(repEnv.getRepMutableConfig().setDesignatedPrimary(true));
+            }
+        }
+        return masterGroupStatus;
+    }
+
+    /**
+     * State Change Listener,
+     * through this object, it complete the metadata cache cleaning
+     * and loading of the latest data.
+     *
+     * */
+    public class Listener implements StateChangeListener {
+        @Override
+        public void stateChange(StateChangeEvent stateChangeEvent) throws 
RuntimeException {
+            if (repConfig != null) {
+                logger.warn(new 
StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                        .append("[BDB Impl][").append(repConfig.getGroupName())
+                        .append("Receive a group status changed event]... 
stateChangeEventTime: ")
+                        .append(stateChangeEvent.getEventTime()).toString());
+            }
+            doWork(stateChangeEvent);
+        }
+
+        public void doWork(final StateChangeEvent stateChangeEvent) {
+
+            final String currentNode = new 
StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                    .append("GroupName:").append(repConfig.getGroupName())
+                    .append(",nodeName:").append(repConfig.getNodeName())
+                    
.append(",hostName:").append(repConfig.getNodeHostPort()).toString();
+            if (executorService == null) {
+                logger.error("[BDB Impl] found  executorService is null while 
doWork!");
+                return;
+            }
+            executorService.submit(new Runnable() {
+                @Override
+                public void run() {
+                    ProcessResult result = new ProcessResult();
+                    StringBuilder sBuilder =
+                            new 
StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE);
+                    switch (stateChangeEvent.getState()) {
+                        case MASTER:
+                            if (!isMaster) {
+                                try {
+                                    clearCachedRunData();
+                                    clusterConfigMapper.loadConfig();
+                                    brokerConfigMapper.loadConfig();
+                                    topicConfigMapper.loadConfig();
+                                    topicCtrlMapper.loadConfig();
+                                    groupConfigMapper.loadConfig();
+                                    groupBlackListMapper.loadConfig();
+                                    groupFilterCtrlMapper.loadConfig();
+                                    isMaster = true;
+                                    
masterSinceTime.set(System.currentTimeMillis());
+                                    masterNodeName = 
stateChangeEvent.getMasterNodeName();
+                                    logger.info(sBuilder.append("[BDB Impl] ")
+                                            .append(currentNode)
+                                            .append(" is a 
master.").toString());
+                                } catch (Throwable e) {
+                                    isMaster = false;
+                                    logger.error("[BDB Impl] fatal error when 
Reloading Info ", e);
+                                }
+                            }
+                            break;
+                        case REPLICA:
+                            isMaster = false;
+                            masterNodeName = 
stateChangeEvent.getMasterNodeName();
+                            logger.info(sBuilder.append("[BDB Impl] ")
+                                    .append(currentNode).append(" is a 
slave.").toString());
+                            break;
+                        default:
+                            isMaster = false;
+                            logger.info(sBuilder.append("[BDB Impl] ")
+                                    .append(currentNode).append(" is Unknown 
state ")
+                                    
.append(stateChangeEvent.getState().name()).toString());
+                            break;
+                    }
+                }
+            });
+        }
+    }
+
+    private boolean isStarted() {
+        return (this.srvStatus.get() == 2);
+    }
+
+    /**
+     * clear cached data in alive event observers.
+     *
+     * */
+    private void clearCachedRunData() {
+        for (AliveObserver observer : eventObservers) {
+            observer.clearCacheData();
+        }
+    }
+
+    /**
+     * Initialize configuration for BDB-JE replication environment.
+     *
+     * */
+    private void initEnvConfig() throws InterruptedException {
+
+        // Set envHome and generate a ReplicationConfig. Note that 
ReplicationConfig and
+        // EnvironmentConfig values could all be specified in the 
je.properties file,
+        // as is shown in the properties file included in the example.
+        repConfig = new ReplicationConfig();
+        // Set consistency policy for replica.
+        TimeConsistencyPolicy consistencyPolicy =
+                new TimeConsistencyPolicy(3,
+                        TimeUnit.SECONDS, 3, TimeUnit.SECONDS);
+        repConfig.setConsistencyPolicy(consistencyPolicy);
+        // Wait up to 3 seconds for commitConsumed acknowledgments.
+        repConfig.setReplicaAckTimeout(3, TimeUnit.SECONDS);
+        repConfig.setConfigParam(ReplicationConfig.TXN_ROLLBACK_LIMIT, "1000");
+        repConfig.setGroupName(replicationConfig.getRepGroupName());
+        repConfig.setNodeName(replicationConfig.getRepNodeName());
+        repConfig.setNodeHostPort(this.nodeHost + TokenConstants.ATTR_SEP
+                + replicationConfig.getRepNodePort());
+        if (TStringUtils.isNotEmpty(replicationConfig.getRepHelperHost())) {
+            logger.info("[BDB Impl] ADD HELP HOST");
+            repConfig.setHelperHosts(replicationConfig.getRepHelperHost());
+        }
+
+        // A replicated environment must be opened with transactions enabled.
+        // Environments on a master must be read/write, while environments
+        // on a client can be read/write or read/only. Since the master's
+        // identity may change, it's most convenient to open the environment 
in the default
+        // read/write mode. All write operations will be refused on the client 
though.
+        envConfig = new EnvironmentConfig();
+        envConfig.setTransactional(true);
+        Durability durability =
+                new Durability(replicationConfig.getMetaLocalSyncPolicy(),
+                        replicationConfig.getMetaReplicaSyncPolicy(),
+                        replicationConfig.getRepReplicaAckPolicy());
+        envConfig.setDurability(durability);
+        envConfig.setAllowCreate(true);
+        // build envHome file
+        this.envHome = new File(metaDataPath);
+        // An Entity Store in a replicated environment must be transactional.
+        storeConfig.setTransactional(true);
+        // Note that both Master and Replica open the store for write.
+        storeConfig.setReadOnly(false);
+        storeConfig.setAllowCreate(true);
+    }
+
+    /**
+     * Creates the replicated environment handle and returns it. It will retry 
indefinitely if a
+     * master could not be established because a sufficient number of nodes 
were not available, or
+     * there were networking issues, etc.
+     *
+     * @return the newly created replicated environment handle
+     * @throws InterruptedException if the operation was interrupted
+     */
+    private ReplicatedEnvironment getEnvironment() throws InterruptedException 
{
+        DatabaseException exception = null;
+
+        //In this example we retry REP_HANDLE_RETRY_MAX times, but a 
production HA application may
+        //retry indefinitely.
+        for (int i = 0; i < REP_HANDLE_RETRY_MAX; i++) {
+            try {
+                return new ReplicatedEnvironment(envHome, repConfig, 
envConfig);
+            } catch (UnknownMasterException unknownMaster) {
+                exception = unknownMaster;
+                // Indicates there is a group level problem: insufficient 
nodes for an election,
+                // network connectivity issues, etc. Wait and retry to allow 
the problem
+                // to be resolved.
+                logger.error(new 
StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                        .append("[BDB Impl] master could not be established. ")
+                        .append("Exception 
message:").append(unknownMaster.getMessage())
+                        .append(" Will retry after 5 seconds.").toString());
+                Thread.sleep(5 * 1000);
+                continue;
+            } catch (InsufficientLogException insufficientLogEx) {
+                logger.error(new 
StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+                        .append("[BDB Impl] [Restoring data please wait....] ")
+                        .append("Obtains logger files for a Replica from other 
members of ")
+                        .append("the replication group. A Replica may need to 
do so if it ")
+                        .append("has been offline for some time, and has 
fallen behind in ")
+                        .append("its execution of the replication 
stream.").toString());
+                NetworkRestore restore = new NetworkRestore();
+                NetworkRestoreConfig config = new NetworkRestoreConfig();
+                // delete obsolete logger files.
+                config.setRetainLogFiles(false);
+                restore.execute(insufficientLogEx, config);
+                // retry
+                return new ReplicatedEnvironment(envHome, repConfig, 
envConfig);
+            }
+        }
+        // Failed despite retries.
+        if (exception != null) {
+            throw exception;
+        }
+        // Don't expect to get here.
+        throw new IllegalStateException("Failed despite retries");
+    }
+
+    /* initial metadata */
+    private void initMetaStore() {
+        clusterConfigMapper = new BdbClusterConfigMapperImpl(repEnv, 
storeConfig);
+        brokerConfigMapper = new BdbBrokerConfigMapperImpl(repEnv, 
storeConfig);
+        topicConfigMapper =  new BdbTopicConfigMapperImpl(repEnv, storeConfig);
+        groupConfigMapper = new BdbGroupConfigMapperImpl(repEnv, storeConfig);
+        topicCtrlMapper = new BdbTopicCtrlMapperImpl(repEnv, storeConfig);
+        groupFilterCtrlMapper = new BdbGroupFilterCtrlMapperImpl(repEnv, 
storeConfig);
+        groupBlackListMapper = new BdbGroupBlackListMapperImpl(repEnv, 
storeConfig);
+    }
+
+    private ReplicationGroup getCurrReplicationGroup() {
+        ReplicationGroup replicationGroup = null;
+        try {
+            replicationGroup = repEnv.getGroup();
+        } catch (Throwable e) {
+            logger.error("[BDB Impl] get current master group info error", e);
+            return null;
+        }
+        return replicationGroup;
+    }
+
+    /**
+     * Query replication group nodes status and translate to ClusterNodeVO type
+     *
+     * @return if has master, replication nodes info
+     * @throws InterruptedException if the operation was interrupted
+     */
+    private Tuple2<Boolean, List<ClusterNodeVO>> transReplicateNodes(
+            ReplicationGroup replicationGroup) {
+        boolean hasMaster = false;
+        List<ClusterNodeVO> clusterNodeVOList = new ArrayList<>();
+        for (ReplicationNode node : replicationGroup.getNodes()) {
+            ClusterNodeVO clusterNodeVO = new ClusterNodeVO();
+            clusterNodeVO.setHostName(node.getHostName());
+            clusterNodeVO.setNodeName(node.getName());
+            clusterNodeVO.setPort(node.getPort());
+            try {
+                NodeState nodeState =
+                        replicationGroupAdmin.getNodeState(node, 2000);
+                if (nodeState != null) {
+                    if (nodeState.getNodeState() == 
ReplicatedEnvironment.State.MASTER) {
+                        hasMaster = true;
+                    }
+                    
clusterNodeVO.setNodeStatus(nodeState.getNodeState().toString());
+                    clusterNodeVO.setJoinTime(nodeState.getJoinTime());
+                } else {
+                    clusterNodeVO.setNodeStatus("Not-found");
+                    clusterNodeVO.setJoinTime(0);
+                }
+            } catch (IOException e) {
+                clusterNodeVO.setNodeStatus("Error");
+                clusterNodeVO.setJoinTime(0);
+            } catch (ServiceDispatcher.ServiceConnectFailedException e) {
+                clusterNodeVO.setNodeStatus("Unconnected");
+                clusterNodeVO.setJoinTime(0);
+            }
+            clusterNodeVOList.add(clusterNodeVO);
+        }
+        return new Tuple2<>(hasMaster, clusterNodeVOList);
+    }
+
+    private boolean checkStoreStatus(boolean checkIsMaster, ProcessResult 
result) {
+        if (!isStarted()) {
+            result.setFailResult(DataOpErrCode.DERR_STORE_STOPPED.getCode(),
+                    "Meta store service stopped!");
+            return result.isSuccess();
+        }
+        if (checkIsMaster && !isMasterNow()) {
+            result.setFailResult(DataOpErrCode.DERR_STORE_NOT_MASTER.getCode(),
+                    "Current node not active!");
+            return result.isSuccess();
+        }
+        result.setSuccResult(null);
+        return true;
+    }
+}

Reply via email to