This is an automated email from the ASF dual-hosted git repository. hxd pushed a commit to branch cluster- in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 062533405663dd7faec8be2422390dff8721681c Author: xiangdong huang <[email protected]> AuthorDate: Tue Aug 10 22:02:45 2021 +0800 pass manually test --- .../org/apache/iotdb/cluster/ClusterIoTDB.java | 18 + .../apache/iotdb/cluster/ClusterIoTDBMBean.java | 4 + .../cluster/client/async/AsyncClientPool.java | 1 + .../cluster/client/sync/SyncClientFactory.java | 5 +- .../iotdb/cluster/client/sync/SyncClientPool.java | 31 +- .../iotdb/cluster/client/sync/SyncDataClient.java | 9 +- .../client/sync/SyncDataHeartbeatClient.java | 10 +- .../iotdb/cluster/client/sync/SyncMetaClient.java | 11 +- .../client/sync/SyncMetaHeartbeatClient.java | 8 +- .../iotdb/cluster/partition/PartitionGroup.java | 5 + .../cluster/partition/slot/SlotPartitionTable.java | 1 - .../iotdb/cluster/server/MetaClusterServer2.java | 372 --------------------- .../handlers/caller/AppendNodeEntryHandler.java | 9 +- .../server/handlers/caller/HeartbeatHandler.java | 7 +- .../cluster/server/heartbeat/HeartbeatThread.java | 14 +- .../server/heartbeat/MetaHeartbeatThread.java | 3 + .../cluster/server/member/DataGroupMember.java | 33 +- .../member/DataGroupMemberMBean.java} | 9 +- .../cluster/server/member/MetaGroupMember.java | 95 ++++-- .../member/MetaGroupMemberMBean.java} | 19 +- .../iotdb/cluster/server/member/RaftMember.java | 45 ++- .../member/RaftMemberMBean.java} | 42 ++- .../server/service/DataGroupServiceImpls.java | 26 +- .../service/DataGroupServiceImplsMBean.java} | 15 +- .../cluster/server/service/MetaSyncService.java | 3 + 25 files changed, 337 insertions(+), 458 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java index e72b5f0..a8dc705 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java @@ -82,6 +82,10 @@ public class ClusterIoTDB implements ClusterIoTDBMBean { String.format( "%s:%s=%s", "org.apache.iotdb.cluster.service", IoTDBConstant.JMX_TYPE, "ClusterIoTDB"); + // TODO fix me: better to throw exception if the client can not be get. Then we can remove this + // field. + public static boolean printClientConnectionErrorStack = false; + // establish the cluster as a seed private static final String MODE_START = "-s"; // join an established cluster @@ -148,6 +152,7 @@ public class ClusterIoTDB implements ClusterIoTDBMBean { dataGroupEngine = new DataGroupServiceImpls(protocolFactory, metaGroupEngine); dataClientProvider = new DataClientProvider(protocolFactory); initTasks(); + JMXService.registerMBean(metaGroupEngine, metaGroupEngine.getMBeanName()); } private void initTasks() { @@ -268,6 +273,9 @@ public class ClusterIoTDB implements ClusterIoTDBMBean { // start RPC service logger.info("start Meta Heartbeat RPC service... "); registerManager.register(MetaRaftHeartBeatService.getInstance()); + // TODO: better to start the Meta RPC service untill the heartbeatservice has elected the + // leader. + // and quorum of followers have caught up. logger.info("start Meta RPC service... "); registerManager.register(MetaRaftService.getInstance()); @@ -576,6 +584,16 @@ public class ClusterIoTDB implements ClusterIoTDBMBean { allowReport = false; } + @Override + public void enablePrintClientConnectionErrorStack() { + printClientConnectionErrorStack = true; + } + + @Override + public void disablePrintClientConnectionErrorStack() { + printClientConnectionErrorStack = false; + } + private static class ClusterIoTDBHolder { private static final ClusterIoTDB INSTANCE = new ClusterIoTDB(); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDBMBean.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDBMBean.java index b67debe..88f90ac 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDBMBean.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDBMBean.java @@ -24,4 +24,8 @@ public interface ClusterIoTDBMBean { boolean startRaftInfoReport(); void stopRaftInfoReport(); + + void enablePrintClientConnectionErrorStack(); + + void disablePrintClientConnectionErrorStack(); } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java index 719ab8d..2aa8d04 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java @@ -25,6 +25,7 @@ import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient; import org.apache.iotdb.cluster.server.monitor.NodeStatusManager; import org.apache.iotdb.cluster.utils.ClusterNode; import org.apache.iotdb.db.utils.TestOnly; + import org.apache.thrift.async.TAsyncMethodCall; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientFactory.java index 6bfe9003..c34f1f3 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientFactory.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientFactory.java @@ -21,7 +21,6 @@ package org.apache.iotdb.cluster.client.sync; import org.apache.iotdb.cluster.rpc.thrift.Node; import org.apache.iotdb.cluster.rpc.thrift.RaftService; - import org.apache.thrift.transport.TTransportException; import java.io.IOException; @@ -37,4 +36,8 @@ public interface SyncClientFactory { * @throws IOException */ RaftService.Client getSyncClient(Node node, SyncClientPool pool) throws TTransportException; + + default String nodeInfo(Node node) { + return node.toString(); + } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java index c6466f4..38d9942 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java @@ -19,6 +19,7 @@ package org.apache.iotdb.cluster.client.sync; +import org.apache.iotdb.cluster.ClusterIoTDB; import org.apache.iotdb.cluster.config.ClusterDescriptor; import org.apache.iotdb.cluster.rpc.thrift.Node; import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client; @@ -43,10 +44,6 @@ public class SyncClientPool { private Map<ClusterNode, Integer> nodeClientNumMap = new ConcurrentHashMap<>(); private SyncClientFactory syncClientFactory; - // TODO fix me: better to throw exception if the client can not be get. Then we can remove this - // field. - public static boolean printStack = false; - public SyncClientPool(SyncClientFactory syncClientFactory) { this.syncClientFactory = syncClientFactory; this.waitClientTimeoutMS = ClusterDescriptor.getInstance().getConfig().getWaitClientTimeoutMS(); @@ -94,8 +91,11 @@ public class SyncClientPool { client = syncClientFactory.getSyncClient(clusterNode, this); } catch (TTransportException e) { // TODO throw me is better. - if (printStack) { - logger.error("Cannot open transport for client {}", node, e); + if (ClusterIoTDB.printClientConnectionErrorStack) { + logger.error( + "Cannot open transport for client {}", syncClientFactory.nodeInfo(node), e); + } else { + logger.error("Cannot open transport for client {}", syncClientFactory.nodeInfo(node)); } return null; } @@ -130,10 +130,18 @@ public class SyncClientPool { } } catch (InterruptedException e) { Thread.currentThread().interrupt(); - logger.warn("Interrupted when waiting for an available client of {}", clusterNode); + logger.warn( + "Interrupted when waiting for an available client of {}", + syncClientFactory.nodeInfo(clusterNode)); return null; } catch (TTransportException e) { - logger.error("Cannot open transport for client {}", clusterNode, e); + if (ClusterIoTDB.printClientConnectionErrorStack) { + logger.error( + "Cannot open transport for client {}", syncClientFactory.nodeInfo(clusterNode), e); + } else { + logger.error( + "Cannot open transport for client {}", syncClientFactory.nodeInfo(clusterNode)); + } return null; } } @@ -159,7 +167,12 @@ public class SyncClientPool { clientStack.push(syncClientFactory.getSyncClient(node, this)); NodeStatusManager.getINSTANCE().activate(node); } catch (TTransportException e) { - logger.error("Cannot open transport for client {}", node, e); + if (ClusterIoTDB.printClientConnectionErrorStack) { + logger.error( + "Cannot open transport for client {}", syncClientFactory.nodeInfo(node), e); + } else { + logger.error("Cannot open transport for client {}", syncClientFactory.nodeInfo(node)); + } nodeClientNumMap.computeIfPresent(clusterNode, (n, oldValue) -> oldValue - 1); NodeStatusManager.getINSTANCE().deactivate(node); } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java index fb861df..f886363 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java @@ -26,7 +26,6 @@ import org.apache.iotdb.db.utils.TestOnly; import org.apache.iotdb.rpc.RpcTransportFactory; import org.apache.iotdb.rpc.TConfigurationConst; import org.apache.iotdb.rpc.TimeoutChangeableTransport; - import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.protocol.TProtocolFactory; import org.apache.thrift.transport.TSocket; @@ -52,6 +51,7 @@ public class SyncDataClient extends Client implements Closeable { public SyncDataClient(TProtocolFactory protocolFactory, Node node, SyncClientPool pool) throws TTransportException { + // the difference of the two clients lies in the port super( protocolFactory.getProtocol( @@ -105,6 +105,13 @@ public class SyncDataClient extends Client implements Closeable { public SyncDataClient getSyncClient(Node node, SyncClientPool pool) throws TTransportException { return new SyncDataClient(protocolFactory, node, pool); } + + @Override + public String nodeInfo(Node node) { + return String.format( + "DataNode (listenIp = %s, port = %d, id = %d)", + node.getInternalIp(), node.getDataPort(), node.getNodeIdentifier()); + } } @Override diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClient.java index c9d7644..c2c7f9a 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClient.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClient.java @@ -24,7 +24,6 @@ import org.apache.iotdb.cluster.rpc.thrift.Node; import org.apache.iotdb.cluster.utils.ClusterUtils; import org.apache.iotdb.rpc.RpcTransportFactory; import org.apache.iotdb.rpc.TConfigurationConst; - import org.apache.thrift.protocol.TProtocolFactory; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransportException; @@ -64,6 +63,15 @@ public class SyncDataHeartbeatClient extends SyncDataClient { throws TTransportException { return new SyncDataHeartbeatClient(protocolFactory, node, pool); } + + @Override + public String nodeInfo(Node node) { + return String.format( + "DataNode (listenIp = %s, HB port = %d, id = %d)", + node.getInternalIp(), + node.getDataPort() + ClusterUtils.DATA_HEARTBEAT_PORT_OFFSET, + node.getNodeIdentifier()); + } } @Override diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaClient.java index 11df45e..588d0cd 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaClient.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaClient.java @@ -22,9 +22,9 @@ package org.apache.iotdb.cluster.client.sync; import org.apache.iotdb.cluster.config.ClusterConstant; import org.apache.iotdb.cluster.rpc.thrift.Node; import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.Client; +import org.apache.iotdb.cluster.utils.ClusterUtils; import org.apache.iotdb.rpc.RpcTransportFactory; import org.apache.iotdb.rpc.TConfigurationConst; - import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.protocol.TProtocolFactory; import org.apache.thrift.transport.TSocket; @@ -88,6 +88,15 @@ public class SyncMetaClient extends Client implements Closeable { public SyncMetaClient getSyncClient(Node node, SyncClientPool pool) throws TTransportException { return new SyncMetaClient(protocolFactory, node, pool); } + + @Override + public String nodeInfo(Node node) { + return String.format( + "MetaNode (listenIp = %s, HB port = %d, id = %d)", + node.getInternalIp(), + node.getMetaPort() + ClusterUtils.DATA_HEARTBEAT_PORT_OFFSET, + node.getNodeIdentifier()); + } } public Node getNode() { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClient.java index a21daa1..8c7abec 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClient.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClient.java @@ -24,7 +24,6 @@ import org.apache.iotdb.cluster.rpc.thrift.Node; import org.apache.iotdb.cluster.utils.ClusterUtils; import org.apache.iotdb.rpc.RpcTransportFactory; import org.apache.iotdb.rpc.TConfigurationConst; - import org.apache.thrift.protocol.TProtocolFactory; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransportException; @@ -64,6 +63,13 @@ public class SyncMetaHeartbeatClient extends SyncMetaClient { throws TTransportException { return new SyncMetaHeartbeatClient(protocolFactory, node, pool); } + + @Override + public String nodeInfo(Node node) { + return String.format( + "MetaNode (listenIp = %s, port = %d, id = %d)", + node.getInternalIp(), node.getMetaPort(), node.getNodeIdentifier()); + } } @Override diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java index 1ce653e..b86726b 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java @@ -102,4 +102,9 @@ public class PartitionGroup extends ArrayList<Node> { public void setId(int id) { this.id = id; } + + @Override + public String toString() { + return super.toString() + ", id = " + id; + } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java index 7a46baa..61803ef 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java @@ -32,7 +32,6 @@ import org.apache.iotdb.cluster.rpc.thrift.Node; import org.apache.iotdb.cluster.rpc.thrift.RaftNode; import org.apache.iotdb.cluster.utils.NodeSerializeUtils; import org.apache.iotdb.db.utils.SerializeUtils; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer2.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer2.java deleted file mode 100644 index 65db372..0000000 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer2.java +++ /dev/null @@ -1,372 +0,0 @@ -/// * -// * Licensed to the Apache Software Foundation (ASF) under one -// * or more contributor license agreements. See the NOTICE file -// * distributed with this work for additional information -// * regarding copyright ownership. The ASF licenses this file -// * to you under the Apache License, Version 2.0 (the -// * "License"); you may not use this file except in compliance -// * with the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, -// * software distributed under the License is distributed on an -// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// * KIND, either express or implied. See the License for the -// * specific language governing permissions and limitations -// * under the License. -// */ -// package org.apache.iotdb.cluster.server; -// -// import org.apache.iotdb.cluster.config.ClusterDescriptor; -// import org.apache.iotdb.cluster.coordinator.Coordinator; -// import org.apache.iotdb.cluster.exception.ConfigInconsistentException; -// import org.apache.iotdb.cluster.exception.StartUpCheckFailureException; -// import org.apache.iotdb.cluster.metadata.CMManager; -// import org.apache.iotdb.cluster.metadata.MetaPuller; -// import org.apache.iotdb.cluster.rpc.thrift.AddNodeResponse; -// import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest; -// import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest; -// import org.apache.iotdb.cluster.rpc.thrift.CheckStatusResponse; -// import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest; -// import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq; -// import org.apache.iotdb.cluster.rpc.thrift.HeartBeatRequest; -// import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse; -// import org.apache.iotdb.cluster.rpc.thrift.Node; -// import org.apache.iotdb.cluster.rpc.thrift.RaftNode; -// import org.apache.iotdb.cluster.rpc.thrift.RequestCommitIndexResponse; -// import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest; -// import org.apache.iotdb.cluster.rpc.thrift.StartUpStatus; -// import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus; -// import org.apache.iotdb.cluster.rpc.thrift.TSMetaService; -// import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.AsyncProcessor; -// import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.Processor; -// import org.apache.iotdb.cluster.server.heartbeat.MetaHeartbeatServer; -// import org.apache.iotdb.cluster.server.member.MetaGroupMember; -// import org.apache.iotdb.cluster.server.service.MetaAsyncService; -// import org.apache.iotdb.cluster.server.service.MetaSyncService; -// import org.apache.iotdb.db.exception.StartupException; -// import org.apache.iotdb.db.exception.query.QueryProcessException; -// import org.apache.iotdb.db.service.IoTDB; -// import org.apache.iotdb.db.utils.TestOnly; -// import org.apache.iotdb.service.rpc.thrift.TSStatus; -// import org.apache.thrift.TException; -// import org.apache.thrift.async.AsyncMethodCallback; -// import org.apache.thrift.transport.TNonblockingServerSocket; -// import org.slf4j.Logger; -// import org.slf4j.LoggerFactory; -// -// import java.net.InetSocketAddress; -// import java.nio.ByteBuffer; -// -/// ** -// * MetaCluster manages the whole cluster's metadata, such as what nodes are in the cluster and the -// * data partition. Each node has one MetaClusterServer instance, the single-node IoTDB instance is -// * started-up at the same time. -// */ -// public class MetaClusterServer2 extends RaftServer -// implements TSMetaService.AsyncIface, TSMetaService.Iface { -// private static Logger logger = LoggerFactory.getLogger(MetaClusterServer2.class); -// -// // each node only contains one MetaGroupMember -// private MetaGroupMember member; -// private Coordinator coordinator; -// -// private MetaAsyncService asyncService; -// private MetaSyncService syncService; -// private MetaHeartbeatServer metaHeartbeatServer; -// -// public MetaClusterServer2() throws QueryProcessException { -// super(); -// metaHeartbeatServer = new MetaHeartbeatServer(thisNode, this); -// coordinator = new Coordinator(); -// member = new MetaGroupMember(protocolFactory, thisNode, coordinator); -// coordinator.setMetaGroupMember(member); -// asyncService = new MetaAsyncService(member); -// syncService = new MetaSyncService(member); -// MetaPuller.getInstance().init(member); -// } -// -// /** -// * Besides the standard RaftServer start-up, the IoTDB instance, a MetaGroupMember and the -// * ClusterMonitor are also started. -// * -// * @throws TTransportException -// * @throws StartupException -// */ -// @Override -// public void start() throws TTransportException, StartupException { -// super.start(); -// metaHeartbeatServer.start(); -// -// IoTDB.setMetaManager(CMManager.getInstance()); -// ((CMManager) IoTDB.metaManager).setMetaGroupMember(member); -// ((CMManager) IoTDB.metaManager).setCoordinator(coordinator); -// // TODO FIXME move this out of MetaClusterServer -// IoTDB.getInstance().active(); -// -// member.start(); -// } -// -// /** Also stops the IoTDB instance, the MetaGroupMember and the ClusterMonitor. */ -// @Override -// public void stop() { -// -// metaHeartbeatServer.stop(); -// super.stop(); -// member.stop(); -// } -// -// /** Build a initial cluster with other nodes on the seed list. */ -// public void buildCluster() throws ConfigInconsistentException, StartUpCheckFailureException { -// member.buildCluster(); -// } -// -// /** -// * Pick up a node from the seed list and send a join request to it. -// * -// * @return whether the node has joined the cluster. -// */ -// public void joinCluster() throws ConfigInconsistentException, StartUpCheckFailureException { -// member.joinCluster(); -// } -// -// /** -// * MetaClusterServer uses the meta port to create the socket. -// * -// * @return the TServerTransport -// * @throws TTransportException if create the socket fails -// */ -// @Override -// TServerTransport getServerSocket() throws TTransportException { -// logger.info( -// "[{}] Cluster node will listen {}:{}", -// getServerClientName(), -// config.getInternalIp(), -// config.getInternalMetaPort()); -// if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) { -// return new TNonblockingServerSocket( -// new InetSocketAddress(config.getInternalIp(), config.getInternalMetaPort()), -// getConnectionTimeoutInMS()); -// } else { -// return new TServerSocket( -// new InetSocketAddress(config.getInternalIp(), config.getInternalMetaPort())); -// } -// } -// -// @Override -// String getClientThreadPrefix() { -// return "MetaClientThread-"; -// } -// -// @Override -// String getServerClientName() { -// return "MetaServerThread-"; -// } -// -// @Override -// TProcessor getProcessor() { -// if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) { -// return new AsyncProcessor<>(this); -// } else { -// return new Processor<>(this); -// } -// } -// -// // Request forwarding. There is only one MetaGroupMember each node, so all requests will be -// // directly sent to that member. See the methods in MetaGroupMember for details -// -// @Override -// public void addNode(Node node, StartUpStatus startUpStatus, AsyncMethodCallback resultHandler) { -// asyncService.addNode(node, startUpStatus, resultHandler); -// } -// -// @Override -// public void sendHeartbeat(HeartBeatRequest request, AsyncMethodCallback resultHandler) { -// asyncService.sendHeartbeat(request, resultHandler); -// } -// -// @Override -// public void startElection(ElectionRequest electionRequest, AsyncMethodCallback resultHandler) { -// asyncService.startElection(electionRequest, resultHandler); -// } -// -// @Override -// public void appendEntries(AppendEntriesRequest request, AsyncMethodCallback resultHandler) { -// asyncService.appendEntries(request, resultHandler); -// } -// -// @Override -// public void appendEntry(AppendEntryRequest request, AsyncMethodCallback resultHandler) { -// asyncService.appendEntry(request, resultHandler); -// } -// -// @Override -// public void sendSnapshot(SendSnapshotRequest request, AsyncMethodCallback resultHandler) { -// asyncService.sendSnapshot(request, resultHandler); -// } -// -// @Override -// public void executeNonQueryPlan( -// ExecutNonQueryReq request, AsyncMethodCallback<TSStatus> resultHandler) { -// asyncService.executeNonQueryPlan(request, resultHandler); -// } -// -// @Override -// public void requestCommitIndex( -// RaftNode header, AsyncMethodCallback<RequestCommitIndexResponse> resultHandler) { -// asyncService.requestCommitIndex(header, resultHandler); -// } -// -// @Override -// public void checkAlive(AsyncMethodCallback<Node> resultHandler) { -// asyncService.checkAlive(resultHandler); -// } -// -// @Override -// public void collectMigrationStatus(AsyncMethodCallback<ByteBuffer> resultHandler) { -// asyncService.collectMigrationStatus(resultHandler); -// } -// -// @Override -// public void readFile( -// String filePath, long offset, int length, AsyncMethodCallback<ByteBuffer> resultHandler) { -// asyncService.readFile(filePath, offset, length, resultHandler); -// } -// -// @Override -// public void queryNodeStatus(AsyncMethodCallback<TNodeStatus> resultHandler) { -// asyncService.queryNodeStatus(resultHandler); -// } -// -// public MetaGroupMember getMember() { -// return member; -// } -// -// @Override -// public void checkStatus( -// StartUpStatus startUpStatus, AsyncMethodCallback<CheckStatusResponse> resultHandler) { -// asyncService.checkStatus(startUpStatus, resultHandler); -// } -// -// @Override -// public void removeNode(Node node, AsyncMethodCallback<Long> resultHandler) { -// asyncService.removeNode(node, resultHandler); -// } -// -// @Override -// public void exile(ByteBuffer removeNodeLog, AsyncMethodCallback<Void> resultHandler) { -// asyncService.exile(removeNodeLog, resultHandler); -// } -// -// @Override -// public void matchTerm( -// long index, long term, RaftNode header, AsyncMethodCallback<Boolean> resultHandler) { -// asyncService.matchTerm(index, term, header, resultHandler); -// } -// -// @Override -// public AddNodeResponse addNode(Node node, StartUpStatus startUpStatus) throws TException { -// return syncService.addNode(node, startUpStatus); -// } -// -// @Override -// public CheckStatusResponse checkStatus(StartUpStatus startUpStatus) { -// return syncService.checkStatus(startUpStatus); -// } -// -// @Override -// public long removeNode(Node node) throws TException { -// return syncService.removeNode(node); -// } -// -// @Override -// public void exile(ByteBuffer removeNodeLog) { -// syncService.exile(removeNodeLog); -// } -// -// @Override -// public TNodeStatus queryNodeStatus() { -// return syncService.queryNodeStatus(); -// } -// -// @Override -// public Node checkAlive() { -// return syncService.checkAlive(); -// } -// -// @Override -// public ByteBuffer collectMigrationStatus() { -// return syncService.collectMigrationStatus(); -// } -// -// @Override -// public HeartBeatResponse sendHeartbeat(HeartBeatRequest request) { -// return syncService.sendHeartbeat(request); -// } -// -// @Override -// public long startElection(ElectionRequest request) { -// return syncService.startElection(request); -// } -// -// @Override -// public long appendEntries(AppendEntriesRequest request) throws TException { -// return syncService.appendEntries(request); -// } -// -// @Override -// public long appendEntry(AppendEntryRequest request) throws TException { -// return syncService.appendEntry(request); -// } -// -// @Override -// public void sendSnapshot(SendSnapshotRequest request) throws TException { -// syncService.sendSnapshot(request); -// } -// -// @Override -// public TSStatus executeNonQueryPlan(ExecutNonQueryReq request) throws TException { -// return syncService.executeNonQueryPlan(request); -// } -// -// @Override -// public RequestCommitIndexResponse requestCommitIndex(RaftNode header) throws TException { -// return syncService.requestCommitIndex(header); -// } -// -// @Override -// public ByteBuffer readFile(String filePath, long offset, int length) throws TException { -// return syncService.readFile(filePath, offset, length); -// } -// -// @Override -// public boolean matchTerm(long index, long term, RaftNode header) { -// return syncService.matchTerm(index, term, header); -// } -// -// @Override -// public void removeHardLink(String hardLinkPath) throws TException { -// syncService.removeHardLink(hardLinkPath); -// } -// -// @Override -// public void removeHardLink(String hardLinkPath, AsyncMethodCallback<Void> resultHandler) { -// asyncService.removeHardLink(hardLinkPath, resultHandler); -// } -// -// @Override -// public void handshake(Node sender) { -// syncService.handshake(sender); -// } -// -// @Override -// public void handshake(Node sender, AsyncMethodCallback<Void> resultHandler) { -// asyncService.handshake(sender, resultHandler); -// } -// -// @TestOnly -// public void setMetaGroupMember(MetaGroupMember metaGroupMember) { -// this.member = metaGroupMember; -// } -// } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java index fa744d1..6da7370 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java @@ -25,7 +25,6 @@ import org.apache.iotdb.cluster.server.member.RaftMember; import org.apache.iotdb.cluster.server.monitor.Peer; import org.apache.iotdb.cluster.server.monitor.Timer; import org.apache.iotdb.cluster.server.monitor.Timer.Statistic; - import org.apache.thrift.async.AsyncMethodCallback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,7 +76,8 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<Long> { // the request already failed return; } - logger.debug("{}: Append response {} from {}", member.getName(), response, receiver); + logger.debug( + "{}: Append response {} from {} for log {}", member.getName(), response, receiver, log); if (leaderShipStale.get()) { // someone has rejected this log because the leadership is stale return; @@ -106,11 +106,12 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<Long> { // the leader ship is stale, wait for the new leader's heartbeat long prevReceiverTerm = receiverTerm.get(); logger.debug( - "{}: Received a rejection from {} because term is stale: {}/{}", + "{}: Received a rejection from {} because term is stale: {}/{} for log {}", member.getName(), receiver, prevReceiverTerm, - resp); + resp, + log); if (resp > prevReceiverTerm) { receiverTerm.set(resp); } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java index ba26259..4b4f4b6 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java @@ -23,7 +23,6 @@ import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse; import org.apache.iotdb.cluster.rpc.thrift.Node; import org.apache.iotdb.cluster.server.member.RaftMember; import org.apache.iotdb.cluster.server.monitor.Peer; - import org.apache.thrift.async.AsyncMethodCallback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,7 +53,11 @@ public class HeartbeatHandler implements AsyncMethodCallback<HeartBeatResponse> public void onComplete(HeartBeatResponse resp) { long followerTerm = resp.getTerm(); if (logger.isDebugEnabled()) { - logger.debug("{}: Received a heartbeat response {}", memberName, followerTerm); + logger.debug( + "{}: Received a heartbeat response {} for last log index {}", + memberName, + followerTerm, + resp.getLastLogIndex()); } if (followerTerm == RESPONSE_AGREE) { // current leadership is still valid diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java index bef24c7..a7afd0b 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java @@ -19,6 +19,7 @@ package org.apache.iotdb.cluster.server.heartbeat; +import org.apache.iotdb.cluster.ClusterIoTDB; import org.apache.iotdb.cluster.config.ClusterConstant; import org.apache.iotdb.cluster.config.ClusterDescriptor; import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest; @@ -32,7 +33,6 @@ import org.apache.iotdb.cluster.server.handlers.caller.ElectionHandler; import org.apache.iotdb.cluster.server.handlers.caller.HeartbeatHandler; import org.apache.iotdb.cluster.server.member.RaftMember; import org.apache.iotdb.cluster.utils.ClientUtils; - import org.apache.thrift.TException; import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; @@ -230,8 +230,16 @@ public class HeartbeatThread implements Runnable { HeartBeatResponse heartBeatResponse = client.sendHeartbeat(req); heartbeatHandler.onComplete(heartBeatResponse); } catch (TTransportException e) { - logger.warn( - "{}: Cannot send heart beat to node {} due to network", memberName, node, e); + if (ClusterIoTDB.printClientConnectionErrorStack) { + logger.warn( + "{}: Cannot send heart beat to node {} due to network", + memberName, + node, + e); + } else { + logger.warn( + "{}: Cannot send heart beat to node {} due to network", memberName, node); + } client.getInputProtocol().getTransport().close(); } catch (Exception e) { logger.warn("{}: Cannot send heart beat to node {}", memberName, node, e); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java index 137330a..de948ac 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java @@ -76,6 +76,9 @@ public class MetaHeartbeatThread extends HeartbeatThread { super.startElection(); if (localMetaMember.getCharacter() == NodeCharacter.LEADER) { + // if the node becomes the leader, + localMetaMember.buildMetaEngineServiceIfNotReady(); + // A new raft leader needs to have at least one log in its term for committing logs with older // terms. // In the meta group, log frequency is very low. When the leader is changed whiling changing diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java index 6f12dfa..d288b86 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java @@ -74,6 +74,7 @@ import org.apache.iotdb.cluster.server.monitor.Timer; import org.apache.iotdb.cluster.server.monitor.Timer.Statistic; import org.apache.iotdb.cluster.utils.IOUtils; import org.apache.iotdb.cluster.utils.StatusUtils; +import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.StorageEngine; import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.TimePartitionFilter; @@ -88,11 +89,11 @@ import org.apache.iotdb.db.qp.physical.crud.InsertPlan; import org.apache.iotdb.db.qp.physical.sys.FlushPlan; import org.apache.iotdb.db.qp.physical.sys.LogPlan; import org.apache.iotdb.db.service.IoTDB; +import org.apache.iotdb.db.service.JMXService; import org.apache.iotdb.db.utils.TestOnly; import org.apache.iotdb.service.rpc.thrift.EndPoint; import org.apache.iotdb.service.rpc.thrift.TSStatus; import org.apache.iotdb.tsfile.utils.Pair; - import org.apache.thrift.protocol.TProtocolFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -121,7 +122,9 @@ import java.util.concurrent.TimeUnit; import static org.apache.iotdb.cluster.config.ClusterConstant.THREAD_POLL_WAIT_TERMINATION_TIME_S; -public class DataGroupMember extends RaftMember { +public class DataGroupMember extends RaftMember implements DataGroupMemberMBean { + + private final String mbeanName; private static final Logger logger = LoggerFactory.getLogger(DataGroupMember.class); @@ -167,6 +170,13 @@ public class DataGroupMember extends RaftMember { public DataGroupMember(PartitionGroup nodes) { // constructor for test allNodes = nodes; + mbeanName = + String.format( + "%s:%s=%s%d", + "org.apache.iotdb.cluster.service", + IoTDBConstant.JMX_TYPE, + "DataMember", + getRaftGroupId()); setQueryManager(new ClusterQueryManager()); localQueryExecutor = new LocalQueryExecutor(this); lastAppliedPartitionTableVersion = new LastAppliedPatitionTableVersion(getMemberDir()); @@ -177,7 +187,7 @@ public class DataGroupMember extends RaftMember { "Data(" + nodes.getHeader().getNode().getInternalIp() + ":" - + nodes.getHeader().getNode().getMetaPort() + + nodes.getHeader().getNode().getDataPort() + ", raftId=" + nodes.getId() + ")", @@ -188,6 +198,13 @@ public class DataGroupMember extends RaftMember { new AsyncClientPool(new SingleManagerFactory(factory))); this.metaGroupMember = metaGroupMember; allNodes = nodes; + mbeanName = + String.format( + "%s:%s=%s%d", + "org.apache.iotdb.cluster.service", + IoTDBConstant.JMX_TYPE, + "DataMember", + getRaftGroupId()); setQueryManager(new ClusterQueryManager()); slotManager = new SlotManager(ClusterConstant.SLOT_NUM, getMemberDir(), getName()); LogApplier applier = new DataLogApplier(metaGroupMember, this); @@ -213,6 +230,8 @@ public class DataGroupMember extends RaftMember { if (heartBeatService != null) { return; } + logger.info("Starting DataGroupMember {}... RaftGroupID: {}", name, getRaftGroupId()); + JMXService.registerMBean(this, mbeanName); super.start(); heartBeatService.submit(new DataHeartbeatThread(this)); pullSnapshotService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); @@ -227,7 +246,8 @@ public class DataGroupMember extends RaftMember { */ @Override public void stop() { - logger.info("{}: stopping...", name); + logger.info("Stopping DataGroupMember {}... RaftGroupID: {}", name, getRaftGroupId()); + JMXService.deregisterMBean(mbeanName); super.stop(); if (pullSnapshotService != null) { pullSnapshotService.shutdownNow(); @@ -720,6 +740,11 @@ public class DataGroupMember extends RaftMember { } } + @Override + public String getMBeanName() { + return mbeanName; + } + private void handleChangeMembershipLogWithoutRaft(Log log) { if (log instanceof AddNodeLog) { if (!metaGroupMember diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDBMBean.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMemberMBean.java similarity index 73% copy from cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDBMBean.java copy to cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMemberMBean.java index b67debe..fffe4ab 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDBMBean.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMemberMBean.java @@ -16,12 +16,9 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.cluster; +package org.apache.iotdb.cluster.server.member; -// we do not inherent IoTDB instance, as it may break the singleton mode of IoTDB. -public interface ClusterIoTDBMBean { - /** @return true only if the log degree is DEBUG and the report is enabled */ - boolean startRaftInfoReport(); +public interface DataGroupMemberMBean extends RaftMemberMBean { - void stopRaftInfoReport(); + String getCharacterAsString(); } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java index 53fbfc8..97822c2 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java @@ -78,6 +78,7 @@ import org.apache.iotdb.cluster.utils.ClusterUtils; import org.apache.iotdb.cluster.utils.PartitionUtils; import org.apache.iotdb.cluster.utils.StatusUtils; import org.apache.iotdb.cluster.utils.nodetool.function.Status; +import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.StorageEngine; import org.apache.iotdb.db.exception.ShutdownException; @@ -136,7 +137,12 @@ import static org.apache.iotdb.cluster.utils.ClusterUtils.WAIT_START_UP_CHECK_TI import static org.apache.iotdb.cluster.utils.ClusterUtils.analyseStartUpCheckResult; @SuppressWarnings("java:S1135") -public class MetaGroupMember extends RaftMember implements IService { +public class MetaGroupMember extends RaftMember implements IService, MetaGroupMemberMBean { + + private static final String mbeanName = + String.format( + "%s:%s=%s", + "org.apache.iotdb.cluster.service", IoTDBConstant.JMX_TYPE, "MetaGroupEngine"); /** the file that contains the identifier of this node */ static final String NODE_IDENTIFIER_FILE_NAME = @@ -601,6 +607,25 @@ public class MetaGroupMember extends RaftMember implements IService { response.setRequirePartitionTable(true); } } + + // if isReady, then it means the node has receives partitionTable from the leader, skip the + // following logic. + if (!ready) { + // if the node does not provide necessary info, wait for the next heartbeat. + if (response.isSetFollowerIdentifier()) { + return; + } + if (response.isSetRequirePartitionTable()) { + return; + } + // if the commitIndex is the same, ok we can start our datagroup service. + if (request.getTerm() == term.get() + && request.getCommitLogIndex() == getLogManager().getCommitLogIndex()) { + logger.info("Meta Group is ready"); + rebuildDataGroups(); + ready = true; + } + } } /** @@ -678,25 +703,7 @@ public class MetaGroupMember extends RaftMember implements IService { // register the follower, the response.getFollower() contains the node information of the // receiver. registerNodeIdentifier(response.getFollower(), response.getFollowerIdentifier()); - // if all nodes' ids are known, we can build the partition table - if (allNodesIdKnown()) { - // Notice that this should only be called once. - - // When the meta raft group is established, the follower reports its node information to the - // leader through the first heartbeat. After the leader knows the node information of all - // nodes, it can replace the incomplete node information previously saved locally, and build - // partitionTable to send it to other followers. - allNodes = new PartitionGroup(idNodeMap.values()); - if (partitionTable == null) { - partitionTable = new SlotPartitionTable(allNodes, thisNode); - logger.info("Partition table is set up"); - } - router = new ClusterPlanRouter(partitionTable); - this.coordinator.setRouter(router); - rebuildDataGroups(); - logger.info("The Meta Engine is ready"); - this.ready = true; - } + buildMetaEngineServiceIfNotReady(); } // record the requirement of partition table of the follower if (response.isRequirePartitionTable()) { @@ -704,6 +711,29 @@ public class MetaGroupMember extends RaftMember implements IService { } } + public void buildMetaEngineServiceIfNotReady() { + // if all nodes' ids are known, we can build the partition table + if (!ready && allNodesIdKnown()) { + // Notice that this should only be called once. + + // When the meta raft group is established, the follower reports its node information to the + // leader through the first heartbeat. After the leader knows the node information of all + // nodes, it can replace the incomplete node information previously saved locally, and build + // partitionTable to send it to other followers. + allNodes = new PartitionGroup(idNodeMap.values()); + if (partitionTable == null) { + partitionTable = new SlotPartitionTable(allNodes, thisNode); + logger.info("Partition table is set up"); + } + + router = new ClusterPlanRouter(partitionTable); + this.coordinator.setRouter(router); + rebuildDataGroups(); + logger.info("The Meta Engine is ready"); + this.ready = true; + } + } + /** * When a node requires a partition table in its heartbeat response, add it into blindNodes so in * the next heartbeat the partition table will be sent to the node. @@ -1370,6 +1400,11 @@ public class MetaGroupMember extends RaftMember implements IService { return result; } + @Override + public String getMBeanName() { + return mbeanName; + } + /** * A non-partitioned plan (like storage group creation) should be executed on all metagroup nodes, * so the MetaLeader should take the responsible to make sure that every node receives the plan. @@ -1958,4 +1993,24 @@ public class MetaGroupMember extends RaftMember implements IService { public void handleHandshake(Node sender) { NodeStatusManager.getINSTANCE().activate(sender); } + + @Override + public String getAllNodesAsString() { + return getAllNodes().toString(); + } + + @Override + public String getPartitionTableAsString() { + return partitionTable.toString(); + } + + @Override + public String getBlindNodesAsString() { + return blindNodes.toString(); + } + + @Override + public String getIdNodeMapAsString() { + return idNodeMap.toString(); + } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDBMBean.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberMBean.java similarity index 72% copy from cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDBMBean.java copy to cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberMBean.java index b67debe..69f2f28 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDBMBean.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberMBean.java @@ -16,12 +16,19 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.cluster; +package org.apache.iotdb.cluster.server.member; -// we do not inherent IoTDB instance, as it may break the singleton mode of IoTDB. -public interface ClusterIoTDBMBean { - /** @return true only if the log degree is DEBUG and the report is enabled */ - boolean startRaftInfoReport(); +public interface MetaGroupMemberMBean extends RaftMemberMBean { - void stopRaftInfoReport(); + String getPartitionTableAsString(); + + boolean isReady(); + + String getAllNodesAsString(); + + String getCharacterAsString(); + + String getBlindNodesAsString(); + + String getIdNodeMapAsString(); } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java index 17eab2c..2ee8626 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java @@ -19,6 +19,7 @@ package org.apache.iotdb.cluster.server.member; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.iotdb.cluster.ClusterIoTDB; import org.apache.iotdb.cluster.client.async.AsyncClientPool; import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor; @@ -62,6 +63,7 @@ import org.apache.iotdb.cluster.utils.ClientUtils; import org.apache.iotdb.cluster.utils.IOUtils; import org.apache.iotdb.cluster.utils.PlanSerializer; import org.apache.iotdb.cluster.utils.StatusUtils; +import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.exception.BatchProcessException; import org.apache.iotdb.db.exception.IoTDBException; import org.apache.iotdb.db.exception.metadata.DuplicatedTemplateException; @@ -78,8 +80,6 @@ import org.apache.iotdb.db.utils.TestOnly; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.service.rpc.thrift.TSStatus; - -import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -115,7 +115,7 @@ import static org.apache.iotdb.cluster.config.ClusterConstant.THREAD_POLL_WAIT_T * RaftMember process the common raft logic like leader election, log appending, catch-up and so on. */ @SuppressWarnings("java:S3077") // reference volatile is enough -public abstract class RaftMember { +public abstract class RaftMember implements RaftMemberMBean { private static final Logger logger = LoggerFactory.getLogger(RaftMember.class); public static final boolean USE_LOG_DISPATCHER = false; @@ -653,6 +653,10 @@ public abstract class RaftMember { return character; } + public String getCharacterAsString() { + return character.toString(); + } + public void setCharacter(NodeCharacter character) { if (!Objects.equals(character, this.character)) { logger.info("{} has become a {}", name, character); @@ -804,6 +808,12 @@ public abstract class RaftMember { } } + public String getMBeanName() { + return String.format( + "%s:%s=%s", + "org.apache.iotdb.cluster.service", IoTDBConstant.JMX_TYPE, "Engine", getRaftGroupId()); + } + /** call back after syncLeader */ public interface CheckConsistency { @@ -2074,4 +2084,33 @@ public abstract class RaftMember { public void setSkipElection(boolean skipElection) { this.skipElection = skipElection; } + + public long getLastReportedLogIndex() { + return lastReportedLogIndex; + } + + @Override + public String getAllNodesAsString() { + return allNodes.toString(); + } + + @Override + public String getPeerMapAsString() { + return peerMap.toString(); + } + + @Override + public String getLeaderAsString() { + return leader.get().toString(); + } + + @Override + public String getLogManagerObject() { + return getLogManager().toString(); + } + + @Override + public String getLastCatchUpResponseTimeAsString() { + return lastCatchUpResponseTime.toString(); + } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMemberMBean.java similarity index 59% copy from cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientFactory.java copy to cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMemberMBean.java index 6bfe9003..2a1103a 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientFactory.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMemberMBean.java @@ -16,25 +16,37 @@ * specific language governing permissions and limitations * under the License. */ - -package org.apache.iotdb.cluster.client.sync; +package org.apache.iotdb.cluster.server.member; import org.apache.iotdb.cluster.rpc.thrift.Node; -import org.apache.iotdb.cluster.rpc.thrift.RaftService; -import org.apache.thrift.transport.TTransportException; +import java.util.concurrent.atomic.AtomicLong; + +public interface RaftMemberMBean { + + String getAllNodesAsString(); + + String getName(); + + String getPeerMapAsString(); + + AtomicLong getTerm(); + + String getCharacterAsString(); + + String getLeaderAsString(); + + Node getVoteFor(); + + long getLastHeartbeatReceivedTime(); + + String getLogManagerObject(); + + boolean isReadOnly(); -import java.io.IOException; + long getLastReportedLogIndex(); -public interface SyncClientFactory { + String getLastCatchUpResponseTimeAsString(); - /** - * Get a client which will connect the given node and be cached in the given pool. - * - * @param node the cluster node the client will connect. - * @param pool the pool that will cache the client for reusing. - * @return - * @throws IOException - */ - RaftService.Client getSyncClient(Node node, SyncClientPool pool) throws TTransportException; + boolean isSkipElection(); } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java index ee440b3..b2fb80c 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java @@ -41,7 +41,6 @@ import org.apache.iotdb.cluster.server.monitor.NodeReport.DataMemberReport; import org.apache.iotdb.cluster.utils.IOUtils; import org.apache.iotdb.db.utils.TestOnly; import org.apache.iotdb.service.rpc.thrift.TSStatus; - import org.apache.thrift.TException; import org.apache.thrift.async.AsyncMethodCallback; import org.apache.thrift.protocol.TProtocolFactory; @@ -61,7 +60,8 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataService.Iface { +public class DataGroupServiceImpls + implements TSDataService.AsyncIface, TSDataService.Iface, DataGroupServiceImplsMBean { private static final Logger logger = LoggerFactory.getLogger(DataGroupServiceImpls.class); @@ -678,6 +678,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe dataGroupMember.setUnchanged(true); } else { prevMember.setUnchanged(true); + prevMember.start(); + // TODO do we nedd call other functions in addDataGroupMember() ? } } @@ -1036,4 +1038,24 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe resultHandler.onError(e); } } + + @Override + public String getHeaderGroupMapAsString() { + return headerGroupMap.toString(); + } + + @Override + public int getAsyncServiceMapSize() { + return asyncServiceMap.size(); + } + + @Override + public int getSyncServiceMapSize() { + return syncServiceMap.size(); + } + + @Override + public String getPartitionTable() { + return partitionTable.toString(); + } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDBMBean.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImplsMBean.java similarity index 73% copy from cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDBMBean.java copy to cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImplsMBean.java index b67debe..988fc6f 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDBMBean.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImplsMBean.java @@ -16,12 +16,15 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.cluster; +package org.apache.iotdb.cluster.server.service; -// we do not inherent IoTDB instance, as it may break the singleton mode of IoTDB. -public interface ClusterIoTDBMBean { - /** @return true only if the log degree is DEBUG and the report is enabled */ - boolean startRaftInfoReport(); +public interface DataGroupServiceImplsMBean { - void stopRaftInfoReport(); + String getHeaderGroupMapAsString(); + + int getAsyncServiceMapSize(); + + int getSyncServiceMapSize(); + + String getPartitionTable(); } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java index 065b663..7d20e96 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java @@ -57,6 +57,7 @@ public class MetaSyncService extends BaseSyncService implements TSMetaService.If this.metaGroupMember = metaGroupMember; } + // behavior of followers @Override public long appendEntry(AppendEntryRequest request) throws TException { // if the metaGroupMember is not ready (e.g., as a follower the PartitionTable is loaded @@ -70,6 +71,8 @@ public class MetaSyncService extends BaseSyncService implements TSMetaService.If // this node lacks information of the cluster and refuse to work logger.debug("This node is blind to the cluster and cannot accept logs, {}", request); return Response.RESPONSE_PARTITION_TABLE_UNAVAILABLE; + } else { + // do nothing because we consider if the partitionTable is loaded, then it is corrected. } }
