This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch fix_stop_wal_stuck in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c6354a26d64f02d987d9d4b23b74cd8a0deab826 Author: Tian Jiang <[email protected]> AuthorDate: Mon Jun 16 17:58:33 2025 +0800 Fix stuck when stopping a DataNode with large unremovable WAL files --- .../it/env/cluster/config/MppDataNodeConfig.java | 12 ++ .../it/env/cluster/node/AbstractNodeWrapper.java | 4 + .../it/env/remote/config/RemoteDataNodeConfig.java | 10 ++ .../apache/iotdb/itbase/env/DataNodeConfig.java | 4 + .../org/apache/iotdb/db/it/IoTDBStopNodeIT.java | 146 +++++++++++++++++++++ .../iotdb/db/service/DataNodeShutdownHook.java | 1 - .../storageengine/dataregion/wal/WALManager.java | 1 + 7 files changed, 177 insertions(+), 1 deletion(-) diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java index 58ac4d596bc..b7fa3bdee2c 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java @@ -119,4 +119,16 @@ public class MppDataNodeConfig extends MppBaseConfig implements DataNodeConfig { setProperty("cache_last_values_for_load", String.valueOf(cacheLastValuesForLoad)); return this; } + + @Override + public DataNodeConfig setWalThrottleSize(long walThrottleSize) { + setProperty("wal_throttle_threshold_in_byte", String.valueOf(walThrottleSize)); + return this; + } + + @Override + public DataNodeConfig setDeleteWalFilesPeriodInMs(long deleteWalFilesPeriodInMs) { + setProperty("delete_wal_files_period_in_ms", String.valueOf(deleteWalFilesPeriodInMs)); + return this; + } } diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java index 3b6d8406981..0e33674fdea 100644 --- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java @@ -818,4 +818,8 @@ public abstract class AbstractNodeWrapper implements BaseNodeWrapper { return -1; } } + + public Process getInstance() { + return instance; + } } diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java index 63f50d15958..1f61ff4289c 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java @@ -78,4 +78,14 @@ public class RemoteDataNodeConfig implements DataNodeConfig { public DataNodeConfig setCacheLastValuesForLoad(boolean cacheLastValuesForLoad) { return this; } + + @Override + public DataNodeConfig setWalThrottleSize(long walThrottleSize) { + return this; + } + + @Override + public DataNodeConfig setDeleteWalFilesPeriodInMs(long deleteWalFilesPeriodInMs) { + return this; + } } diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java index 353fdef7f25..4d2e4435bdb 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java @@ -45,4 +45,8 @@ public interface DataNodeConfig { DataNodeConfig setLoadLastCacheStrategy(String strategyName); DataNodeConfig setCacheLastValuesForLoad(boolean cacheLastValuesForLoad); + + DataNodeConfig setWalThrottleSize(long walThrottleSize); + + DataNodeConfig setDeleteWalFilesPeriodInMs(long deleteWalFilesPeriodInMs); } diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBStopNodeIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBStopNodeIT.java new file mode 100644 index 00000000000..05078f4c416 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBStopNodeIT.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.it; + +import org.apache.iotdb.it.env.cluster.env.SimpleEnv; +import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.ClusterIT; +import org.apache.iotdb.itbase.category.LocalStandaloneIT; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.Session; + +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; + +import static org.junit.Assert.fail; + +@RunWith(IoTDBTestRunner.class) +@Category({LocalStandaloneIT.class, ClusterIT.class}) +public class IoTDBStopNodeIT { + + private final Logger logger = LoggerFactory.getLogger(IoTDBStopNodeIT.class); + + /** + * When the WAL size exceeds `walThrottleSize` * 0.8, the timed wal-delete-thread keeps trying to delete WAL files forever, + * which blocks the DataNode from exiting, because the WAL-deletion task submitted by the ShutdownHook can never be + * executed. + * This test ensures that this blocking is fixed. 
+ */ + @Test + public void testWalThrottleStuck() + throws SQLException, + IoTDBConnectionException, + StatementExecutionException, + InterruptedException { + SimpleEnv simpleEnv = new SimpleEnv(); + simpleEnv + .getConfig() + .getDataNodeConfig() + .setWalThrottleSize(1) + .setDeleteWalFilesPeriodInMs(100); + simpleEnv + .getConfig() + .getCommonConfig() + .setDataReplicationFactor(3) + .setSchemaReplicationFactor(3) + .setSchemaRegionConsensusProtocolClass("org.apache.iotdb.consensus.ratis.RatisConsensus") + .setDataRegionConsensusProtocolClass("org.apache.iotdb.consensus.iot.IoTConsensus"); + try { + simpleEnv.initClusterEnvironment(1, 3); + + int leaderIndex = -1; + try (Connection connection = simpleEnv.getConnection(); + Statement statement = connection.createStatement()) { + // write the first data + statement.execute("INSERT INTO root.db1.d1 (time, s1) values (1,1)"); + // find the leader of the data region + int port = -1; + + ResultSet resultSet = statement.executeQuery("SHOW REGIONS"); + while (resultSet.next()) { + String regionType = resultSet.getString("Type"); + if (regionType.equals("DataRegion")) { + String role = resultSet.getString("Role"); + if (role.equals("Leader")) { + port = resultSet.getInt("RpcPort"); + break; + } + } + } + + if (port == -1) { + fail("Leader not found"); + } + + for (int i = 0; i < simpleEnv.getDataNodeWrapperList().size(); i++) { + if (simpleEnv.getDataNodeWrapperList().get(i).getPort() == port) { + leaderIndex = i; + break; + } + } + } + + // stop a follower + int followerIndex = (leaderIndex + 1) % simpleEnv.getDataNodeWrapperList().size(); + simpleEnv.getDataNodeWrapperList().get(followerIndex).stop(); + System.out.println( + "Stopping data node " + + simpleEnv.getDataNodeWrapperList().get(followerIndex).getIpAndPortString()); + + DataNodeWrapper leader = simpleEnv.getDataNodeWrapperList().get(leaderIndex); + // write to the leader to generate wal that cannot be synced + try (Session session = new 
Session(leader.getIp(), leader.getPort())) { + session.open(); + + session.executeNonQueryStatement("INSERT INTO root.db1.d1 (time, s1) values (1,1)"); + session.executeNonQueryStatement("INSERT INTO root.db1.d1 (time, s1) values (1,1)"); + session.executeNonQueryStatement("INSERT INTO root.db1.d1 (time, s1) values (1,1)"); + session.executeNonQueryStatement("INSERT INTO root.db1.d1 (time, s1) values (1,1)"); + session.executeNonQueryStatement("INSERT INTO root.db1.d1 (time, s1) values (1,1)"); + } + + // wait for wal-delete thread to be scheduled + Thread.sleep(1000); + + // stop the leader + leader.getInstance().destroy(); + System.out.println("Stopping data node " + leader.getIpAndPortString()); + // confirm the death of the leader + long startTime = System.currentTimeMillis(); + while (leader.isAlive()) { + if (System.currentTimeMillis() - startTime > 30000) { + fail("Leader does not exit after 30s"); + } + } + } finally { + simpleEnv.cleanClusterEnvironment(); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java index 9fb29e7c612..d3125fae0f9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java @@ -77,7 +77,6 @@ public class DataNodeShutdownHook extends Thread { .equals(ConsensusFactory.RATIS_CONSENSUS)) { StorageEngine.getInstance().syncCloseAllProcessor(); } - WALManager.getInstance().syncDeleteOutdatedFilesInWALNodes(); // We did this work because the RatisConsensus recovery mechanism is different from other // consensus algorithms, which will replace the underlying storage engine based on its diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java index 7d9f7bb4679..2c4244585e8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java @@ -272,6 +272,7 @@ public class WALManager implements IService { shutdownThread(walDeleteThread, ThreadName.WAL_DELETE); walDeleteThread = null; } + deleteOutdatedFilesInWALNodes(); clear(); }
