This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 3ff855e98d0 Fix stuck when stopping a DataNode with large unremovable
WAL files (#15727)
3ff855e98d0 is described below
commit 3ff855e98d06a729531277255a4d4df23ddaea89
Author: Jiang Tian <[email protected]>
AuthorDate: Wed Jun 18 11:39:47 2025 +0800
Fix stuck when stopping a DataNode with large unremovable WAL files (#15727)
* Fix stuck when stopping a DataNode with large unremovable WAL files
* spotless
* add shutdown hook watcher
* Fix logDispatcher stuck
* add re-interrupt
---
.../it/env/cluster/config/MppDataNodeConfig.java | 12 ++
.../it/env/cluster/node/AbstractNodeWrapper.java | 4 +
.../it/env/remote/config/RemoteDataNodeConfig.java | 10 ++
.../apache/iotdb/itbase/env/DataNodeConfig.java | 4 +
.../iotdb/db/it/IoTDBCustomizedClusterIT.java | 148 +++++++++++++++++++++
.../consensus/iot/logdispatcher/LogDispatcher.java | 2 +-
.../consensus/iot/logdispatcher/SyncStatus.java | 5 +-
.../iotdb/db/service/DataNodeShutdownHook.java | 31 ++++-
.../storageengine/dataregion/wal/WALManager.java | 11 +-
9 files changed, 221 insertions(+), 6 deletions(-)
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
index 58ac4d596bc..b7fa3bdee2c 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
@@ -119,4 +119,16 @@ public class MppDataNodeConfig extends MppBaseConfig
implements DataNodeConfig {
setProperty("cache_last_values_for_load",
String.valueOf(cacheLastValuesForLoad));
return this;
}
+
+ @Override
+ public DataNodeConfig setWalThrottleSize(long walThrottleSize) {
+ setProperty("wal_throttle_threshold_in_byte",
String.valueOf(walThrottleSize));
+ return this;
+ }
+
+ @Override
+ public DataNodeConfig setDeleteWalFilesPeriodInMs(long
deleteWalFilesPeriodInMs) {
+ setProperty("delete_wal_files_period_in_ms",
String.valueOf(deleteWalFilesPeriodInMs));
+ return this;
+ }
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
index 3b6d8406981..0e33674fdea 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
@@ -818,4 +818,8 @@ public abstract class AbstractNodeWrapper implements
BaseNodeWrapper {
return -1;
}
}
+
+  /**
+   * Returns the {@link Process} this wrapper holds for its node, so callers such as integration
+   * tests can act on the OS process directly (for example, destroy it).
+   *
+   * @return the process held in the {@code instance} field
+   */
+  public Process getInstance() {
+    return this.instance;
+  }
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
index 63f50d15958..1f61ff4289c 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
@@ -78,4 +78,14 @@ public class RemoteDataNodeConfig implements DataNodeConfig {
public DataNodeConfig setCacheLastValuesForLoad(boolean
cacheLastValuesForLoad) {
return this;
}
+
+  /**
+   * No-op for the remote environment: {@code walThrottleSize} is ignored.
+   *
+   * @return this config, unchanged
+   */
+  @Override
+  public DataNodeConfig setWalThrottleSize(long walThrottleSize) {
+    return this;
+  }
+
+  /**
+   * No-op for the remote environment: {@code deleteWalFilesPeriodInMs} is ignored.
+   *
+   * @return this config, unchanged
+   */
+  @Override
+  public DataNodeConfig setDeleteWalFilesPeriodInMs(long deleteWalFilesPeriodInMs) {
+    return this;
+  }
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
index 353fdef7f25..4d2e4435bdb 100644
---
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
@@ -45,4 +45,8 @@ public interface DataNodeConfig {
DataNodeConfig setLoadLastCacheStrategy(String strategyName);
DataNodeConfig setCacheLastValuesForLoad(boolean cacheLastValuesForLoad);
+
+  /**
+   * Sets the DataNode WAL throttle threshold, in bytes. The cluster implementation maps it to the
+   * {@code wal_throttle_threshold_in_byte} property; the remote implementation ignores it.
+   *
+   * @param walThrottleSize WAL throttle threshold, in bytes
+   * @return this config, for chaining
+   */
+  DataNodeConfig setWalThrottleSize(long walThrottleSize);
+
+  /**
+   * Sets the period, in milliseconds, for deleting DataNode WAL files. The cluster implementation
+   * maps it to the {@code delete_wal_files_period_in_ms} property; the remote implementation
+   * ignores it.
+   *
+   * @param deleteWalFilesPeriodInMs deletion period, in milliseconds
+   * @return this config, for chaining
+   */
+  DataNodeConfig setDeleteWalFilesPeriodInMs(long deleteWalFilesPeriodInMs);
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java
new file mode 100644
index 00000000000..5812e255d16
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it;
+
+import org.apache.iotdb.it.env.cluster.env.SimpleEnv;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Session;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Date;
+
+import static org.junit.Assert.fail;
+
+/** Tests that may not be satisfied with the default cluster settings. */
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBCustomizedClusterIT {
+
+  private static final Logger logger = LoggerFactory.getLogger(IoTDBCustomizedClusterIT.class);
+
+  /** How long the data-region leader may take to exit after its process is destroyed. */
+  private static final long LEADER_EXIT_TIMEOUT_MS = 30_000L;
+
+  /** Pause between liveness checks, so waiting for the leader to exit does not spin a core. */
+  private static final long LEADER_EXIT_POLL_INTERVAL_MS = 100L;
+
+  /**
+   * Regression test: a DataNode must still exit while it holds WAL files it cannot delete.
+   *
+   * <p>When the WAL size exceeds {@code walThrottleSize * 0.8}, the timed wal-delete thread keeps
+   * trying to delete WAL files. Previously this blocked the DataNode from exiting, because the
+   * WAL-deletion task submitted by the shutdown hook could never be executed. This test stops a
+   * follower so that WAL written to the leader cannot be synced, destroys the leader's process, and
+   * asserts that it exits within {@link #LEADER_EXIT_TIMEOUT_MS}.
+   */
+  @Test
+  public void testWalThrottleStuck()
+      throws SQLException,
+          IoTDBConnectionException,
+          StatementExecutionException,
+          InterruptedException {
+    SimpleEnv simpleEnv = new SimpleEnv();
+    // A 1-byte throttle threshold is exceeded by any non-empty WAL; the short period makes the
+    // wal-delete thread run often.
+    simpleEnv
+        .getConfig()
+        .getDataNodeConfig()
+        .setWalThrottleSize(1)
+        .setDeleteWalFilesPeriodInMs(100);
+    simpleEnv
+        .getConfig()
+        .getCommonConfig()
+        .setDataReplicationFactor(3)
+        .setSchemaReplicationFactor(3)
+        .setSchemaRegionConsensusProtocolClass("org.apache.iotdb.consensus.ratis.RatisConsensus")
+        .setDataRegionConsensusProtocolClass("org.apache.iotdb.consensus.iot.IoTConsensus");
+    try {
+      simpleEnv.initClusterEnvironment(1, 3);
+
+      int leaderIndex = -1;
+      try (Connection connection = simpleEnv.getConnection();
+          Statement statement = connection.createStatement()) {
+        // write the first data
+        statement.execute("INSERT INTO root.db1.d1 (time, s1) values (1,1)");
+        // find the leader of the data region
+        int leaderPort = findDataRegionLeaderRpcPort(statement);
+        if (leaderPort == -1) {
+          fail("Leader not found");
+        }
+        leaderIndex = findDataNodeIndexByPort(simpleEnv, leaderPort);
+        if (leaderIndex == -1) {
+          // Without this check, the index arithmetic below fails with an opaque exception.
+          fail("No DataNode found with the leader's RPC port " + leaderPort);
+        }
+      }
+
+      // stop a follower, so that WAL written to the leader from now on cannot be synced
+      int followerIndex = (leaderIndex + 1) % simpleEnv.getDataNodeWrapperList().size();
+      DataNodeWrapper follower = simpleEnv.getDataNodeWrapperList().get(followerIndex);
+      follower.stop();
+      logger.info("Stopped data node {}", follower.getIpAndPortString());
+
+      DataNodeWrapper leader = simpleEnv.getDataNodeWrapperList().get(leaderIndex);
+      // write to the leader to generate WAL that cannot be synced
+      try (Session session = new Session(leader.getIp(), leader.getPort())) {
+        session.open();
+        for (int i = 0; i < 5; i++) {
+          session.executeNonQueryStatement("INSERT INTO root.db1.d1 (time, s1) values (1,1)");
+        }
+      }
+
+      // wait for the wal-delete thread to be scheduled
+      Thread.sleep(1000);
+
+      // stop the leader; its shutdown must not get stuck on the WAL it cannot delete
+      leader.getInstance().destroy();
+      logger.info("Stopping data node {}", leader.getIpAndPortString());
+      // confirm the death of the leader, polling instead of busy-spinning
+      long startTime = System.currentTimeMillis();
+      while (leader.isAlive()) {
+        if (System.currentTimeMillis() - startTime > LEADER_EXIT_TIMEOUT_MS) {
+          fail("Leader does not exit after " + LEADER_EXIT_TIMEOUT_MS / 1000 + "s");
+        }
+        Thread.sleep(LEADER_EXIT_POLL_INTERVAL_MS);
+      }
+    } finally {
+      simpleEnv.cleanClusterEnvironment();
+    }
+  }
+
+  /**
+   * Returns the RPC port of the first data-region leader listed by {@code SHOW REGIONS}, or -1 if
+   * none is listed. The result set is closed before returning.
+   */
+  private static int findDataRegionLeaderRpcPort(Statement statement) throws SQLException {
+    try (ResultSet resultSet = statement.executeQuery("SHOW REGIONS")) {
+      while (resultSet.next()) {
+        if ("DataRegion".equals(resultSet.getString("Type"))
+            && "Leader".equals(resultSet.getString("Role"))) {
+          return resultSet.getInt("RpcPort");
+        }
+      }
+    }
+    return -1;
+  }
+
+  /** Returns the index of the DataNode in {@code env} whose port is {@code port}, or -1. */
+  private static int findDataNodeIndexByPort(SimpleEnv env, int port) {
+    for (int i = 0; i < env.getDataNodeWrapperList().size(); i++) {
+      if (env.getDataNodeWrapperList().get(i).getPort() == port) {
+        return i;
+      }
+    }
+    return -1;
+  }
+}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index c3a0665be6a..e196df43209 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -105,8 +105,8 @@ public class LogDispatcher {
public synchronized void stop() {
if (!threads.isEmpty()) {
threads.forEach(LogDispatcherThread::setStopped);
- threads.forEach(LogDispatcherThread::processStopped);
executorService.shutdownNow();
+ threads.forEach(LogDispatcherThread::processStopped);
int timeout = 10;
try {
if (!executorService.awaitTermination(timeout, TimeUnit.SECONDS)) {
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
index fe00939050e..c5a426d88b8 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
@@ -44,8 +44,9 @@ public class SyncStatus {
* @throws InterruptedException
*/
public synchronized void addNextBatch(Batch batch) throws
InterruptedException {
- while (pendingBatches.size() >=
config.getReplication().getMaxPendingBatchesNum()
- || !iotConsensusMemoryManager.reserve(batch.getMemorySize(), false)) {
+ while ((pendingBatches.size() >=
config.getReplication().getMaxPendingBatchesNum()
+ || !iotConsensusMemoryManager.reserve(batch.getMemorySize(),
false))
+ && !Thread.interrupted()) {
wait();
}
pendingBatches.add(batch);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
index 9fb29e7c612..731c4b09da1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
@@ -49,15 +49,43 @@ public class DataNodeShutdownHook extends Thread {
private static final Logger logger =
LoggerFactory.getLogger(DataNodeShutdownHook.class);
private final TDataNodeLocation nodeLocation;
+  /**
+   * Daemon watcher started at the beginning of {@link #run()}. It periodically logs the stack of
+   * the hook thread, so a hanging shutdown can be diagnosed, and is interrupted at the end of
+   * {@code run()}.
+   */
+  private Thread watcherThread;
/**
 * Creates the shutdown-hook thread, named after {@code ThreadName.DATANODE_SHUTDOWN_HOOK}.
 *
 * @param nodeLocation stored in {@code this.nodeLocation}; presumably this DataNode's own
 *     location (NOTE(review): confirm against callers)
 */
public DataNodeShutdownHook(TDataNodeLocation nodeLocation) {
super(ThreadName.DATANODE_SHUTDOWN_HOOK.getName());
this.nodeLocation = nodeLocation;
}
+ private void startWatcher() {
+ Thread hookThread = Thread.currentThread();
+ watcherThread =
+ new Thread(
+ () -> {
+ while (!Thread.interrupted()) {
+ try {
+ Thread.sleep(10000);
+ StackTraceElement[] stackTrace = hookThread.getStackTrace();
+ StringBuilder stackTraceBuilder =
+ new StringBuilder("Stack trace of shutdown hook:\n");
+ for (StackTraceElement traceElement : stackTrace) {
+
stackTraceBuilder.append(traceElement.toString()).append("\n");
+ }
+ logger.info(stackTraceBuilder.toString());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return;
+ }
+ }
+ },
+ "ShutdownHookWatcher");
+ watcherThread.setDaemon(true);
+ watcherThread.start();
+ }
+
@Override
public void run() {
logger.info("DataNode exiting...");
+ startWatcher();
// Stop external rpc service firstly.
ExternalRPCService.getInstance().stop();
@@ -77,7 +105,6 @@ public class DataNodeShutdownHook extends Thread {
.equals(ConsensusFactory.RATIS_CONSENSUS)) {
StorageEngine.getInstance().syncCloseAllProcessor();
}
- WALManager.getInstance().syncDeleteOutdatedFilesInWALNodes();
// We did this work because the RatisConsensus recovery mechanism is
different from other
// consensus algorithms, which will replace the underlying storage engine
based on its
@@ -114,6 +141,8 @@ public class DataNodeShutdownHook extends Thread {
"DataNode exits. Jvm memory usage: {}",
MemUtils.bytesCntToStr(
Runtime.getRuntime().totalMemory() -
Runtime.getRuntime().freeMemory()));
+
+ watcherThread.interrupt();
}
private void triggerSnapshotForAllDataRegion() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java
index 7d9f7bb4679..300cc10ad81 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java
@@ -180,7 +180,7 @@ public class WALManager implements IService {
// threshold, the system continues to delete expired files until the disk
size is smaller than
// the threshold.
boolean firstLoop = true;
- while ((firstLoop || shouldThrottle()) && !Thread.interrupted()) {
+ while ((firstLoop || shouldThrottle())) {
deleteOutdatedFilesInWALNodes();
if (firstLoop && shouldThrottle()) {
logger.warn(
@@ -189,6 +189,10 @@ public class WALManager implements IService {
getThrottleThreshold());
}
firstLoop = false;
+ if (Thread.interrupted()) {
+ logger.info("Timed wal delete thread is interrupted.");
+ return;
+ }
}
}
@@ -267,12 +271,15 @@ public class WALManager implements IService {
if (config.getWalMode() == WALMode.DISABLE) {
return;
}
-
+ logger.info("Stopping WALManager");
if (walDeleteThread != null) {
shutdownThread(walDeleteThread, ThreadName.WAL_DELETE);
walDeleteThread = null;
}
+ logger.info("Deleting outdated files before exiting");
+ deleteOutdatedFilesInWALNodes();
clear();
+ logger.info("WALManager stopped");
}
private void shutdownThread(ExecutorService thread, ThreadName threadName) {