This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch cluster-
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 062533405663dd7faec8be2422390dff8721681c
Author: xiangdong huang <[email protected]>
AuthorDate: Tue Aug 10 22:02:45 2021 +0800

    pass manually test
---
 .../org/apache/iotdb/cluster/ClusterIoTDB.java     |  18 +
 .../apache/iotdb/cluster/ClusterIoTDBMBean.java    |   4 +
 .../cluster/client/async/AsyncClientPool.java      |   1 +
 .../cluster/client/sync/SyncClientFactory.java     |   5 +-
 .../iotdb/cluster/client/sync/SyncClientPool.java  |  31 +-
 .../iotdb/cluster/client/sync/SyncDataClient.java  |   9 +-
 .../client/sync/SyncDataHeartbeatClient.java       |  10 +-
 .../iotdb/cluster/client/sync/SyncMetaClient.java  |  11 +-
 .../client/sync/SyncMetaHeartbeatClient.java       |   8 +-
 .../iotdb/cluster/partition/PartitionGroup.java    |   5 +
 .../cluster/partition/slot/SlotPartitionTable.java |   1 -
 .../iotdb/cluster/server/MetaClusterServer2.java   | 372 ---------------------
 .../handlers/caller/AppendNodeEntryHandler.java    |   9 +-
 .../server/handlers/caller/HeartbeatHandler.java   |   7 +-
 .../cluster/server/heartbeat/HeartbeatThread.java  |  14 +-
 .../server/heartbeat/MetaHeartbeatThread.java      |   3 +
 .../cluster/server/member/DataGroupMember.java     |  33 +-
 .../member/DataGroupMemberMBean.java}              |   9 +-
 .../cluster/server/member/MetaGroupMember.java     |  95 ++++--
 .../member/MetaGroupMemberMBean.java}              |  19 +-
 .../iotdb/cluster/server/member/RaftMember.java    |  45 ++-
 .../member/RaftMemberMBean.java}                   |  42 ++-
 .../server/service/DataGroupServiceImpls.java      |  26 +-
 .../service/DataGroupServiceImplsMBean.java}       |  15 +-
 .../cluster/server/service/MetaSyncService.java    |   3 +
 25 files changed, 337 insertions(+), 458 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
index e72b5f0..a8dc705 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
@@ -82,6 +82,10 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
       String.format(
           "%s:%s=%s", "org.apache.iotdb.cluster.service", 
IoTDBConstant.JMX_TYPE, "ClusterIoTDB");
 
+  // TODO fix me: better to throw an exception if the client cannot be obtained. Then we can
+  // remove this field.
+  public static boolean printClientConnectionErrorStack = false;
+
   // establish the cluster as a seed
   private static final String MODE_START = "-s";
   // join an established cluster
@@ -148,6 +152,7 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
     dataGroupEngine = new DataGroupServiceImpls(protocolFactory, 
metaGroupEngine);
     dataClientProvider = new DataClientProvider(protocolFactory);
     initTasks();
+    JMXService.registerMBean(metaGroupEngine, metaGroupEngine.getMBeanName());
   }
 
   private void initTasks() {
@@ -268,6 +273,9 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
       // start RPC service
       logger.info("start Meta Heartbeat RPC service... ");
       registerManager.register(MetaRaftHeartBeatService.getInstance());
+      // TODO: better to start the Meta RPC service only after the heartbeat
+      // service has elected the leader and a quorum of followers has
+      // caught up.
       logger.info("start Meta RPC service... ");
       registerManager.register(MetaRaftService.getInstance());
 
@@ -576,6 +584,16 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
     allowReport = false;
   }
 
+  @Override
+  public void enablePrintClientConnectionErrorStack() {
+    printClientConnectionErrorStack = true;
+  }
+
+  @Override
+  public void disablePrintClientConnectionErrorStack() {
+    printClientConnectionErrorStack = false;
+  }
+
   private static class ClusterIoTDBHolder {
 
     private static final ClusterIoTDB INSTANCE = new ClusterIoTDB();
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDBMBean.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDBMBean.java
index b67debe..88f90ac 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDBMBean.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDBMBean.java
@@ -24,4 +24,8 @@ public interface ClusterIoTDBMBean {
   boolean startRaftInfoReport();
 
   void stopRaftInfoReport();
+
+  void enablePrintClientConnectionErrorStack();
+
+  void disablePrintClientConnectionErrorStack();
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
index 719ab8d..2aa8d04 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
@@ -25,6 +25,7 @@ import 
org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
 import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
 import org.apache.iotdb.cluster.utils.ClusterNode;
 import org.apache.iotdb.db.utils.TestOnly;
+
 import org.apache.thrift.async.TAsyncMethodCall;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientFactory.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientFactory.java
index 6bfe9003..c34f1f3 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientFactory.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientFactory.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.cluster.client.sync;
 
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService;
-
 import org.apache.thrift.transport.TTransportException;
 
 import java.io.IOException;
@@ -37,4 +36,8 @@ public interface SyncClientFactory {
    * @throws IOException
    */
   RaftService.Client getSyncClient(Node node, SyncClientPool pool) throws 
TTransportException;
+
+  default String nodeInfo(Node node) {
+    return node.toString();
+  }
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
index c6466f4..38d9942 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.cluster.client.sync;
 
+import org.apache.iotdb.cluster.ClusterIoTDB;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
@@ -43,10 +44,6 @@ public class SyncClientPool {
   private Map<ClusterNode, Integer> nodeClientNumMap = new 
ConcurrentHashMap<>();
   private SyncClientFactory syncClientFactory;
 
-  // TODO fix me: better to throw exception if the client can not be get. Then 
we can remove this
-  // field.
-  public static boolean printStack = false;
-
   public SyncClientPool(SyncClientFactory syncClientFactory) {
     this.syncClientFactory = syncClientFactory;
     this.waitClientTimeoutMS = 
ClusterDescriptor.getInstance().getConfig().getWaitClientTimeoutMS();
@@ -94,8 +91,11 @@ public class SyncClientPool {
             client = syncClientFactory.getSyncClient(clusterNode, this);
           } catch (TTransportException e) {
             // TODO throw me is better.
-            if (printStack) {
-              logger.error("Cannot open transport for client {}", node, e);
+            if (ClusterIoTDB.printClientConnectionErrorStack) {
+              logger.error(
+                  "Cannot open transport for client {}", 
syncClientFactory.nodeInfo(node), e);
+            } else {
+              logger.error("Cannot open transport for client {}", 
syncClientFactory.nodeInfo(node));
             }
             return null;
           }
@@ -130,10 +130,18 @@ public class SyncClientPool {
         }
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
-        logger.warn("Interrupted when waiting for an available client of {}", 
clusterNode);
+        logger.warn(
+            "Interrupted when waiting for an available client of {}",
+            syncClientFactory.nodeInfo(clusterNode));
         return null;
       } catch (TTransportException e) {
-        logger.error("Cannot open transport for client {}", clusterNode, e);
+        if (ClusterIoTDB.printClientConnectionErrorStack) {
+          logger.error(
+              "Cannot open transport for client {}", 
syncClientFactory.nodeInfo(clusterNode), e);
+        } else {
+          logger.error(
+              "Cannot open transport for client {}", 
syncClientFactory.nodeInfo(clusterNode));
+        }
         return null;
       }
     }
@@ -159,7 +167,12 @@ public class SyncClientPool {
           clientStack.push(syncClientFactory.getSyncClient(node, this));
           NodeStatusManager.getINSTANCE().activate(node);
         } catch (TTransportException e) {
-          logger.error("Cannot open transport for client {}", node, e);
+          if (ClusterIoTDB.printClientConnectionErrorStack) {
+            logger.error(
+                "Cannot open transport for client {}", 
syncClientFactory.nodeInfo(node), e);
+          } else {
+            logger.error("Cannot open transport for client {}", 
syncClientFactory.nodeInfo(node));
+          }
           nodeClientNumMap.computeIfPresent(clusterNode, (n, oldValue) -> 
oldValue - 1);
           NodeStatusManager.getINSTANCE().deactivate(node);
         }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java
index fb861df..f886363 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.rpc.RpcTransportFactory;
 import org.apache.iotdb.rpc.TConfigurationConst;
 import org.apache.iotdb.rpc.TimeoutChangeableTransport;
-
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.transport.TSocket;
@@ -52,6 +51,7 @@ public class SyncDataClient extends Client implements 
Closeable {
 
   public SyncDataClient(TProtocolFactory protocolFactory, Node node, 
SyncClientPool pool)
       throws TTransportException {
+
     // the difference of the two clients lies in the port
     super(
         protocolFactory.getProtocol(
@@ -105,6 +105,13 @@ public class SyncDataClient extends Client implements 
Closeable {
     public SyncDataClient getSyncClient(Node node, SyncClientPool pool) throws 
TTransportException {
       return new SyncDataClient(protocolFactory, node, pool);
     }
+
+    @Override
+    public String nodeInfo(Node node) {
+      return String.format(
+          "DataNode (listenIp = %s, port = %d, id = %d)",
+          node.getInternalIp(), node.getDataPort(), node.getNodeIdentifier());
+    }
   }
 
   @Override
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClient.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClient.java
index c9d7644..c2c7f9a 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClient.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClient.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.utils.ClusterUtils;
 import org.apache.iotdb.rpc.RpcTransportFactory;
 import org.apache.iotdb.rpc.TConfigurationConst;
-
 import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransportException;
@@ -64,6 +63,15 @@ public class SyncDataHeartbeatClient extends SyncDataClient {
         throws TTransportException {
       return new SyncDataHeartbeatClient(protocolFactory, node, pool);
     }
+
+    @Override
+    public String nodeInfo(Node node) {
+      return String.format(
+          "DataNode (listenIp = %s, HB port = %d, id = %d)",
+          node.getInternalIp(),
+          node.getDataPort() + ClusterUtils.DATA_HEARTBEAT_PORT_OFFSET,
+          node.getNodeIdentifier());
+    }
   }
 
   @Override
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaClient.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaClient.java
index 11df45e..588d0cd 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaClient.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaClient.java
@@ -22,9 +22,9 @@ package org.apache.iotdb.cluster.client.sync;
 import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.Client;
+import org.apache.iotdb.cluster.utils.ClusterUtils;
 import org.apache.iotdb.rpc.RpcTransportFactory;
 import org.apache.iotdb.rpc.TConfigurationConst;
-
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.transport.TSocket;
@@ -88,6 +88,15 @@ public class SyncMetaClient extends Client implements 
Closeable {
     public SyncMetaClient getSyncClient(Node node, SyncClientPool pool) throws 
TTransportException {
       return new SyncMetaClient(protocolFactory, node, pool);
     }
+
+    /** Describes {@code node} by internal IP, meta port and node id, for use in log messages. */
+    @Override
+    public String nodeInfo(Node node) {
+      // plain (non-heartbeat) factory: report the meta port itself, without a heartbeat offset
+      return String.format(
+          "MetaNode (listenIp = %s, port = %d, id = %d)",
+          node.getInternalIp(), node.getMetaPort(), node.getNodeIdentifier());
+    }
   }
 
   public Node getNode() {
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClient.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClient.java
index a21daa1..8c7abec 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClient.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClient.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.utils.ClusterUtils;
 import org.apache.iotdb.rpc.RpcTransportFactory;
 import org.apache.iotdb.rpc.TConfigurationConst;
-
 import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransportException;
@@ -64,6 +63,13 @@ public class SyncMetaHeartbeatClient extends SyncMetaClient {
         throws TTransportException {
       return new SyncMetaHeartbeatClient(protocolFactory, node, pool);
     }
+
+    @Override // NOTE(review): confirm DATA_HEARTBEAT_PORT_OFFSET is also the meta HB offset
+    public String nodeInfo(Node node) { // heartbeat factory, so log the meta heartbeat port
+      return String.format(
+          "MetaNode (listenIp = %s, HB port = %d, id = %d)", node.getInternalIp(),
+          node.getMetaPort() + ClusterUtils.DATA_HEARTBEAT_PORT_OFFSET, node.getNodeIdentifier());
+    }
   }
 
   @Override
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
index 1ce653e..b86726b 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
@@ -102,4 +102,9 @@ public class PartitionGroup extends ArrayList<Node> {
   public void setId(int id) {
     this.id = id;
   }
+
+  @Override
+  public String toString() {
+    return super.toString() + ", id = " + id;
+  }
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
index 7a46baa..61803ef 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
@@ -32,7 +32,6 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.cluster.utils.NodeSerializeUtils;
 import org.apache.iotdb.db.utils.SerializeUtils;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer2.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer2.java
deleted file mode 100644
index 65db372..0000000
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer2.java
+++ /dev/null
@@ -1,372 +0,0 @@
-/// *
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements.  See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership.  The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License.  You may obtain a copy of the License at
-// *
-// *     http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing,
-// * software distributed under the License is distributed on an
-// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// * KIND, either express or implied.  See the License for the
-// * specific language governing permissions and limitations
-// * under the License.
-// */
-// package org.apache.iotdb.cluster.server;
-//
-// import org.apache.iotdb.cluster.config.ClusterDescriptor;
-// import org.apache.iotdb.cluster.coordinator.Coordinator;
-// import org.apache.iotdb.cluster.exception.ConfigInconsistentException;
-// import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
-// import org.apache.iotdb.cluster.metadata.CMManager;
-// import org.apache.iotdb.cluster.metadata.MetaPuller;
-// import org.apache.iotdb.cluster.rpc.thrift.AddNodeResponse;
-// import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
-// import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
-// import org.apache.iotdb.cluster.rpc.thrift.CheckStatusResponse;
-// import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
-// import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
-// import org.apache.iotdb.cluster.rpc.thrift.HeartBeatRequest;
-// import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse;
-// import org.apache.iotdb.cluster.rpc.thrift.Node;
-// import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
-// import org.apache.iotdb.cluster.rpc.thrift.RequestCommitIndexResponse;
-// import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
-// import org.apache.iotdb.cluster.rpc.thrift.StartUpStatus;
-// import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
-// import org.apache.iotdb.cluster.rpc.thrift.TSMetaService;
-// import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.AsyncProcessor;
-// import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.Processor;
-// import org.apache.iotdb.cluster.server.heartbeat.MetaHeartbeatServer;
-// import org.apache.iotdb.cluster.server.member.MetaGroupMember;
-// import org.apache.iotdb.cluster.server.service.MetaAsyncService;
-// import org.apache.iotdb.cluster.server.service.MetaSyncService;
-// import org.apache.iotdb.db.exception.StartupException;
-// import org.apache.iotdb.db.exception.query.QueryProcessException;
-// import org.apache.iotdb.db.service.IoTDB;
-// import org.apache.iotdb.db.utils.TestOnly;
-// import org.apache.iotdb.service.rpc.thrift.TSStatus;
-// import org.apache.thrift.TException;
-// import org.apache.thrift.async.AsyncMethodCallback;
-// import org.apache.thrift.transport.TNonblockingServerSocket;
-// import org.slf4j.Logger;
-// import org.slf4j.LoggerFactory;
-//
-// import java.net.InetSocketAddress;
-// import java.nio.ByteBuffer;
-//
-/// **
-// * MetaCluster manages the whole cluster's metadata, such as what nodes are 
in the cluster and the
-// * data partition. Each node has one MetaClusterServer instance, the 
single-node IoTDB instance is
-// * started-up at the same time.
-// */
-// public class MetaClusterServer2 extends RaftServer
-//    implements TSMetaService.AsyncIface, TSMetaService.Iface {
-//  private static Logger logger = 
LoggerFactory.getLogger(MetaClusterServer2.class);
-//
-//  // each node only contains one MetaGroupMember
-//  private MetaGroupMember member;
-//  private Coordinator coordinator;
-//
-//  private MetaAsyncService asyncService;
-//  private MetaSyncService syncService;
-//  private MetaHeartbeatServer metaHeartbeatServer;
-//
-//  public MetaClusterServer2() throws QueryProcessException {
-//    super();
-//    metaHeartbeatServer = new MetaHeartbeatServer(thisNode, this);
-//    coordinator = new Coordinator();
-//    member = new MetaGroupMember(protocolFactory, thisNode, coordinator);
-//    coordinator.setMetaGroupMember(member);
-//    asyncService = new MetaAsyncService(member);
-//    syncService = new MetaSyncService(member);
-//    MetaPuller.getInstance().init(member);
-//  }
-//
-//  /**
-//   * Besides the standard RaftServer start-up, the IoTDB instance, a 
MetaGroupMember and the
-//   * ClusterMonitor are also started.
-//   *
-//   * @throws TTransportException
-//   * @throws StartupException
-//   */
-//  @Override
-//  public void start() throws TTransportException, StartupException {
-//    super.start();
-//    metaHeartbeatServer.start();
-//
-//    IoTDB.setMetaManager(CMManager.getInstance());
-//    ((CMManager) IoTDB.metaManager).setMetaGroupMember(member);
-//    ((CMManager) IoTDB.metaManager).setCoordinator(coordinator);
-//    // TODO FIXME move this out of MetaClusterServer
-//    IoTDB.getInstance().active();
-//
-//    member.start();
-//  }
-//
-//  /** Also stops the IoTDB instance, the MetaGroupMember and the 
ClusterMonitor. */
-//  @Override
-//  public void stop() {
-//
-//    metaHeartbeatServer.stop();
-//    super.stop();
-//    member.stop();
-//  }
-//
-//  /** Build a initial cluster with other nodes on the seed list. */
-//  public void buildCluster() throws ConfigInconsistentException, 
StartUpCheckFailureException {
-//    member.buildCluster();
-//  }
-//
-//  /**
-//   * Pick up a node from the seed list and send a join request to it.
-//   *
-//   * @return whether the node has joined the cluster.
-//   */
-//  public void joinCluster() throws ConfigInconsistentException, 
StartUpCheckFailureException {
-//    member.joinCluster();
-//  }
-//
-//  /**
-//   * MetaClusterServer uses the meta port to create the socket.
-//   *
-//   * @return the TServerTransport
-//   * @throws TTransportException if create the socket fails
-//   */
-//  @Override
-//  TServerTransport getServerSocket() throws TTransportException {
-//    logger.info(
-//        "[{}] Cluster node will listen {}:{}",
-//        getServerClientName(),
-//        config.getInternalIp(),
-//        config.getInternalMetaPort());
-//    if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
-//      return new TNonblockingServerSocket(
-//          new InetSocketAddress(config.getInternalIp(), 
config.getInternalMetaPort()),
-//          getConnectionTimeoutInMS());
-//    } else {
-//      return new TServerSocket(
-//          new InetSocketAddress(config.getInternalIp(), 
config.getInternalMetaPort()));
-//    }
-//  }
-//
-//  @Override
-//  String getClientThreadPrefix() {
-//    return "MetaClientThread-";
-//  }
-//
-//  @Override
-//  String getServerClientName() {
-//    return "MetaServerThread-";
-//  }
-//
-//  @Override
-//  TProcessor getProcessor() {
-//    if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
-//      return new AsyncProcessor<>(this);
-//    } else {
-//      return new Processor<>(this);
-//    }
-//  }
-//
-//  // Request forwarding. There is only one MetaGroupMember each node, so all 
requests will be
-//  // directly sent to that member. See the methods in MetaGroupMember for 
details
-//
-//  @Override
-//  public void addNode(Node node, StartUpStatus startUpStatus, 
AsyncMethodCallback resultHandler) {
-//    asyncService.addNode(node, startUpStatus, resultHandler);
-//  }
-//
-//  @Override
-//  public void sendHeartbeat(HeartBeatRequest request, AsyncMethodCallback 
resultHandler) {
-//    asyncService.sendHeartbeat(request, resultHandler);
-//  }
-//
-//  @Override
-//  public void startElection(ElectionRequest electionRequest, 
AsyncMethodCallback resultHandler) {
-//    asyncService.startElection(electionRequest, resultHandler);
-//  }
-//
-//  @Override
-//  public void appendEntries(AppendEntriesRequest request, 
AsyncMethodCallback resultHandler) {
-//    asyncService.appendEntries(request, resultHandler);
-//  }
-//
-//  @Override
-//  public void appendEntry(AppendEntryRequest request, AsyncMethodCallback 
resultHandler) {
-//    asyncService.appendEntry(request, resultHandler);
-//  }
-//
-//  @Override
-//  public void sendSnapshot(SendSnapshotRequest request, AsyncMethodCallback 
resultHandler) {
-//    asyncService.sendSnapshot(request, resultHandler);
-//  }
-//
-//  @Override
-//  public void executeNonQueryPlan(
-//      ExecutNonQueryReq request, AsyncMethodCallback<TSStatus> 
resultHandler) {
-//    asyncService.executeNonQueryPlan(request, resultHandler);
-//  }
-//
-//  @Override
-//  public void requestCommitIndex(
-//      RaftNode header, AsyncMethodCallback<RequestCommitIndexResponse> 
resultHandler) {
-//    asyncService.requestCommitIndex(header, resultHandler);
-//  }
-//
-//  @Override
-//  public void checkAlive(AsyncMethodCallback<Node> resultHandler) {
-//    asyncService.checkAlive(resultHandler);
-//  }
-//
-//  @Override
-//  public void collectMigrationStatus(AsyncMethodCallback<ByteBuffer> 
resultHandler) {
-//    asyncService.collectMigrationStatus(resultHandler);
-//  }
-//
-//  @Override
-//  public void readFile(
-//      String filePath, long offset, int length, 
AsyncMethodCallback<ByteBuffer> resultHandler) {
-//    asyncService.readFile(filePath, offset, length, resultHandler);
-//  }
-//
-//  @Override
-//  public void queryNodeStatus(AsyncMethodCallback<TNodeStatus> 
resultHandler) {
-//    asyncService.queryNodeStatus(resultHandler);
-//  }
-//
-//  public MetaGroupMember getMember() {
-//    return member;
-//  }
-//
-//  @Override
-//  public void checkStatus(
-//      StartUpStatus startUpStatus, AsyncMethodCallback<CheckStatusResponse> 
resultHandler) {
-//    asyncService.checkStatus(startUpStatus, resultHandler);
-//  }
-//
-//  @Override
-//  public void removeNode(Node node, AsyncMethodCallback<Long> resultHandler) 
{
-//    asyncService.removeNode(node, resultHandler);
-//  }
-//
-//  @Override
-//  public void exile(ByteBuffer removeNodeLog, AsyncMethodCallback<Void> 
resultHandler) {
-//    asyncService.exile(removeNodeLog, resultHandler);
-//  }
-//
-//  @Override
-//  public void matchTerm(
-//      long index, long term, RaftNode header, AsyncMethodCallback<Boolean> 
resultHandler) {
-//    asyncService.matchTerm(index, term, header, resultHandler);
-//  }
-//
-//  @Override
-//  public AddNodeResponse addNode(Node node, StartUpStatus startUpStatus) 
throws TException {
-//    return syncService.addNode(node, startUpStatus);
-//  }
-//
-//  @Override
-//  public CheckStatusResponse checkStatus(StartUpStatus startUpStatus) {
-//    return syncService.checkStatus(startUpStatus);
-//  }
-//
-//  @Override
-//  public long removeNode(Node node) throws TException {
-//    return syncService.removeNode(node);
-//  }
-//
-//  @Override
-//  public void exile(ByteBuffer removeNodeLog) {
-//    syncService.exile(removeNodeLog);
-//  }
-//
-//  @Override
-//  public TNodeStatus queryNodeStatus() {
-//    return syncService.queryNodeStatus();
-//  }
-//
-//  @Override
-//  public Node checkAlive() {
-//    return syncService.checkAlive();
-//  }
-//
-//  @Override
-//  public ByteBuffer collectMigrationStatus() {
-//    return syncService.collectMigrationStatus();
-//  }
-//
-//  @Override
-//  public HeartBeatResponse sendHeartbeat(HeartBeatRequest request) {
-//    return syncService.sendHeartbeat(request);
-//  }
-//
-//  @Override
-//  public long startElection(ElectionRequest request) {
-//    return syncService.startElection(request);
-//  }
-//
-//  @Override
-//  public long appendEntries(AppendEntriesRequest request) throws TException {
-//    return syncService.appendEntries(request);
-//  }
-//
-//  @Override
-//  public long appendEntry(AppendEntryRequest request) throws TException {
-//    return syncService.appendEntry(request);
-//  }
-//
-//  @Override
-//  public void sendSnapshot(SendSnapshotRequest request) throws TException {
-//    syncService.sendSnapshot(request);
-//  }
-//
-//  @Override
-//  public TSStatus executeNonQueryPlan(ExecutNonQueryReq request) throws 
TException {
-//    return syncService.executeNonQueryPlan(request);
-//  }
-//
-//  @Override
-//  public RequestCommitIndexResponse requestCommitIndex(RaftNode header) 
throws TException {
-//    return syncService.requestCommitIndex(header);
-//  }
-//
-//  @Override
-//  public ByteBuffer readFile(String filePath, long offset, int length) 
throws TException {
-//    return syncService.readFile(filePath, offset, length);
-//  }
-//
-//  @Override
-//  public boolean matchTerm(long index, long term, RaftNode header) {
-//    return syncService.matchTerm(index, term, header);
-//  }
-//
-//  @Override
-//  public void removeHardLink(String hardLinkPath) throws TException {
-//    syncService.removeHardLink(hardLinkPath);
-//  }
-//
-//  @Override
-//  public void removeHardLink(String hardLinkPath, AsyncMethodCallback<Void> 
resultHandler) {
-//    asyncService.removeHardLink(hardLinkPath, resultHandler);
-//  }
-//
-//  @Override
-//  public void handshake(Node sender) {
-//    syncService.handshake(sender);
-//  }
-//
-//  @Override
-//  public void handshake(Node sender, AsyncMethodCallback<Void> 
resultHandler) {
-//    asyncService.handshake(sender, resultHandler);
-//  }
-//
-//  @TestOnly
-//  public void setMetaGroupMember(MetaGroupMember metaGroupMember) {
-//    this.member = metaGroupMember;
-//  }
-// }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
index fa744d1..6da7370 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.cluster.server.member.RaftMember;
 import org.apache.iotdb.cluster.server.monitor.Peer;
 import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
-
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -77,7 +76,8 @@ public class AppendNodeEntryHandler implements 
AsyncMethodCallback<Long> {
       // the request already failed
       return;
     }
-    logger.debug("{}: Append response {} from {}", member.getName(), response, 
receiver);
+    logger.debug(
+        "{}: Append response {} from {} for log {}", member.getName(), 
response, receiver, log);
     if (leaderShipStale.get()) {
       // someone has rejected this log because the leadership is stale
       return;
@@ -106,11 +106,12 @@ public class AppendNodeEntryHandler implements 
AsyncMethodCallback<Long> {
         // the leader ship is stale, wait for the new leader's heartbeat
         long prevReceiverTerm = receiverTerm.get();
         logger.debug(
-            "{}: Received a rejection from {} because term is stale: {}/{}",
+            "{}: Received a rejection from {} because term is stale: {}/{} for 
log {}",
             member.getName(),
             receiver,
             prevReceiverTerm,
-            resp);
+            resp,
+            log);
         if (resp > prevReceiverTerm) {
           receiverTerm.set(resp);
         }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
index ba26259..4b4f4b6 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.server.member.RaftMember;
 import org.apache.iotdb.cluster.server.monitor.Peer;
-
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,7 +53,11 @@ public class HeartbeatHandler implements 
AsyncMethodCallback<HeartBeatResponse>
   public void onComplete(HeartBeatResponse resp) {
     long followerTerm = resp.getTerm();
     if (logger.isDebugEnabled()) {
-      logger.debug("{}: Received a heartbeat response {}", memberName, 
followerTerm);
+      logger.debug(
+          "{}: Received a heartbeat response {} for last log index {}",
+          memberName,
+          followerTerm,
+          resp.getLastLogIndex());
     }
     if (followerTerm == RESPONSE_AGREE) {
       // current leadership is still valid
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
index bef24c7..a7afd0b 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.cluster.server.heartbeat;
 
+import org.apache.iotdb.cluster.ClusterIoTDB;
 import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
@@ -32,7 +33,6 @@ import 
org.apache.iotdb.cluster.server.handlers.caller.ElectionHandler;
 import org.apache.iotdb.cluster.server.handlers.caller.HeartbeatHandler;
 import org.apache.iotdb.cluster.server.member.RaftMember;
 import org.apache.iotdb.cluster.utils.ClientUtils;
-
 import org.apache.thrift.TException;
 import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
@@ -230,8 +230,16 @@ public class HeartbeatThread implements Runnable {
                   HeartBeatResponse heartBeatResponse = 
client.sendHeartbeat(req);
                   heartbeatHandler.onComplete(heartBeatResponse);
                 } catch (TTransportException e) {
-                  logger.warn(
-                      "{}: Cannot send heart beat to node {} due to network", 
memberName, node, e);
+                  if (ClusterIoTDB.printClientConnectionErrorStack) {
+                    logger.warn(
+                        "{}: Cannot send heart beat to node {} due to network",
+                        memberName,
+                        node,
+                        e);
+                  } else {
+                    logger.warn(
+                        "{}: Cannot send heart beat to node {} due to 
network", memberName, node);
+                  }
                   client.getInputProtocol().getTransport().close();
                 } catch (Exception e) {
                   logger.warn("{}: Cannot send heart beat to node {}", 
memberName, node, e);
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java
index 137330a..de948ac 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java
@@ -76,6 +76,9 @@ public class MetaHeartbeatThread extends HeartbeatThread {
     super.startElection();
 
     if (localMetaMember.getCharacter() == NodeCharacter.LEADER) {
+      // if the node becomes the leader, build the meta engine service if it is not ready yet
+      localMetaMember.buildMetaEngineServiceIfNotReady();
+
       // A new raft leader needs to have at least one log in its term for 
committing logs with older
       // terms.
       // In the meta group, log frequency is very low. When the leader is 
changed whiling changing
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 6f12dfa..d288b86 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -74,6 +74,7 @@ import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 import org.apache.iotdb.cluster.utils.IOUtils;
 import org.apache.iotdb.cluster.utils.StatusUtils;
+import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.StorageEngine;
 import 
org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.TimePartitionFilter;
@@ -88,11 +89,11 @@ import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
 import org.apache.iotdb.db.qp.physical.sys.LogPlan;
 import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.service.JMXService;
 import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.service.rpc.thrift.EndPoint;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import org.apache.iotdb.tsfile.utils.Pair;
-
 import org.apache.thrift.protocol.TProtocolFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -121,7 +122,9 @@ import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.iotdb.cluster.config.ClusterConstant.THREAD_POLL_WAIT_TERMINATION_TIME_S;
 
-public class DataGroupMember extends RaftMember {
+public class DataGroupMember extends RaftMember implements 
DataGroupMemberMBean {
+
+  private final String mbeanName;
 
   private static final Logger logger = 
LoggerFactory.getLogger(DataGroupMember.class);
 
@@ -167,6 +170,13 @@ public class DataGroupMember extends RaftMember {
   public DataGroupMember(PartitionGroup nodes) {
     // constructor for test
     allNodes = nodes;
+    mbeanName =
+        String.format(
+            "%s:%s=%s%d",
+            "org.apache.iotdb.cluster.service",
+            IoTDBConstant.JMX_TYPE,
+            "DataMember",
+            getRaftGroupId());
     setQueryManager(new ClusterQueryManager());
     localQueryExecutor = new LocalQueryExecutor(this);
     lastAppliedPartitionTableVersion = new 
LastAppliedPatitionTableVersion(getMemberDir());
@@ -177,7 +187,7 @@ public class DataGroupMember extends RaftMember {
         "Data("
             + nodes.getHeader().getNode().getInternalIp()
             + ":"
-            + nodes.getHeader().getNode().getMetaPort()
+            + nodes.getHeader().getNode().getDataPort()
             + ", raftId="
             + nodes.getId()
             + ")",
@@ -188,6 +198,13 @@ public class DataGroupMember extends RaftMember {
         new AsyncClientPool(new SingleManagerFactory(factory)));
     this.metaGroupMember = metaGroupMember;
     allNodes = nodes;
+    mbeanName =
+        String.format(
+            "%s:%s=%s%d",
+            "org.apache.iotdb.cluster.service",
+            IoTDBConstant.JMX_TYPE,
+            "DataMember",
+            getRaftGroupId());
     setQueryManager(new ClusterQueryManager());
     slotManager = new SlotManager(ClusterConstant.SLOT_NUM, getMemberDir(), 
getName());
     LogApplier applier = new DataLogApplier(metaGroupMember, this);
@@ -213,6 +230,8 @@ public class DataGroupMember extends RaftMember {
     if (heartBeatService != null) {
       return;
     }
+    logger.info("Starting DataGroupMember {}... RaftGroupID: {}", name, 
getRaftGroupId());
+    JMXService.registerMBean(this, mbeanName);
     super.start();
     heartBeatService.submit(new DataHeartbeatThread(this));
     pullSnapshotService = 
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
@@ -227,7 +246,8 @@ public class DataGroupMember extends RaftMember {
    */
   @Override
   public void stop() {
-    logger.info("{}: stopping...", name);
+    logger.info("Stopping DataGroupMember {}... RaftGroupID: {}", name, 
getRaftGroupId());
+    JMXService.deregisterMBean(mbeanName);
     super.stop();
     if (pullSnapshotService != null) {
       pullSnapshotService.shutdownNow();
@@ -720,6 +740,11 @@ public class DataGroupMember extends RaftMember {
     }
   }
 
+  @Override
+  public String getMBeanName() {
+    return mbeanName;
+  }
+
   private void handleChangeMembershipLogWithoutRaft(Log log) {
     if (log instanceof AddNodeLog) {
       if (!metaGroupMember
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDBMBean.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMemberMBean.java
similarity index 73%
copy from cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDBMBean.java
copy to 
cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMemberMBean.java
index b67debe..fffe4ab 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDBMBean.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMemberMBean.java
@@ -16,12 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.cluster;
+package org.apache.iotdb.cluster.server.member;
 
-// we do not inherent IoTDB instance, as it may break the singleton mode of 
IoTDB.
-public interface ClusterIoTDBMBean {
-  /** @return true only if the log degree is DEBUG and the report is enabled */
-  boolean startRaftInfoReport();
+public interface DataGroupMemberMBean extends RaftMemberMBean {
 
-  void stopRaftInfoReport();
+  String getCharacterAsString();
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 53fbfc8..97822c2 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -78,6 +78,7 @@ import org.apache.iotdb.cluster.utils.ClusterUtils;
 import org.apache.iotdb.cluster.utils.PartitionUtils;
 import org.apache.iotdb.cluster.utils.StatusUtils;
 import org.apache.iotdb.cluster.utils.nodetool.function.Status;
+import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.exception.ShutdownException;
@@ -136,7 +137,12 @@ import static 
org.apache.iotdb.cluster.utils.ClusterUtils.WAIT_START_UP_CHECK_TI
 import static 
org.apache.iotdb.cluster.utils.ClusterUtils.analyseStartUpCheckResult;
 
 @SuppressWarnings("java:S1135")
-public class MetaGroupMember extends RaftMember implements IService {
+public class MetaGroupMember extends RaftMember implements IService, 
MetaGroupMemberMBean {
+
+  private static final String mbeanName =
+      String.format(
+          "%s:%s=%s",
+          "org.apache.iotdb.cluster.service", IoTDBConstant.JMX_TYPE, 
"MetaGroupEngine");
 
   /** the file that contains the identifier of this node */
   static final String NODE_IDENTIFIER_FILE_NAME =
@@ -601,6 +607,25 @@ public class MetaGroupMember extends RaftMember implements 
IService {
         response.setRequirePartitionTable(true);
       }
     }
+
+    // if isReady, then it means the node has received partitionTable from the 
leader, skip the
+    // following logic.
+    if (!ready) {
+      // if the node does not provide necessary info, wait for the next 
heartbeat.
+      if (response.isSetFollowerIdentifier()) {
+        return;
+      }
+      if (response.isSetRequirePartitionTable()) {
+        return;
+      }
+      // if the commitIndex is the same, ok we can start our datagroup service.
+      if (request.getTerm() == term.get()
+          && request.getCommitLogIndex() == 
getLogManager().getCommitLogIndex()) {
+        logger.info("Meta Group is ready");
+        rebuildDataGroups();
+        ready = true;
+      }
+    }
   }
 
   /**
@@ -678,25 +703,7 @@ public class MetaGroupMember extends RaftMember implements 
IService {
       // register the follower, the response.getFollower() contains the node 
information of the
       // receiver.
       registerNodeIdentifier(response.getFollower(), 
response.getFollowerIdentifier());
-      // if all nodes' ids are known, we can build the partition table
-      if (allNodesIdKnown()) {
-        // Notice that this should only be called once.
-
-        // When the meta raft group is established, the follower reports its 
node information to the
-        // leader through the first heartbeat. After the leader knows the node 
information of all
-        // nodes, it can replace the incomplete node information previously 
saved locally, and build
-        // partitionTable to send it to other followers.
-        allNodes = new PartitionGroup(idNodeMap.values());
-        if (partitionTable == null) {
-          partitionTable = new SlotPartitionTable(allNodes, thisNode);
-          logger.info("Partition table is set up");
-        }
-        router = new ClusterPlanRouter(partitionTable);
-        this.coordinator.setRouter(router);
-        rebuildDataGroups();
-        logger.info("The Meta Engine is ready");
-        this.ready = true;
-      }
+      buildMetaEngineServiceIfNotReady();
     }
     // record the requirement of partition table of the follower
     if (response.isRequirePartitionTable()) {
@@ -704,6 +711,29 @@ public class MetaGroupMember extends RaftMember implements 
IService {
     }
   }
 
+  public void buildMetaEngineServiceIfNotReady() {
+    // if all nodes' ids are known, we can build the partition table
+    if (!ready && allNodesIdKnown()) {
+      // Notice that this should only be called once.
+
+      // When the meta raft group is established, the follower reports its 
node information to the
+      // leader through the first heartbeat. After the leader knows the node 
information of all
+      // nodes, it can replace the incomplete node information previously 
saved locally, and build
+      // partitionTable to send it to other followers.
+      allNodes = new PartitionGroup(idNodeMap.values());
+      if (partitionTable == null) {
+        partitionTable = new SlotPartitionTable(allNodes, thisNode);
+        logger.info("Partition table is set up");
+      }
+
+      router = new ClusterPlanRouter(partitionTable);
+      this.coordinator.setRouter(router);
+      rebuildDataGroups();
+      logger.info("The Meta Engine is ready");
+      this.ready = true;
+    }
+  }
+
   /**
    * When a node requires a partition table in its heartbeat response, add it 
into blindNodes so in
    * the next heartbeat the partition table will be sent to the node.
@@ -1370,6 +1400,11 @@ public class MetaGroupMember extends RaftMember 
implements IService {
     return result;
   }
 
+  @Override
+  public String getMBeanName() {
+    return mbeanName;
+  }
+
   /**
    * A non-partitioned plan (like storage group creation) should be executed 
on all metagroup nodes,
    * so the MetaLeader should take the responsible to make sure that every 
node receives the plan.
@@ -1958,4 +1993,24 @@ public class MetaGroupMember extends RaftMember 
implements IService {
   public void handleHandshake(Node sender) {
     NodeStatusManager.getINSTANCE().activate(sender);
   }
+
+  @Override
+  public String getAllNodesAsString() {
+    return getAllNodes().toString();
+  }
+
+  @Override
+  public String getPartitionTableAsString() {
+    return partitionTable.toString();
+  }
+
+  @Override
+  public String getBlindNodesAsString() {
+    return blindNodes.toString();
+  }
+
+  @Override
+  public String getIdNodeMapAsString() {
+    return idNodeMap.toString();
+  }
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDBMBean.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberMBean.java
similarity index 72%
copy from cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDBMBean.java
copy to 
cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberMBean.java
index b67debe..69f2f28 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDBMBean.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberMBean.java
@@ -16,12 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.cluster;
+package org.apache.iotdb.cluster.server.member;
 
-// we do not inherent IoTDB instance, as it may break the singleton mode of 
IoTDB.
-public interface ClusterIoTDBMBean {
-  /** @return true only if the log degree is DEBUG and the report is enabled */
-  boolean startRaftInfoReport();
+public interface MetaGroupMemberMBean extends RaftMemberMBean {
 
-  void stopRaftInfoReport();
+  String getPartitionTableAsString();
+
+  boolean isReady();
+
+  String getAllNodesAsString();
+
+  String getCharacterAsString();
+
+  String getBlindNodesAsString();
+
+  String getIdNodeMapAsString();
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 17eab2c..2ee8626 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.cluster.server.member;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.iotdb.cluster.ClusterIoTDB;
 import org.apache.iotdb.cluster.client.async.AsyncClientPool;
 import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor;
@@ -62,6 +63,7 @@ import org.apache.iotdb.cluster.utils.ClientUtils;
 import org.apache.iotdb.cluster.utils.IOUtils;
 import org.apache.iotdb.cluster.utils.PlanSerializer;
 import org.apache.iotdb.cluster.utils.StatusUtils;
+import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.exception.BatchProcessException;
 import org.apache.iotdb.db.exception.IoTDBException;
 import org.apache.iotdb.db.exception.metadata.DuplicatedTemplateException;
@@ -78,8 +80,6 @@ import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -115,7 +115,7 @@ import static 
org.apache.iotdb.cluster.config.ClusterConstant.THREAD_POLL_WAIT_T
  * RaftMember process the common raft logic like leader election, log 
appending, catch-up and so on.
  */
 @SuppressWarnings("java:S3077") // reference volatile is enough
-public abstract class RaftMember {
+public abstract class RaftMember implements RaftMemberMBean {
   private static final Logger logger = 
LoggerFactory.getLogger(RaftMember.class);
   public static final boolean USE_LOG_DISPATCHER = false;
 
@@ -653,6 +653,10 @@ public abstract class RaftMember {
     return character;
   }
 
+  public String getCharacterAsString() {
+    return character.toString();
+  }
+
   public void setCharacter(NodeCharacter character) {
     if (!Objects.equals(character, this.character)) {
       logger.info("{} has become a {}", name, character);
@@ -804,6 +808,12 @@ public abstract class RaftMember {
     }
   }
 
+  public String getMBeanName() {
+    return String.format(
+        "%s:%s=%s",
+        "org.apache.iotdb.cluster.service", IoTDBConstant.JMX_TYPE, "Engine", 
getRaftGroupId());
+  }
+
   /** call back after syncLeader */
   public interface CheckConsistency {
 
@@ -2074,4 +2084,33 @@ public abstract class RaftMember {
   public void setSkipElection(boolean skipElection) {
     this.skipElection = skipElection;
   }
+
+  public long getLastReportedLogIndex() {
+    return lastReportedLogIndex;
+  }
+
+  @Override
+  public String getAllNodesAsString() {
+    return allNodes.toString();
+  }
+
+  @Override
+  public String getPeerMapAsString() {
+    return peerMap.toString();
+  }
+
+  @Override
+  public String getLeaderAsString() {
+    return leader.get().toString();
+  }
+
+  @Override
+  public String getLogManagerObject() {
+    return getLogManager().toString();
+  }
+
+  @Override
+  public String getLastCatchUpResponseTimeAsString() {
+    return lastCatchUpResponseTime.toString();
+  }
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientFactory.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMemberMBean.java
similarity index 59%
copy from 
cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientFactory.java
copy to 
cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMemberMBean.java
index 6bfe9003..2a1103a 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientFactory.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMemberMBean.java
@@ -16,25 +16,37 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
-package org.apache.iotdb.cluster.client.sync;
+package org.apache.iotdb.cluster.server.member;
 
 import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.rpc.thrift.RaftService;
 
-import org.apache.thrift.transport.TTransportException;
+import java.util.concurrent.atomic.AtomicLong;
+
+public interface RaftMemberMBean {
+
+  String getAllNodesAsString();
+
+  String getName();
+
+  String getPeerMapAsString();
+
+  AtomicLong getTerm();
+
+  String getCharacterAsString();
+
+  String getLeaderAsString();
+
+  Node getVoteFor();
+
+  long getLastHeartbeatReceivedTime();
+
+  String getLogManagerObject();
+
+  boolean isReadOnly();
 
-import java.io.IOException;
+  long getLastReportedLogIndex();
 
-public interface SyncClientFactory {
+  String getLastCatchUpResponseTimeAsString();
 
-  /**
-   * Get a client which will connect the given node and be cached in the given 
pool.
-   *
-   * @param node the cluster node the client will connect.
-   * @param pool the pool that will cache the client for reusing.
-   * @return
-   * @throws IOException
-   */
-  RaftService.Client getSyncClient(Node node, SyncClientPool pool) throws 
TTransportException;
+  boolean isSkipElection();
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java
index ee440b3..b2fb80c 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java
@@ -41,7 +41,6 @@ import 
org.apache.iotdb.cluster.server.monitor.NodeReport.DataMemberReport;
 import org.apache.iotdb.cluster.utils.IOUtils;
 import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
-
 import org.apache.thrift.TException;
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.apache.thrift.protocol.TProtocolFactory;
@@ -61,7 +60,8 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
-public class DataGroupServiceImpls implements TSDataService.AsyncIface, 
TSDataService.Iface {
+public class DataGroupServiceImpls
+    implements TSDataService.AsyncIface, TSDataService.Iface, 
DataGroupServiceImplsMBean {
 
   private static final Logger logger = 
LoggerFactory.getLogger(DataGroupServiceImpls.class);
 
@@ -678,6 +678,8 @@ public class DataGroupServiceImpls implements 
TSDataService.AsyncIface, TSDataSe
         dataGroupMember.setUnchanged(true);
       } else {
         prevMember.setUnchanged(true);
+        prevMember.start();
+        // TODO do we need to call other functions in addDataGroupMember() ?
       }
     }
 
@@ -1036,4 +1038,24 @@ public class DataGroupServiceImpls implements 
TSDataService.AsyncIface, TSDataSe
       resultHandler.onError(e);
     }
   }
+
+  @Override
+  public String getHeaderGroupMapAsString() {
+    return headerGroupMap.toString();
+  }
+
+  @Override
+  public int getAsyncServiceMapSize() {
+    return asyncServiceMap.size();
+  }
+
+  @Override
+  public int getSyncServiceMapSize() {
+    return syncServiceMap.size();
+  }
+
+  @Override
+  public String getPartitionTable() {
+    return partitionTable.toString();
+  }
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDBMBean.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImplsMBean.java
similarity index 73%
copy from cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDBMBean.java
copy to 
cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImplsMBean.java
index b67debe..988fc6f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDBMBean.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImplsMBean.java
@@ -16,12 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.cluster;
+package org.apache.iotdb.cluster.server.service;
 
-// we do not inherent IoTDB instance, as it may break the singleton mode of 
IoTDB.
-public interface ClusterIoTDBMBean {
-  /** @return true only if the log degree is DEBUG and the report is enabled */
-  boolean startRaftInfoReport();
+public interface DataGroupServiceImplsMBean {
 
-  void stopRaftInfoReport();
+  String getHeaderGroupMapAsString();
+
+  int getAsyncServiceMapSize();
+
+  int getSyncServiceMapSize();
+
+  String getPartitionTable();
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
index 065b663..7d20e96 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
@@ -57,6 +57,7 @@ public class MetaSyncService extends BaseSyncService 
implements TSMetaService.If
     this.metaGroupMember = metaGroupMember;
   }
 
+  // behavior of followers
   @Override
   public long appendEntry(AppendEntryRequest request) throws TException {
     // if the metaGroupMember is not ready (e.g., as a follower the 
PartitionTable is loaded
@@ -70,6 +71,8 @@ public class MetaSyncService extends BaseSyncService 
implements TSMetaService.If
         // this node lacks information of the cluster and refuse to work
         logger.debug("This node is blind to the cluster and cannot accept 
logs, {}", request);
         return Response.RESPONSE_PARTITION_TABLE_UNAVAILABLE;
+      } else {
+        // do nothing: if the partitionTable has already been loaded, we 
assume it is correct.
       }
     }
 

Reply via email to