This is an automated email from the ASF dual-hosted git repository.

yongzao pushed a commit to branch Gracefully-exit-Cluster-Nodes-through-stop-script
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/Gracefully-exit-Cluster-Nodes-through-stop-script by this push:
     new ea7928d4d3 Finish
ea7928d4d3 is described below

commit ea7928d4d325e567928cc53ca1da3730c5f763a5
Author: YongzaoDan <[email protected]>
AuthorDate: Mon Feb 20 17:03:59 2023 +0800

    Finish
---
 .../confignode/client/ConfigNodeRequestType.java   |   3 +-
 .../client/sync/SyncConfigNodeClientPool.java      |   2 +
 .../iotdb/confignode/manager/ConfigManager.java    |  33 +++++++
 .../apache/iotdb/confignode/manager/IManager.java  |  17 ++++
 .../node/heartbeat/ConfigNodeHeartbeatCache.java   |  11 ++-
 .../node/heartbeat/NodeHeartbeatSample.java        |  20 +++-
 .../procedure/env/ConfigNodeProcedureEnv.java      |  10 +-
 .../iotdb/confignode/service/ConfigNode.java       |  26 +++---
 .../confignode/service/ConfigNodeShutdownHook.java |  96 +++++++++++++++++++
 .../thrift/ConfigNodeRPCServiceProcessor.java      |  23 ++++-
 .../it/cluster/IoTDBClusterNodeShutdownHookIT.java | 103 +++++++++++++++++++++
 .../apache/iotdb/commons/conf/CommonConfig.java    |   9 +-
 .../apache/iotdb/db/client/ConfigNodeClient.java   |  23 +++++
 .../java/org/apache/iotdb/db/service/DataNode.java |   2 +-
 .../apache/iotdb/db/service/IoTDBShutdownHook.java |  28 +++++-
 .../src/main/thrift/confignode.thrift              |  16 ++++
 16 files changed, 380 insertions(+), 42 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/ConfigNodeRequestType.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/ConfigNodeRequestType.java
index 648170cabc..e9aa1e93f9 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/ConfigNodeRequestType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/ConfigNodeRequestType.java
@@ -26,5 +26,6 @@ public enum ConfigNodeRequestType {
   RESTART_CONFIG_NODE,
   REMOVE_CONFIG_NODE,
   DELETE_CONFIG_NODE_PEER,
-  STOP_CONFIG_NODE;
+  REPORT_CONFIG_NODE_SHUTDOWN,
+  STOP_CONFIG_NODE
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java
index cd19d0add8..79a40be913 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java
@@ -85,6 +85,8 @@ public class SyncConfigNodeClientPool {
             return removeConfigNode((TConfigNodeLocation) req, client);
           case DELETE_CONFIG_NODE_PEER:
             return client.deleteConfigNodePeer((TConfigNodeLocation) req);
+          case REPORT_CONFIG_NODE_SHUTDOWN:
+            return client.reportConfigNodeShutdown((TConfigNodeLocation) req);
           case STOP_CONFIG_NODE:
             // Only use stopConfigNode when the ConfigNode is removed.
             return client.stopConfigNode((TConfigNodeLocation) req);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 327a8f5afc..44a3d26d38 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -84,6 +84,7 @@ import org.apache.iotdb.confignode.manager.cq.CQManager;
 import org.apache.iotdb.confignode.manager.load.LoadManager;
 import org.apache.iotdb.confignode.manager.node.ClusterNodeStartUtils;
 import org.apache.iotdb.confignode.manager.node.NodeManager;
+import org.apache.iotdb.confignode.manager.node.heartbeat.NodeHeartbeatSample;
 import org.apache.iotdb.confignode.manager.partition.PartitionManager;
 import org.apache.iotdb.confignode.persistence.AuthorInfo;
 import org.apache.iotdb.confignode.persistence.ProcedureInfo;
@@ -364,6 +365,22 @@ public class ConfigManager implements IManager {
     return dataSet;
   }
 
+  @Override
+  public TSStatus reportDataNodeShutdown(TDataNodeLocation dataNodeLocation) {
+    TSStatus status = confirmLeader();
+    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      // Force updating the target DataNode's status to Unknown
+      getNodeManager()
+          .getNodeCacheMap()
+          .get(dataNodeLocation.getDataNodeId())
+          .forceUpdate(NodeHeartbeatSample.generateDefaultSample(NodeStatus.Unknown));
+      LOGGER.info(
+          "[ShutdownHook] The DataNode-{} will be shutdown soon, mark it as Unknown",
+          dataNodeLocation.getDataNodeId());
+    }
+    return status;
+  }
+
   @Override
   public TSStatus reportRegionMigrateResult(TRegionMigrateResultReportReq req) {
     TSStatus status = confirmLeader();
@@ -1101,6 +1118,22 @@ public class ConfigManager implements IManager {
     return status;
   }
 
+  @Override
+  public TSStatus reportConfigNodeShutdown(TConfigNodeLocation configNodeLocation) {
+    TSStatus status = confirmLeader();
+    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      // Force updating the target ConfigNode's status to Unknown
+      getNodeManager()
+          .getNodeCacheMap()
+          .get(configNodeLocation.getConfigNodeId())
+          .forceUpdate(NodeHeartbeatSample.generateDefaultSample(NodeStatus.Unknown));
+      LOGGER.info(
+          "[ShutdownHook] The ConfigNode-{} will be shutdown soon, mark it as Unknown",
+          configNodeLocation.getConfigNodeId());
+    }
+    return status;
+  }
+
   @Override
   public TSStatus createFunction(TCreateFunctionReq req) {
     TSStatus status = confirmLeader();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index d59791c5ab..b6c8fc4cc7 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -222,6 +222,15 @@ public interface IManager {
    */
   DataSet updateDataNode(UpdateDataNodePlan updateDataNodePlan);
 
+  /**
+   * Report that the specified DataNode will be shutdown.
+   *
+   * <p>The ConfigNode-leader will mark it as Unknown
+   *
+   * @return SUCCESS_STATUS if reporting successfully
+   */
+  TSStatus reportDataNodeShutdown(TDataNodeLocation dataNodeLocation);
+
   /**
    * DataNode report region migrate result to ConfigNode when remove DataNode
    *
@@ -374,6 +383,14 @@ public interface IManager {
    */
   TSStatus removeConfigNode(RemoveConfigNodePlan removeConfigNodePlan);
 
+  /**
+   * Report that the specified ConfigNode will be shutdown. The ConfigNode-leader will mark it as
+   * Unknown.
+   *
+   * @return SUCCESS_STATUS if reporting successfully
+   */
+  TSStatus reportConfigNodeShutdown(TConfigNodeLocation configNodeLocation);
+
   TSStatus createFunction(TCreateFunctionReq req);
 
   TSStatus dropFunction(String udfName);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/ConfigNodeHeartbeatCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/ConfigNodeHeartbeatCache.java
index 97bc7379c4..17fc9b1eb2 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/ConfigNodeHeartbeatCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/ConfigNodeHeartbeatCache.java
@@ -53,20 +53,21 @@ public class ConfigNodeHeartbeatCache extends BaseNodeCache {
       return;
     }
 
-    long lastSendTime = 0;
+    NodeHeartbeatSample lastSample = null;
     synchronized (slidingWindow) {
       if (!slidingWindow.isEmpty()) {
-        lastSendTime = slidingWindow.getLast().getSendTimestamp();
+        lastSample = slidingWindow.getLast();
       }
     }
+    long lastSendTime = lastSample == null ? 0 : lastSample.getSendTimestamp();
 
     // Update Node status
-    NodeStatus status;
+    NodeStatus status = null;
     // TODO: Optimize judge logic
     if (System.currentTimeMillis() - lastSendTime > HEARTBEAT_TIMEOUT_TIME) {
       status = NodeStatus.Unknown;
-    } else {
-      status = NodeStatus.Running;
+    } else if (lastSample != null) {
+      status = lastSample.getStatus();
     }
 
     /* Update loadScore */
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/NodeHeartbeatSample.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/NodeHeartbeatSample.java
index 7510c50348..dceff727bc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/NodeHeartbeatSample.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/NodeHeartbeatSample.java
@@ -28,8 +28,8 @@ public class NodeHeartbeatSample {
   private final long sendTimestamp;
   private final long receiveTimestamp;
 
-  private NodeStatus status;
-  private String statusReason;
+  private final NodeStatus status;
+  private final String statusReason;
 
   private TLoadSample loadSample = null;
 
@@ -37,6 +37,8 @@ public class NodeHeartbeatSample {
   public NodeHeartbeatSample(long sendTimestamp, long receiveTimestamp) {
     this.sendTimestamp = sendTimestamp;
     this.receiveTimestamp = receiveTimestamp;
+    this.status = NodeStatus.Running;
+    this.statusReason = null;
   }
 
   /** Constructor for DataNode sample */
@@ -75,4 +77,18 @@ public class NodeHeartbeatSample {
   public TLoadSample getLoadSample() {
     return loadSample;
   }
+
+  /**
+   * Generate a default NodeHeartbeatSample.
+   *
+   * <p>i.e. Only contain timestamp and NodeStatus
+   *
+   * @param status The NodeStatus in default NodeSample
+   * @return A NodeHeartbeatSample that only contain timestamp and NodeStatus
+   */
+  public static NodeHeartbeatSample generateDefaultSample(NodeStatus status) {
+    long currentTime = System.currentTimeMillis();
+    return new NodeHeartbeatSample(
+        new THeartbeatResp(currentTime, status.getStatus()).setStatusReason(null), currentTime);
+  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index e01bb7ebf5..ad8f9bd09e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -63,7 +63,6 @@ import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateTriggerInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TDropTriggerInstanceReq;
-import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
 import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
 import org.apache.iotdb.mpp.rpc.thrift.TUpdateConfigNodeGroupReq;
@@ -382,16 +381,11 @@ public class ConfigNodeProcedureEnv {
               DataNodeRequestType.SET_SYSTEM_STATUS);
     }
 
-    // Force updating NodeStatus
-    long currentTime = System.currentTimeMillis();
-    NodeHeartbeatSample removingSample =
-        new NodeHeartbeatSample(
-            new THeartbeatResp(currentTime, NodeStatus.Removing.getStatus()).setStatusReason(null),
-            currentTime);
+    // Force updating NodeStatus to Removing
     getNodeManager()
         .getNodeCacheMap()
         .get(dataNodeLocation.getDataNodeId())
-        .forceUpdate(removingSample);
+        .forceUpdate(NodeHeartbeatSample.generateDefaultSample(NodeStatus.Removing));
   }
 
   /**
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
index 2e8426abcc..184ccd8f18 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.confignode.service;
 
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
@@ -91,6 +92,8 @@ public class ConfigNode implements ConfigNodeMBean {
 
     try {
       processPid();
+      // Add shutdown hook
+      Runtime.getRuntime().addShutdownHook(new ConfigNodeShutdownHook());
       // Set up internal services
       setUpInternalServices();
       // Init ConfigManager
@@ -190,11 +193,7 @@ public class ConfigNode implements ConfigNodeMBean {
       }
     } catch (StartupException | IOException e) {
       LOGGER.error("Meet error while starting up.", e);
-      try {
-        stop();
-      } catch (IOException e2) {
-        LOGGER.error("Meet error when stop ConfigNode!", e);
-      }
+      stop();
     }
   }
 
@@ -210,11 +209,7 @@ public class ConfigNode implements ConfigNodeMBean {
       configManager = new ConfigManager();
     } catch (IOException e) {
       LOGGER.error("Can't start ConfigNode consensus group!", e);
-      try {
-        stop();
-      } catch (IOException e2) {
-        LOGGER.error("Meet error when stop ConfigNode!", e);
-      }
+      stop();
     }
     // Add some Metrics for configManager
     configManager.addMetrics();
@@ -347,7 +342,8 @@ public class ConfigNode implements ConfigNodeMBean {
     registerManager.register(configNodeRPCService);
   }
 
-  public void stop() throws IOException {
+  /** Deactivating ConfigNode internal services */
+  public void deactivate() throws IOException {
     LOGGER.info("Deactivating {}...", ConfigNodeConstant.GLOBAL_NAME);
     registerManager.deregisterAll();
     JMXService.deregisterMBean(mbeanName);
@@ -355,6 +351,14 @@ public class ConfigNode implements ConfigNodeMBean {
       configManager.close();
     }
     LOGGER.info("{} is deactivated.", ConfigNodeConstant.GLOBAL_NAME);
+  }
+
+  public void stop() {
+    try {
+      deactivate();
+    } catch (IOException e) {
+      LOGGER.error("Meet error when deactivate ConfigNode", e);
+    }
     System.exit(-1);
   }
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeShutdownHook.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeShutdownHook.java
new file mode 100644
index 0000000000..1089830508
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeShutdownHook.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.service;
+
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
+import org.apache.iotdb.confignode.client.sync.SyncConfigNodeClientPool;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
+import org.apache.iotdb.confignode.conf.ConfigNodeConstant;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.db.utils.MemUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class ConfigNodeShutdownHook extends Thread {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(ConfigNodeShutdownHook.class);
+
+  private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
+  private static final int SHUTDOWN_REPORT_RETRY_NUM = 2;
+
+  @Override
+  public void run() {
+    boolean isLeader = ConfigNode.getInstance().getConfigManager().getConsensusManager().isLeader();
+
+    try {
+      ConfigNode.getInstance().deactivate();
+    } catch (IOException e) {
+      LOGGER.error("Meet error when deactivate ConfigNode", e);
+    }
+
+    if (!isLeader) {
+      // Set and report shutdown to cluster ConfigNode-leader
+      CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.Unknown);
+      boolean isReportSuccess = false;
+      TEndPoint targetConfigNode = CONF.getTargetConfigNode();
+      for (int retry = 0; retry < SHUTDOWN_REPORT_RETRY_NUM; retry++) {
+        TSStatus result =
+            (TSStatus)
+                SyncConfigNodeClientPool.getInstance()
+                    .sendSyncRequestToConfigNodeWithRetry(
+                        targetConfigNode,
+                        new TConfigNodeLocation(
+                            CONF.getConfigNodeId(),
+                            new TEndPoint(CONF.getInternalAddress(), CONF.getInternalPort()),
+                            new TEndPoint(CONF.getInternalAddress(), CONF.getConsensusPort())),
+                        ConfigNodeRequestType.REPORT_CONFIG_NODE_SHUTDOWN);
+
+        if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          // Report success
+          isReportSuccess = true;
+          break;
+        } else if (result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+          // Redirect
+          targetConfigNode = result.getRedirectNode();
+        }
+      }
+      if (!isReportSuccess) {
+        LOGGER.error(
+            "Reporting ConfigNode shutdown failed. The cluster will still take the current ConfigNode as Running for a few seconds.");
+      }
+    }
+
+    if (LOGGER.isInfoEnabled()) {
+      LOGGER.info(
+          ConfigNodeConstant.GLOBAL_NAME + " exits. Jvm memory usage: {}",
+          MemUtils.bytesCntToStr(
+              Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()));
+    }
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 29798f63df..e67e85f758 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -16,10 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.confignode.service.thrift;
 
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
@@ -165,6 +167,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 /** ConfigNodeRPCServer exposes the interface that interacts with the DataNode */
 public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Iface {
@@ -247,6 +250,11 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
     return resp;
   }
 
+  @Override
+  public TSStatus reportDataNodeShutdown(TDataNodeLocation dataNodeLocation) {
+    return configManager.reportDataNodeShutdown(dataNodeLocation);
+  }
+
   @Override
   public TDataNodeConfigurationResp getDataNodeConfiguration(int dataNodeID) {
     GetDataNodeConfigurationPlan queryReq = new GetDataNodeConfigurationPlan(dataNodeID);
@@ -621,18 +629,23 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
         .setMessage("remove ConsensusGroup success.");
   }
 
+  @Override
+  public TSStatus reportConfigNodeShutdown(TConfigNodeLocation configNodeLocation) {
+    return configManager.reportConfigNodeShutdown(configNodeLocation);
+  }
+
   /** stop config node */
   @Override
   public TSStatus stopConfigNode(TConfigNodeLocation configNodeLocation) {
     new Thread(
             () -> {
               try {
-                ConfigNode.getInstance().stop();
-              } catch (IOException e) {
-                LOGGER.error("Meet error when stop ConfigNode!", e);
-              } finally {
-                System.exit(0);
+                // Sleep 1s before stop itself
+                TimeUnit.SECONDS.sleep(1);
+              } catch (InterruptedException e) {
+                throw new RuntimeException(e);
               }
+              ConfigNode.getInstance().stop();
             })
         .start();
     return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeShutdownHookIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeShutdownHookIT.java
new file mode 100644
index 0000000000..176c6d78ce
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeShutdownHookIT.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.it.cluster;
+
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBClusterNodeShutdownHookIT {
+
+  private static final String testConsensusProtocolClass = ConsensusFactory.RATIS_CONSENSUS;
+
+  private static final int testConfigNodeNum = 2;
+  private static final int testDataNodeNum = 1;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvFactory.getEnv()
+        .getConfig()
+        .getCommonConfig()
+        .setConfigNodeConsensusProtocolClass(testConsensusProtocolClass);
+
+    // Init 2C1D environment
+    EnvFactory.getEnv().initClusterEnvironment(testConfigNodeNum, testDataNodeNum);
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  @Test
+  public void testNodeShutdownReporter()
+      throws ClientManagerException, IOException, InterruptedException, TException {
+    EnvFactory.getEnv().shutdownConfigNode(1);
+    EnvFactory.getEnv().shutdownDataNode(0);
+
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+
+      // The unknown Nodes should be detected immediately with the help of shutdown hook
+      boolean isDetected = false;
+      for (int retry = 0; retry < 5; retry++) {
+        TShowClusterResp showClusterResp = client.showCluster();
+        Assert.assertEquals(
+            TSStatusCode.SUCCESS_STATUS.getStatusCode(), showClusterResp.getStatus().getCode());
+        AtomicInteger unknownNum = new AtomicInteger(0);
+        showClusterResp
+            .getNodeStatus()
+            .forEach(
+                (nodeId, nodeStatus) -> {
+                  if (NodeStatus.Unknown.getStatus().equals(nodeStatus)) {
+                    unknownNum.getAndIncrement();
+                  }
+                });
+        if (unknownNum.get() == 2) {
+          isDetected = true;
+          break;
+        }
+
+        TimeUnit.SECONDS.sleep(1);
+      }
+      Assert.assertTrue(isDetected);
+    }
+  }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 5fcf500672..c53ef73703 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -315,11 +315,6 @@ public class CommonConfig {
     return status;
   }
 
-  public void setNodeStatusToShutdown() {
-    logger.info("System will reject write operations when shutting down.");
-    this.status = NodeStatus.ReadOnly;
-  }
-
   public void setNodeStatus(NodeStatus newStatus) {
     logger.info("Set system mode from {} to {}.", status, newStatus);
     this.status = newStatus;
@@ -327,9 +322,7 @@ public class CommonConfig {
 
     switch (newStatus) {
       case ReadOnly:
-        logger.error(
-            "Change system status to ReadOnly! Only query statements are permitted!",
-            new RuntimeException("System mode is set to READ_ONLY"));
+        logger.warn("Change system status to ReadOnly! Only query statements are permitted!");
         break;
       case Removing:
         logger.info(
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index aa169cd3eb..869a83f592 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.client;
 
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
@@ -407,6 +408,22 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
     throw new TException(MSG_RECONNECTION_FAIL);
   }
 
+  @Override
+  public TSStatus reportDataNodeShutdown(TDataNodeLocation dataNodeLocation) throws TException {
+    for (int i = 0; i < RETRY_NUM; i++) {
+      try {
+        TSStatus status = client.reportDataNodeShutdown(dataNodeLocation);
+        if (!updateConfigNodeLeader(status)) {
+          return status;
+        }
+      } catch (TException e) {
+        configLeader = null;
+      }
+      waitAndReconnect();
+    }
+    throw new TException(MSG_RECONNECTION_FAIL);
+  }
+
   @Override
   public TDataNodeConfigurationResp getDataNodeConfiguration(int dataNodeId) throws TException {
     for (int i = 0; i < RETRY_NUM; i++) {
@@ -926,6 +943,12 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
     throw new TException("DataNode to ConfigNode client doesn't support removeConsensusGroup.");
   }
 
+  @Override
+  public TSStatus reportConfigNodeShutdown(TConfigNodeLocation configNodeLocation)
+      throws TException {
+    throw new TException("DataNode to ConfigNode client doesn't support reportConfigNodeShutdown.");
+  }
+
   @Override
   public TSStatus stopConfigNode(TConfigNodeLocation configNodeLocation) throws TException {
     throw new TException("DataNode to ConfigNode client doesn't support stopConfigNode.");
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 4699ce0f7e..658ff62d09 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -564,7 +564,7 @@ public class DataNode implements DataNodeMBean {
     DataNodeMetricsHelper.bind();
   }
 
-  private TDataNodeLocation generateDataNodeLocation() {
+  public static TDataNodeLocation generateDataNodeLocation() {
     TDataNodeLocation location = new TDataNodeLocation();
     location.setDataNodeId(config.getDataNodeId());
     location.setClientRpcEndPoint(new TEndPoint(config.getRpcAddress(), config.getRpcPort()));
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java
index bccc048467..0da3cfb483 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java
@@ -16,10 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.service;
 
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeClientManager;
+import org.apache.iotdb.db.client.ConfigNodeInfo;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.directories.DirectoryChecker;
 import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
@@ -29,7 +35,9 @@ import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
 import org.apache.iotdb.db.metadata.schemaregion.SchemaEngineMode;
 import org.apache.iotdb.db.utils.MemUtils;
 import org.apache.iotdb.db.wal.WALManager;
+import org.apache.iotdb.rpc.TSStatusCode;
 
+import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,7 +54,7 @@ public class IoTDBShutdownHook extends Thread {
     }
 
     // reject write operations to make sure all tsfiles will be sealed
-    CommonDescriptor.getInstance().getConfig().setNodeStatusToShutdown();
+    CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.ReadOnly);
     // wait all wal are flushed
     WALManager.getInstance().waitAllWALFlushed();
 
@@ -86,6 +94,24 @@ public class IoTDBShutdownHook extends Thread {
     // clear lock file
     DirectoryChecker.getInstance().deregisterAll();
 
+    // Set and report shutdown to cluster ConfigNode-leader
+    CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.Unknown);
+    boolean isReportSuccess = false;
+    try (ConfigNodeClient client =
+        ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.configNodeRegionId)) {
+      isReportSuccess =
+          client.reportDataNodeShutdown(DataNode.generateDataNodeLocation()).getCode()
+              == TSStatusCode.SUCCESS_STATUS.getStatusCode();
+    } catch (ClientManagerException e) {
+      logger.error("Failed to borrow ConfigNodeClient", e);
+    } catch (TException e) {
+      logger.error("Failed to report shutdown", e);
+    }
+    if (!isReportSuccess) {
+      logger.error(
+          "Reporting DataNode shutdown failed. The cluster will still take the current DataNode as Running for a few seconds.");
+    }
+
     if (logger.isInfoEnabled()) {
       logger.info(
           "IoTDB exits. Jvm memory usage: {}",
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index d7890ec803..66593d1371 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -765,6 +765,14 @@ service IConfigNodeRPCService {
    */
   TDataNodeRegisterResp updateDataNode(TDataNodeUpdateReq req)
 
+  /**
+   * Report that the specified DataNode will be shutdown.
+   * The ConfigNode-leader will mark it as Unknown.
+   *
+   * @return SUCCESS_STATUS if reporting successfully
+   */
+  common.TSStatus reportDataNodeShutdown(common.TDataNodeLocation dataNodeLocation)
+
   /**
    * Get one or more DataNodes' configuration
    *
@@ -980,6 +988,14 @@ service IConfigNodeRPCService {
    */
   common.TSStatus deleteConfigNodePeer(common.TConfigNodeLocation configNodeLocation)
 
+  /**
+   * Report that the specified ConfigNode will be shutdown.
+   * The ConfigNode-leader will mark it as Unknown.
+   *
+   * @return SUCCESS_STATUS if reporting successfully
+   */
+  common.TSStatus reportConfigNodeShutdown(common.TConfigNodeLocation configNodeLocation)
+
   /** Stop the specific ConfigNode */
   common.TSStatus stopConfigNode(common.TConfigNodeLocation configNodeLocation)
 

Reply via email to