This is an automated email from the ASF dual-hosted git repository.
yongzao pushed a commit to branch
Gracefully-exit-Cluster-Nodes-through-stop-script
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to
refs/heads/Gracefully-exit-Cluster-Nodes-through-stop-script by this push:
new ea7928d4d3 Finish
ea7928d4d3 is described below
commit ea7928d4d325e567928cc53ca1da3730c5f763a5
Author: YongzaoDan <[email protected]>
AuthorDate: Mon Feb 20 17:03:59 2023 +0800
Finish
---
.../confignode/client/ConfigNodeRequestType.java | 3 +-
.../client/sync/SyncConfigNodeClientPool.java | 2 +
.../iotdb/confignode/manager/ConfigManager.java | 33 +++++++
.../apache/iotdb/confignode/manager/IManager.java | 17 ++++
.../node/heartbeat/ConfigNodeHeartbeatCache.java | 11 ++-
.../node/heartbeat/NodeHeartbeatSample.java | 20 +++-
.../procedure/env/ConfigNodeProcedureEnv.java | 10 +-
.../iotdb/confignode/service/ConfigNode.java | 26 +++---
.../confignode/service/ConfigNodeShutdownHook.java | 96 +++++++++++++++++++
.../thrift/ConfigNodeRPCServiceProcessor.java | 23 ++++-
.../it/cluster/IoTDBClusterNodeShutdownHookIT.java | 103 +++++++++++++++++++++
.../apache/iotdb/commons/conf/CommonConfig.java | 9 +-
.../apache/iotdb/db/client/ConfigNodeClient.java | 23 +++++
.../java/org/apache/iotdb/db/service/DataNode.java | 2 +-
.../apache/iotdb/db/service/IoTDBShutdownHook.java | 28 +++++-
.../src/main/thrift/confignode.thrift | 16 ++++
16 files changed, 380 insertions(+), 42 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/ConfigNodeRequestType.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/ConfigNodeRequestType.java
index 648170cabc..e9aa1e93f9 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/ConfigNodeRequestType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/ConfigNodeRequestType.java
@@ -26,5 +26,6 @@ public enum ConfigNodeRequestType {
RESTART_CONFIG_NODE,
REMOVE_CONFIG_NODE,
DELETE_CONFIG_NODE_PEER,
- STOP_CONFIG_NODE;
+ REPORT_CONFIG_NODE_SHUTDOWN,
+ STOP_CONFIG_NODE
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java
index cd19d0add8..79a40be913 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java
@@ -85,6 +85,8 @@ public class SyncConfigNodeClientPool {
return removeConfigNode((TConfigNodeLocation) req, client);
case DELETE_CONFIG_NODE_PEER:
return client.deleteConfigNodePeer((TConfigNodeLocation) req);
+ case REPORT_CONFIG_NODE_SHUTDOWN:
+ return client.reportConfigNodeShutdown((TConfigNodeLocation) req);
case STOP_CONFIG_NODE:
// Only use stopConfigNode when the ConfigNode is removed.
return client.stopConfigNode((TConfigNodeLocation) req);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 327a8f5afc..44a3d26d38 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -84,6 +84,7 @@ import org.apache.iotdb.confignode.manager.cq.CQManager;
import org.apache.iotdb.confignode.manager.load.LoadManager;
import org.apache.iotdb.confignode.manager.node.ClusterNodeStartUtils;
import org.apache.iotdb.confignode.manager.node.NodeManager;
+import org.apache.iotdb.confignode.manager.node.heartbeat.NodeHeartbeatSample;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
import org.apache.iotdb.confignode.persistence.AuthorInfo;
import org.apache.iotdb.confignode.persistence.ProcedureInfo;
@@ -364,6 +365,22 @@ public class ConfigManager implements IManager {
return dataSet;
}
+ @Override
+ public TSStatus reportDataNodeShutdown(TDataNodeLocation dataNodeLocation) {
+ TSStatus status = confirmLeader();
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ // Force updating the target DataNode's status to Unknown
+ getNodeManager()
+ .getNodeCacheMap()
+ .get(dataNodeLocation.getDataNodeId())
+ .forceUpdate(NodeHeartbeatSample.generateDefaultSample(NodeStatus.Unknown));
+ LOGGER.info(
+ "[ShutdownHook] The DataNode-{} will be shutdown soon, mark it as Unknown",
+ dataNodeLocation.getDataNodeId());
+ }
+ return status;
+ }
+
@Override
public TSStatus reportRegionMigrateResult(TRegionMigrateResultReportReq req) {
TSStatus status = confirmLeader();
@@ -1101,6 +1118,22 @@ public class ConfigManager implements IManager {
return status;
}
+ @Override
+ public TSStatus reportConfigNodeShutdown(TConfigNodeLocation configNodeLocation) {
+ TSStatus status = confirmLeader();
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ // Force updating the target ConfigNode's status to Unknown
+ getNodeManager()
+ .getNodeCacheMap()
+ .get(configNodeLocation.getConfigNodeId())
+ .forceUpdate(NodeHeartbeatSample.generateDefaultSample(NodeStatus.Unknown));
+ LOGGER.info(
+ "[ShutdownHook] The ConfigNode-{} will be shutdown soon, mark it as Unknown",
+ configNodeLocation.getConfigNodeId());
+ }
+ return status;
+ }
+
@Override
public TSStatus createFunction(TCreateFunctionReq req) {
TSStatus status = confirmLeader();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index d59791c5ab..b6c8fc4cc7 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -222,6 +222,15 @@ public interface IManager {
*/
DataSet updateDataNode(UpdateDataNodePlan updateDataNodePlan);
+ /**
+ * Report that the specified DataNode will be shutdown.
+ *
+ * <p>The ConfigNode-leader will mark it as Unknown
+ *
+ * @return SUCCESS_STATUS if reporting successfully
+ */
+ TSStatus reportDataNodeShutdown(TDataNodeLocation dataNodeLocation);
+
/**
* DataNode report region migrate result to ConfigNode when remove DataNode
*
@@ -374,6 +383,14 @@ public interface IManager {
*/
TSStatus removeConfigNode(RemoveConfigNodePlan removeConfigNodePlan);
+ /**
+ * Report that the specified ConfigNode will be shutdown. The ConfigNode-leader will mark it as
+ * Unknown.
+ *
+ * @return SUCCESS_STATUS if reporting successfully
+ */
+ TSStatus reportConfigNodeShutdown(TConfigNodeLocation configNodeLocation);
+
TSStatus createFunction(TCreateFunctionReq req);
TSStatus dropFunction(String udfName);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/ConfigNodeHeartbeatCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/ConfigNodeHeartbeatCache.java
index 97bc7379c4..17fc9b1eb2 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/ConfigNodeHeartbeatCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/ConfigNodeHeartbeatCache.java
@@ -53,20 +53,21 @@ public class ConfigNodeHeartbeatCache extends BaseNodeCache {
return;
}
- long lastSendTime = 0;
+ NodeHeartbeatSample lastSample = null;
synchronized (slidingWindow) {
if (!slidingWindow.isEmpty()) {
- lastSendTime = slidingWindow.getLast().getSendTimestamp();
+ lastSample = slidingWindow.getLast();
}
}
+ long lastSendTime = lastSample == null ? 0 : lastSample.getSendTimestamp();
// Update Node status
- NodeStatus status;
+ NodeStatus status = null;
// TODO: Optimize judge logic
if (System.currentTimeMillis() - lastSendTime > HEARTBEAT_TIMEOUT_TIME) {
status = NodeStatus.Unknown;
- } else {
- status = NodeStatus.Running;
+ } else if (lastSample != null) {
+ status = lastSample.getStatus();
}
/* Update loadScore */
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/NodeHeartbeatSample.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/NodeHeartbeatSample.java
index 7510c50348..dceff727bc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/NodeHeartbeatSample.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/NodeHeartbeatSample.java
@@ -28,8 +28,8 @@ public class NodeHeartbeatSample {
private final long sendTimestamp;
private final long receiveTimestamp;
- private NodeStatus status;
- private String statusReason;
+ private final NodeStatus status;
+ private final String statusReason;
private TLoadSample loadSample = null;
@@ -37,6 +37,8 @@ public class NodeHeartbeatSample {
public NodeHeartbeatSample(long sendTimestamp, long receiveTimestamp) {
this.sendTimestamp = sendTimestamp;
this.receiveTimestamp = receiveTimestamp;
+ this.status = NodeStatus.Running;
+ this.statusReason = null;
}
/** Constructor for DataNode sample */
@@ -75,4 +77,18 @@ public class NodeHeartbeatSample {
public TLoadSample getLoadSample() {
return loadSample;
}
+
+ /**
+ * Generate a default NodeHeartbeatSample.
+ *
+ * <p>i.e. Only contain timestamp and NodeStatus
+ *
+ * @param status The NodeStatus in default NodeSample
+ * @return A NodeHeartbeatSample that only contain timestamp and NodeStatus
+ */
+ public static NodeHeartbeatSample generateDefaultSample(NodeStatus status) {
+ long currentTime = System.currentTimeMillis();
+ return new NodeHeartbeatSample(
+ new THeartbeatResp(currentTime, status.getStatus()).setStatusReason(null), currentTime);
+ }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index e01bb7ebf5..ad8f9bd09e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -63,7 +63,6 @@ import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TDropTriggerInstanceReq;
-import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateConfigNodeGroupReq;
@@ -382,16 +381,11 @@ public class ConfigNodeProcedureEnv {
DataNodeRequestType.SET_SYSTEM_STATUS);
}
- // Force updating NodeStatus
- long currentTime = System.currentTimeMillis();
- NodeHeartbeatSample removingSample =
- new NodeHeartbeatSample(
- new THeartbeatResp(currentTime, NodeStatus.Removing.getStatus()).setStatusReason(null),
- currentTime);
+ // Force updating NodeStatus to Removing
getNodeManager()
.getNodeCacheMap()
.get(dataNodeLocation.getDataNodeId())
- .forceUpdate(removingSample);
+ .forceUpdate(NodeHeartbeatSample.generateDefaultSample(NodeStatus.Removing));
}
/**
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
index 2e8426abcc..184ccd8f18 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.confignode.service;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
@@ -91,6 +92,8 @@ public class ConfigNode implements ConfigNodeMBean {
try {
processPid();
+ // Add shutdown hook
+ Runtime.getRuntime().addShutdownHook(new ConfigNodeShutdownHook());
// Set up internal services
setUpInternalServices();
// Init ConfigManager
@@ -190,11 +193,7 @@ public class ConfigNode implements ConfigNodeMBean {
}
} catch (StartupException | IOException e) {
LOGGER.error("Meet error while starting up.", e);
- try {
- stop();
- } catch (IOException e2) {
- LOGGER.error("Meet error when stop ConfigNode!", e);
- }
+ stop();
}
}
@@ -210,11 +209,7 @@ public class ConfigNode implements ConfigNodeMBean {
configManager = new ConfigManager();
} catch (IOException e) {
LOGGER.error("Can't start ConfigNode consensus group!", e);
- try {
- stop();
- } catch (IOException e2) {
- LOGGER.error("Meet error when stop ConfigNode!", e);
- }
+ stop();
}
// Add some Metrics for configManager
configManager.addMetrics();
@@ -347,7 +342,8 @@ public class ConfigNode implements ConfigNodeMBean {
registerManager.register(configNodeRPCService);
}
- public void stop() throws IOException {
+ /** Deactivating ConfigNode internal services */
+ public void deactivate() throws IOException {
LOGGER.info("Deactivating {}...", ConfigNodeConstant.GLOBAL_NAME);
registerManager.deregisterAll();
JMXService.deregisterMBean(mbeanName);
@@ -355,6 +351,14 @@ public class ConfigNode implements ConfigNodeMBean {
configManager.close();
}
LOGGER.info("{} is deactivated.", ConfigNodeConstant.GLOBAL_NAME);
+ }
+
+ public void stop() {
+ try {
+ deactivate();
+ } catch (IOException e) {
+ LOGGER.error("Meet error when deactivate ConfigNode", e);
+ }
System.exit(-1);
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeShutdownHook.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeShutdownHook.java
new file mode 100644
index 0000000000..1089830508
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeShutdownHook.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.service;
+
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
+import org.apache.iotdb.confignode.client.sync.SyncConfigNodeClientPool;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
+import org.apache.iotdb.confignode.conf.ConfigNodeConstant;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.db.utils.MemUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class ConfigNodeShutdownHook extends Thread {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ConfigNodeShutdownHook.class);
+
+ private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
+ private static final int SHUTDOWN_REPORT_RETRY_NUM = 2;
+
+ @Override
+ public void run() {
+ boolean isLeader = ConfigNode.getInstance().getConfigManager().getConsensusManager().isLeader();
+
+ try {
+ ConfigNode.getInstance().deactivate();
+ } catch (IOException e) {
+ LOGGER.error("Meet error when deactivate ConfigNode", e);
+ }
+
+ if (!isLeader) {
+ // Set and report shutdown to cluster ConfigNode-leader
+ CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.Unknown);
+ boolean isReportSuccess = false;
+ TEndPoint targetConfigNode = CONF.getTargetConfigNode();
+ for (int retry = 0; retry < SHUTDOWN_REPORT_RETRY_NUM; retry++) {
+ TSStatus result =
+ (TSStatus)
+ SyncConfigNodeClientPool.getInstance()
+ .sendSyncRequestToConfigNodeWithRetry(
+ targetConfigNode,
+ new TConfigNodeLocation(
+ CONF.getConfigNodeId(),
+ new TEndPoint(CONF.getInternalAddress(), CONF.getInternalPort()),
+ new TEndPoint(CONF.getInternalAddress(), CONF.getConsensusPort())),
+ ConfigNodeRequestType.REPORT_CONFIG_NODE_SHUTDOWN);
+
+ if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ // Report success
+ isReportSuccess = true;
+ break;
+ } else if (result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+ // Redirect
+ targetConfigNode = result.getRedirectNode();
+ }
+ }
+ if (!isReportSuccess) {
+ LOGGER.error(
+ "Reporting ConfigNode shutdown failed. The cluster will still take the current ConfigNode as Running for a few seconds.");
+ }
+ }
+
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info(
+ ConfigNodeConstant.GLOBAL_NAME + " exits. Jvm memory usage: {}",
+ MemUtils.bytesCntToStr(
+ Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()));
+ }
+ }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 29798f63df..e67e85f758 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -16,10 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.confignode.service.thrift;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
@@ -165,6 +167,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.TimeUnit;
/** ConfigNodeRPCServer exposes the interface that interacts with the DataNode */
public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Iface {
@@ -247,6 +250,11 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
return resp;
}
+ @Override
+ public TSStatus reportDataNodeShutdown(TDataNodeLocation dataNodeLocation) {
+ return configManager.reportDataNodeShutdown(dataNodeLocation);
+ }
+
@Override
public TDataNodeConfigurationResp getDataNodeConfiguration(int dataNodeID) {
GetDataNodeConfigurationPlan queryReq = new GetDataNodeConfigurationPlan(dataNodeID);
@@ -621,18 +629,23 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
.setMessage("remove ConsensusGroup success.");
}
+ @Override
+ public TSStatus reportConfigNodeShutdown(TConfigNodeLocation configNodeLocation) {
+ return configManager.reportConfigNodeShutdown(configNodeLocation);
+ }
+
/** stop config node */
@Override
public TSStatus stopConfigNode(TConfigNodeLocation configNodeLocation) {
new Thread(
() -> {
try {
- ConfigNode.getInstance().stop();
- } catch (IOException e) {
- LOGGER.error("Meet error when stop ConfigNode!", e);
- } finally {
- System.exit(0);
+ // Sleep 1s before stop itself
+ TimeUnit.SECONDS.sleep(1);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
}
+ ConfigNode.getInstance().stop();
})
.start();
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeShutdownHookIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeShutdownHookIT.java
new file mode 100644
index 0000000000..176c6d78ce
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeShutdownHookIT.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.it.cluster;
+
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBClusterNodeShutdownHookIT {
+
+ private static final String testConsensusProtocolClass = ConsensusFactory.RATIS_CONSENSUS;
+
+ private static final int testConfigNodeNum = 2;
+ private static final int testDataNodeNum = 1;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ EnvFactory.getEnv()
+ .getConfig()
+ .getCommonConfig()
+ .setConfigNodeConsensusProtocolClass(testConsensusProtocolClass);
+
+ // Init 2C1D environment
+ EnvFactory.getEnv().initClusterEnvironment(testConfigNodeNum, testDataNodeNum);
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+
+ @Test
+ public void testNodeShutdownReporter()
+ throws ClientManagerException, IOException, InterruptedException, TException {
+ EnvFactory.getEnv().shutdownConfigNode(1);
+ EnvFactory.getEnv().shutdownDataNode(0);
+
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+
+ // The unknown Nodes should be detected immediately with the help of shutdown hook
+ boolean isDetected = false;
+ for (int retry = 0; retry < 5; retry++) {
+ TShowClusterResp showClusterResp = client.showCluster();
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), showClusterResp.getStatus().getCode());
+ AtomicInteger unknownNum = new AtomicInteger(0);
+ showClusterResp
+ .getNodeStatus()
+ .forEach(
+ (nodeId, nodeStatus) -> {
+ if (NodeStatus.Unknown.getStatus().equals(nodeStatus)) {
+ unknownNum.getAndIncrement();
+ }
+ });
+ if (unknownNum.get() == 2) {
+ isDetected = true;
+ break;
+ }
+
+ TimeUnit.SECONDS.sleep(1);
+ }
+ Assert.assertTrue(isDetected);
+ }
+ }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 5fcf500672..c53ef73703 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -315,11 +315,6 @@ public class CommonConfig {
return status;
}
- public void setNodeStatusToShutdown() {
- logger.info("System will reject write operations when shutting down.");
- this.status = NodeStatus.ReadOnly;
- }
-
public void setNodeStatus(NodeStatus newStatus) {
logger.info("Set system mode from {} to {}.", status, newStatus);
this.status = newStatus;
@@ -327,9 +322,7 @@ public class CommonConfig {
switch (newStatus) {
case ReadOnly:
- logger.error(
- "Change system status to ReadOnly! Only query statements are permitted!",
- new RuntimeException("System mode is set to READ_ONLY"));
+ logger.warn("Change system status to ReadOnly! Only query statements are permitted!");
break;
case Removing:
logger.info(
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index aa169cd3eb..869a83f592 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.client;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
@@ -407,6 +408,22 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
throw new TException(MSG_RECONNECTION_FAIL);
}
+ @Override
+ public TSStatus reportDataNodeShutdown(TDataNodeLocation dataNodeLocation) throws TException {
+ for (int i = 0; i < RETRY_NUM; i++) {
+ try {
+ TSStatus status = client.reportDataNodeShutdown(dataNodeLocation);
+ if (!updateConfigNodeLeader(status)) {
+ return status;
+ }
+ } catch (TException e) {
+ configLeader = null;
+ }
+ waitAndReconnect();
+ }
+ throw new TException(MSG_RECONNECTION_FAIL);
+ }
+
@Override
public TDataNodeConfigurationResp getDataNodeConfiguration(int dataNodeId) throws TException {
for (int i = 0; i < RETRY_NUM; i++) {
@@ -926,6 +943,12 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
throw new TException("DataNode to ConfigNode client doesn't support removeConsensusGroup.");
}
+ @Override
+ public TSStatus reportConfigNodeShutdown(TConfigNodeLocation configNodeLocation)
+ throws TException {
+ throw new TException("DataNode to ConfigNode client doesn't support reportConfigNodeShutdown.");
+ }
+
@Override
public TSStatus stopConfigNode(TConfigNodeLocation configNodeLocation) throws TException {
throw new TException("DataNode to ConfigNode client doesn't support stopConfigNode.");
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 4699ce0f7e..658ff62d09 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -564,7 +564,7 @@ public class DataNode implements DataNodeMBean {
DataNodeMetricsHelper.bind();
}
- private TDataNodeLocation generateDataNodeLocation() {
+ public static TDataNodeLocation generateDataNodeLocation() {
TDataNodeLocation location = new TDataNodeLocation();
location.setDataNodeId(config.getDataNodeId());
location.setClientRpcEndPoint(new TEndPoint(config.getRpcAddress(), config.getRpcPort()));
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java
index bccc048467..0da3cfb483 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java
@@ -16,10 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.service;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeClientManager;
+import org.apache.iotdb.db.client.ConfigNodeInfo;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.DirectoryChecker;
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
@@ -29,7 +35,9 @@ import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
import org.apache.iotdb.db.metadata.schemaregion.SchemaEngineMode;
import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.wal.WALManager;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,7 +54,7 @@ public class IoTDBShutdownHook extends Thread {
}
// reject write operations to make sure all tsfiles will be sealed
- CommonDescriptor.getInstance().getConfig().setNodeStatusToShutdown();
+ CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.ReadOnly);
// wait all wal are flushed
WALManager.getInstance().waitAllWALFlushed();
@@ -86,6 +94,24 @@ public class IoTDBShutdownHook extends Thread {
// clear lock file
DirectoryChecker.getInstance().deregisterAll();
+ // Set and report shutdown to cluster ConfigNode-leader
+ CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.Unknown);
+ boolean isReportSuccess = false;
+ try (ConfigNodeClient client =
+ ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.configNodeRegionId)) {
+ isReportSuccess =
+ client.reportDataNodeShutdown(DataNode.generateDataNodeLocation()).getCode()
+ == TSStatusCode.SUCCESS_STATUS.getStatusCode();
+ } catch (ClientManagerException e) {
+ logger.error("Failed to borrow ConfigNodeClient", e);
+ } catch (TException e) {
+ logger.error("Failed to report shutdown", e);
+ }
+ if (!isReportSuccess) {
+ logger.error(
+ "Reporting DataNode shutdown failed. The cluster will still take the current DataNode as Running for a few seconds.");
+ }
+
if (logger.isInfoEnabled()) {
logger.info(
"IoTDB exits. Jvm memory usage: {}",
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index d7890ec803..66593d1371 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -765,6 +765,14 @@ service IConfigNodeRPCService {
*/
TDataNodeRegisterResp updateDataNode(TDataNodeUpdateReq req)
+ /**
+ * Report that the specified DataNode will be shutdown.
+ * The ConfigNode-leader will mark it as Unknown.
+ *
+ * @return SUCCESS_STATUS if reporting successfully
+ */
+ common.TSStatus reportDataNodeShutdown(common.TDataNodeLocation dataNodeLocation)
+
/**
* Get one or more DataNodes' configuration
*
@@ -980,6 +988,14 @@ service IConfigNodeRPCService {
*/
common.TSStatus deleteConfigNodePeer(common.TConfigNodeLocation configNodeLocation)
+ /**
+ * Report that the specified ConfigNode will be shutdown.
+ * The ConfigNode-leader will mark it as Unknown.
+ *
+ * @return SUCCESS_STATUS if reporting successfully
+ */
+ common.TSStatus reportConfigNodeShutdown(common.TConfigNodeLocation configNodeLocation)
+
/** Stop the specific ConfigNode */
common.TSStatus stopConfigNode(common.TConfigNodeLocation configNodeLocation)