This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 453bc7b39cd Remove redundant closing of consensus layer in DataNode
shutdown process (#13478)
453bc7b39cd is described below
commit 453bc7b39cdb1daf687b8ddd75be7db8c3af6c42
Author: Li Yu Heng <[email protected]>
AuthorDate: Wed Sep 11 18:14:46 2024 +0800
Remove redundant closing of consensus layer in DataNode shutdown process
(#13478)
---
.../iotdb/db/schemaengine/SchemaEngineMode.java | 3 +-
.../java/org/apache/iotdb/db/service/DataNode.java | 28 ++++---
.../iotdb/db/service/DataNodeShutdownHook.java | 88 +++++++++-------------
.../iotdb/commons/conf/CommonDescriptor.java | 3 -
4 files changed, 51 insertions(+), 71 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngineMode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngineMode.java
index d47dc36089b..43cc8089305 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngineMode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngineMode.java
@@ -21,8 +21,7 @@ package org.apache.iotdb.db.schemaengine;
public enum SchemaEngineMode {
Memory(0),
- PBTree(1),
- Rocksdb_based(2);
+ PBTree(1);
private final int code;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 92852f216e8..2dc24b8f78f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -1146,18 +1146,24 @@ public class DataNode extends ServerCommandLine
implements DataNodeMBean {
}
public void stop() {
- deactivate();
+ stopTriggerRelatedServices();
+ registerManager.deregisterAll();
+ JMXService.deregisterMBean(mbeanName);
SchemaEngine.getInstance().clear();
- try {
- MetricService.getInstance().stop();
- if (schemaRegionConsensusStarted) {
+ MetricService.getInstance().stop();
+ if (schemaRegionConsensusStarted) {
+ try {
SchemaRegionConsensusImpl.getInstance().stop();
+ } catch (Exception e) {
+ logger.warn("Exception during SchemaRegionConsensusImpl stopping", e);
}
- if (dataRegionConsensusStarted) {
+ }
+ if (dataRegionConsensusStarted) {
+ try {
DataRegionConsensusImpl.getInstance().stop();
+ } catch (Exception e) {
+ logger.warn("Exception during DataRegionConsensusImpl stopping", e);
}
- } catch (Exception e) {
- logger.error("Stop data node error", e);
}
}
@@ -1173,14 +1179,6 @@ public class DataNode extends ServerCommandLine
implements DataNodeMBean {
}
}
- private void deactivate() {
- logger.info("Deactivating IoTDB DataNode...");
- stopTriggerRelatedServices();
- registerManager.deregisterAll();
- JMXService.deregisterMBean(mbeanName);
- logger.info("IoTDB DataNode is deactivated.");
- }
-
private void stopTriggerRelatedServices() {
triggerInformationUpdater.stopTriggerInformationUpdater();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
index b060634982c..96eda557489 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
@@ -28,12 +28,9 @@ import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
-import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
-import org.apache.iotdb.db.schemaengine.SchemaEngine;
-import org.apache.iotdb.db.schemaengine.SchemaEngineMode;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
import org.apache.iotdb.db.storageengine.rescon.disk.DirectoryChecker;
@@ -44,8 +41,6 @@ import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-
public class DataNodeShutdownHook extends Thread {
private static final Logger logger =
LoggerFactory.getLogger(DataNodeShutdownHook.class);
@@ -63,12 +58,6 @@ public class DataNodeShutdownHook extends Thread {
// Stop external rpc service firstly.
RPCService.getInstance().stop();
- // Close rocksdb if possible to avoid lose data
- if
(SchemaEngineMode.valueOf(CommonDescriptor.getInstance().getConfig().getSchemaEngineMode())
- .equals(SchemaEngineMode.Rocksdb_based)) {
- SchemaEngine.getInstance().clear();
- }
-
// Reject write operations to make sure all tsfiles will be sealed
CommonDescriptor.getInstance().getConfig().setStopping(true);
CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.ReadOnly);
@@ -93,51 +82,20 @@ public class DataNodeShutdownHook extends Thread {
.getConfig()
.getDataRegionConsensusProtocolClass()
.equals(ConsensusFactory.RATIS_CONSENSUS)) {
-
DataRegionConsensusImpl.getInstance().getAllConsensusGroupIds().parallelStream()
- .forEach(
- id -> {
- try {
- DataRegionConsensusImpl.getInstance().triggerSnapshot(id,
false);
- } catch (ConsensusException e) {
- logger.warn(
- "Something wrong happened while calling consensus
layer's "
- + "triggerSnapshot API.",
- e);
- }
- });
- }
-
- // Close consensusImpl
- try {
- SchemaRegionConsensusImpl.getInstance().stop();
- DataRegionConsensusImpl.getInstance().stop();
- } catch (IOException e) {
- logger.error("Stop ConsensusImpl error in IoTDBShutdownHook", e);
+ triggerSnapshotForAllDataRegion();
}
// Set and report shutdown to cluster ConfigNode-leader
-
CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.Unknown);
- boolean isReportSuccess = false;
- try (ConfigNodeClient client =
-
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
- isReportSuccess =
- client.reportDataNodeShutdown(nodeLocation).getCode()
- == TSStatusCode.SUCCESS_STATUS.getStatusCode();
-
- // Actually stop all services started by the DataNode.
- // If we don't call this, services like the RestService are not stopped
and I can't re-start
- // it.
- DataNode.getInstance().stop();
- } catch (ClientManagerException e) {
- logger.error("Failed to borrow ConfigNodeClient", e);
- } catch (TException e) {
- logger.error("Failed to report shutdown", e);
- }
- if (!isReportSuccess) {
- logger.error(
- "Reporting DataNode shutdown failed. The cluster will still take the
current DataNode as Running for a few seconds.");
+ if (!reportShutdownToConfigNodeLeader()) {
+ logger.warn(
+ "Failed to report DataNode's shutdown to ConfigNode. The cluster
will still take the current DataNode as Running for a few seconds.");
}
+ // Actually stop all services started by the DataNode.
+ // If we don't call this, services like the RestService are not stopped
and I can't re-start
+ // it.
+ DataNode.getInstance().stop();
+
// Clear lock file. All services should be shutdown before this line.
DirectoryChecker.getInstance().deregisterAll();
@@ -148,4 +106,32 @@ public class DataNodeShutdownHook extends Thread {
Runtime.getRuntime().totalMemory() -
Runtime.getRuntime().freeMemory()));
}
}
+
+ private void triggerSnapshotForAllDataRegion() {
+
DataRegionConsensusImpl.getInstance().getAllConsensusGroupIds().parallelStream()
+ .forEach(
+ id -> {
+ try {
+ DataRegionConsensusImpl.getInstance().triggerSnapshot(id,
false);
+ } catch (ConsensusException e) {
+ logger.warn(
+ "Something wrong happened while calling consensus layer's "
+ + "triggerSnapshot API.",
+ e);
+ }
+ });
+ }
+
+ private boolean reportShutdownToConfigNodeLeader() {
+ try (ConfigNodeClient client =
+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
+ return client.reportDataNodeShutdown(nodeLocation).getCode()
+ == TSStatusCode.SUCCESS_STATUS.getStatusCode();
+ } catch (ClientManagerException e) {
+ logger.error("Failed to borrow ConfigNodeClient", e);
+ } catch (TException e) {
+ logger.error("Failed to report shutdown", e);
+ }
+ return false;
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 310400b33ce..48f7aad2391 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -213,9 +213,6 @@ public class CommonDescriptor {
Boolean.parseBoolean(
properties.getProperty(
"enable_last_cache",
Boolean.toString(config.isLastCacheEnable()))));
- if (config.getSchemaEngineMode().equals("Rocksdb_based")) {
- config.setLastCacheEnable(false);
- }
config.setTagAttributeTotalSize(
Integer.parseInt(