This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch cp_iot_1.3 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c5127e7bf81dd2bb2e6cdb518dddecf8a17bec27 Author: Jiang Tian <[email protected]> AuthorDate: Wed Jun 18 11:39:47 2025 +0800 Fix stuck when stopping a DataNode with large unremovable WAL files (#15727) * Fix stuck when stopping a DataNode with large unremovable WAL files * spotless * add shutdown hook watcher * Fix logDispatcher stuck * add re-interrupt --- .../it/env/cluster/config/MppDataNodeConfig.java | 12 ++ .../it/env/cluster/node/AbstractNodeWrapper.java | 4 + .../it/env/remote/config/RemoteDataNodeConfig.java | 10 ++ .../apache/iotdb/itbase/env/DataNodeConfig.java | 4 + .../iotdb/db/it/IoTDBCustomizedClusterIT.java | 148 +++++++++++++++++++++ .../consensus/iot/logdispatcher/LogDispatcher.java | 2 +- .../consensus/iot/logdispatcher/SyncStatus.java | 5 +- .../iotdb/db/service/DataNodeShutdownHook.java | 31 ++++- .../storageengine/dataregion/wal/WALManager.java | 9 +- 9 files changed, 220 insertions(+), 5 deletions(-) diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java index 01636b7bf0b..9ac380503cd 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java @@ -100,4 +100,16 @@ public class MppDataNodeConfig extends MppBaseConfig implements DataNodeConfig { setProperty("cache_last_values_for_load", String.valueOf(cacheLastValuesForLoad)); return this; } + + @Override + public DataNodeConfig setWalThrottleSize(long walThrottleSize) { + setProperty("wal_throttle_threshold_in_byte", String.valueOf(walThrottleSize)); + return this; + } + + @Override + public DataNodeConfig setDeleteWalFilesPeriodInMs(long deleteWalFilesPeriodInMs) { + setProperty("delete_wal_files_period_in_ms", String.valueOf(deleteWalFilesPeriodInMs)); + return this; + } } diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java index f1e25f9e1d6..005db9050ff 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java @@ -758,4 +758,8 @@ public abstract class AbstractNodeWrapper implements BaseNodeWrapper { return -1; } } + + public Process getInstance() { + return instance; + } } diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java index 80d9d98d23d..b2550ae689f 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java @@ -63,4 +63,14 @@ public class RemoteDataNodeConfig implements DataNodeConfig { public DataNodeConfig setCacheLastValuesForLoad(boolean cacheLastValuesForLoad) { return this; } + + @Override + public DataNodeConfig setWalThrottleSize(long walThrottleSize) { + return this; + } + + @Override + public DataNodeConfig setDeleteWalFilesPeriodInMs(long deleteWalFilesPeriodInMs) { + return this; + } } diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java index c6112a0e639..c27eb369c0d 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java @@ -39,4 +39,8 @@ public interface DataNodeConfig { DataNodeConfig setLoadLastCacheStrategy(String strategyName); DataNodeConfig setCacheLastValuesForLoad(boolean cacheLastValuesForLoad); + + DataNodeConfig setWalThrottleSize(long walThrottleSize); + + DataNodeConfig setDeleteWalFilesPeriodInMs(long deleteWalFilesPeriodInMs); } diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java new file mode 100644 index 00000000000..5812e255d16 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.it; + +import org.apache.iotdb.it.env.cluster.env.SimpleEnv; +import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.ClusterIT; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.Session; + +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Date; + +import static org.junit.Assert.fail; + +/** Tests that may not be satisfied with the default cluster settings. */ +@RunWith(IoTDBTestRunner.class) +@Category({ClusterIT.class}) +public class IoTDBCustomizedClusterIT { + + private final Logger logger = LoggerFactory.getLogger(IoTDBCustomizedClusterIT.class); + + /** + * When the wal size exceeds `walThrottleSize` * 0.8, the timed wal-delete-thread will try + * deleting wal forever, which will block the DataNode from exiting, because task of deleting wal + * submitted by the ShutdownHook cannot be executed. This test ensures that this blocking is + * fixed. + */ + @Test + public void testWalThrottleStuck() + throws SQLException, + IoTDBConnectionException, + StatementExecutionException, + InterruptedException { + SimpleEnv simpleEnv = new SimpleEnv(); + simpleEnv + .getConfig() + .getDataNodeConfig() + .setWalThrottleSize(1) + .setDeleteWalFilesPeriodInMs(100); + simpleEnv + .getConfig() + .getCommonConfig() + .setDataReplicationFactor(3) + .setSchemaReplicationFactor(3) + .setSchemaRegionConsensusProtocolClass("org.apache.iotdb.consensus.ratis.RatisConsensus") + .setDataRegionConsensusProtocolClass("org.apache.iotdb.consensus.iot.IoTConsensus"); + try { + simpleEnv.initClusterEnvironment(1, 3); + + int leaderIndex = -1; + try (Connection connection = simpleEnv.getConnection(); + Statement statement = connection.createStatement()) { + // write the first data + statement.execute("INSERT INTO root.db1.d1 (time, s1) values (1,1)"); + // find the leader of the data region + int port = -1; + + ResultSet resultSet = statement.executeQuery("SHOW REGIONS"); + while (resultSet.next()) { + String regionType = resultSet.getString("Type"); + if (regionType.equals("DataRegion")) { + String role = resultSet.getString("Role"); + if (role.equals("Leader")) { + port = resultSet.getInt("RpcPort"); + break; + } + } + } + + if (port == -1) { + fail("Leader not found"); + } + + for (int i = 0; i < simpleEnv.getDataNodeWrapperList().size(); i++) { + if (simpleEnv.getDataNodeWrapperList().get(i).getPort() == port) { + leaderIndex = i; + break; + } + } + } + + // stop a follower + int followerIndex = (leaderIndex + 1) % simpleEnv.getDataNodeWrapperList().size(); + simpleEnv.getDataNodeWrapperList().get(followerIndex).stop(); + System.out.println( + new Date() + + ":Stopping data node " + + simpleEnv.getDataNodeWrapperList().get(followerIndex).getIpAndPortString()); + + DataNodeWrapper leader = simpleEnv.getDataNodeWrapperList().get(leaderIndex); + // write to the leader to generate wal that cannot be synced + try (Session session = new Session(leader.getIp(), leader.getPort())) { + session.open(); + + session.executeNonQueryStatement("INSERT INTO root.db1.d1 (time, s1) values (1,1)"); + session.executeNonQueryStatement("INSERT INTO root.db1.d1 (time, s1) values (1,1)"); + session.executeNonQueryStatement("INSERT INTO root.db1.d1 (time, s1) values (1,1)"); + session.executeNonQueryStatement("INSERT INTO root.db1.d1 (time, s1) values (1,1)"); + session.executeNonQueryStatement("INSERT INTO root.db1.d1 (time, s1) values (1,1)"); + } + + // wait for wal-delete thread to be scheduled + Thread.sleep(1000); + + // stop the leader + leader.getInstance().destroy(); + System.out.println(new Date() + ":Stopping data node " + leader.getIpAndPortString()); + // confirm the death of the leader + long startTime = System.currentTimeMillis(); + while (leader.isAlive()) { + if (System.currentTimeMillis() - startTime > 30000) { + fail("Leader does not exit after 30s"); + } + } + } finally { + simpleEnv.cleanClusterEnvironment(); + } + } +} diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java index c3a0665be6a..e196df43209 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java @@ -105,8 +105,8 @@ public class LogDispatcher { public synchronized void stop() { if (!threads.isEmpty()) { threads.forEach(LogDispatcherThread::setStopped); - threads.forEach(LogDispatcherThread::processStopped); executorService.shutdownNow(); + threads.forEach(LogDispatcherThread::processStopped); int timeout = 10; try { if (!executorService.awaitTermination(timeout, TimeUnit.SECONDS)) { diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java index fe00939050e..c5a426d88b8 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java @@ -44,8 +44,9 @@ public class SyncStatus { * @throws InterruptedException */ public synchronized void addNextBatch(Batch batch) throws InterruptedException { - while (pendingBatches.size() >= config.getReplication().getMaxPendingBatchesNum() - || !iotConsensusMemoryManager.reserve(batch.getMemorySize(), false)) { + while ((pendingBatches.size() >= config.getReplication().getMaxPendingBatchesNum() + || !iotConsensusMemoryManager.reserve(batch.getMemorySize(), false)) + && !Thread.interrupted()) { wait(); } pendingBatches.add(batch); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java index 1a6af1cf9b0..4d0f4a5a412 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java @@ -52,15 +52,43 @@ public class DataNodeShutdownHook extends Thread { private static final Logger logger = LoggerFactory.getLogger(DataNodeShutdownHook.class); private final TDataNodeLocation nodeLocation; + private Thread watcherThread; public DataNodeShutdownHook(TDataNodeLocation nodeLocation) { super(ThreadName.DATANODE_SHUTDOWN_HOOK.getName()); this.nodeLocation = nodeLocation; } + private void startWatcher() { + Thread hookThread = Thread.currentThread(); + watcherThread = + new Thread( + () -> { + while (!Thread.interrupted()) { + try { + Thread.sleep(10000); + StackTraceElement[] stackTrace = hookThread.getStackTrace(); + StringBuilder stackTraceBuilder = + new StringBuilder("Stack trace of shutdown hook:\n"); + for (StackTraceElement traceElement : stackTrace) { + stackTraceBuilder.append(traceElement.toString()).append("\n"); + } + logger.info(stackTraceBuilder.toString()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + } + }, + "ShutdownHookWatcher"); + watcherThread.setDaemon(true); + watcherThread.start(); + } + @Override public void run() { logger.info("DataNode exiting..."); + startWatcher(); // Stop external rpc service firstly. ExternalRPCService.getInstance().stop(); @@ -77,7 +105,6 @@ public class DataNodeShutdownHook extends Thread { .equals(ConsensusFactory.RATIS_CONSENSUS)) { StorageEngine.getInstance().syncCloseAllProcessor(); } - WALManager.getInstance().syncDeleteOutdatedFilesInWALNodes(); // We did this work because the RatisConsensus recovery mechanism is different from other // consensus algorithms, which will replace the underlying storage engine based on its @@ -141,6 +168,8 @@ public class DataNodeShutdownHook extends Thread { "DataNode exits. Jvm memory usage: {}", MemUtils.bytesCntToStr( Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory())); + + watcherThread.interrupt(); } private void triggerSnapshotForAllDataRegion() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java index 96d4931fa43..923ccda511e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java @@ -186,6 +186,10 @@ public class WALManager implements IService { getThrottleThreshold()); } firstLoop = false; + if (Thread.interrupted()) { + logger.info("Timed wal delete thread is interrupted."); + return; + } } } @@ -264,12 +268,15 @@ public class WALManager implements IService { if (config.getWalMode() == WALMode.DISABLE) { return; } - + logger.info("Stopping WALManager"); if (walDeleteThread != null) { shutdownThread(walDeleteThread, ThreadName.WAL_DELETE); walDeleteThread = null; } + logger.info("Deleting outdated files before exiting"); + deleteOutdatedFilesInWALNodes(); clear(); + logger.info("WALManager stopped"); } private void shutdownThread(ExecutorService thread, ThreadName threadName) {
