This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch fix_stop_wal_stuck
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c6354a26d64f02d987d9d4b23b74cd8a0deab826
Author: Tian Jiang <[email protected]>
AuthorDate: Mon Jun 16 17:58:33 2025 +0800

    Fix DataNode getting stuck on shutdown when large WAL files cannot be removed
---
 .../it/env/cluster/config/MppDataNodeConfig.java   |  12 ++
 .../it/env/cluster/node/AbstractNodeWrapper.java   |   4 +
 .../it/env/remote/config/RemoteDataNodeConfig.java |  10 ++
 .../apache/iotdb/itbase/env/DataNodeConfig.java    |   4 +
 .../org/apache/iotdb/db/it/IoTDBStopNodeIT.java    | 146 +++++++++++++++++++++
 .../iotdb/db/service/DataNodeShutdownHook.java     |   1 -
 .../storageengine/dataregion/wal/WALManager.java   |   1 +
 7 files changed, 177 insertions(+), 1 deletion(-)

diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
index 58ac4d596bc..b7fa3bdee2c 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
@@ -119,4 +119,16 @@ public class MppDataNodeConfig extends MppBaseConfig implements DataNodeConfig {
     setProperty("cache_last_values_for_load", String.valueOf(cacheLastValuesForLoad));
     return this;
   }
+
+  @Override
+  public DataNodeConfig setWalThrottleSize(long walThrottleSize) {
+    setProperty("wal_throttle_threshold_in_byte", String.valueOf(walThrottleSize));
+    return this;
+  }
+
+  @Override
+  public DataNodeConfig setDeleteWalFilesPeriodInMs(long deleteWalFilesPeriodInMs) {
+    setProperty("delete_wal_files_period_in_ms", String.valueOf(deleteWalFilesPeriodInMs));
+    return this;
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
index 3b6d8406981..0e33674fdea 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
@@ -818,4 +818,8 @@ public abstract class AbstractNodeWrapper implements BaseNodeWrapper {
       return -1;
     }
   }
+
+  public Process getInstance() {
+    return instance;
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
index 63f50d15958..1f61ff4289c 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
@@ -78,4 +78,14 @@ public class RemoteDataNodeConfig implements DataNodeConfig {
   public DataNodeConfig setCacheLastValuesForLoad(boolean cacheLastValuesForLoad) {
     return this;
   }
+
+  @Override
+  public DataNodeConfig setWalThrottleSize(long walThrottleSize) {
+    return this;
+  }
+
+  @Override
+  public DataNodeConfig setDeleteWalFilesPeriodInMs(long deleteWalFilesPeriodInMs) {
+    return this;
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
index 353fdef7f25..4d2e4435bdb 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
@@ -45,4 +45,8 @@ public interface DataNodeConfig {
   DataNodeConfig setLoadLastCacheStrategy(String strategyName);
 
   DataNodeConfig setCacheLastValuesForLoad(boolean cacheLastValuesForLoad);
+
+  DataNodeConfig setWalThrottleSize(long walThrottleSize);
+
+  DataNodeConfig setDeleteWalFilesPeriodInMs(long deleteWalFilesPeriodInMs);
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBStopNodeIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBStopNodeIT.java
new file mode 100644
index 00000000000..05078f4c416
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBStopNodeIT.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it;
+
+import org.apache.iotdb.it.env.cluster.env.SimpleEnv;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Session;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBStopNodeIT {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBStopNodeIT.class);
+
+  /**
+   * When the WAL size exceeds {@code walThrottleSize * 0.8}, the timed wal-delete-thread keeps
+   * trying to delete WAL files forever, which used to block the DataNode from exiting because the
+   * WAL-deletion task submitted by the shutdown hook could never be executed. This test ensures
+   * that a DataNode in this situation still exits in time.
+   */
+  @Test
+  public void testWalThrottleStuck()
+      throws SQLException,
+          IoTDBConnectionException,
+          StatementExecutionException,
+          InterruptedException {
+    SimpleEnv simpleEnv = new SimpleEnv();
+    simpleEnv
+        .getConfig()
+        .getDataNodeConfig()
+        .setWalThrottleSize(1)
+        .setDeleteWalFilesPeriodInMs(100);
+    simpleEnv
+        .getConfig()
+        .getCommonConfig()
+        .setDataReplicationFactor(3)
+        .setSchemaReplicationFactor(3)
+        .setSchemaRegionConsensusProtocolClass("org.apache.iotdb.consensus.ratis.RatisConsensus")
+        .setDataRegionConsensusProtocolClass("org.apache.iotdb.consensus.iot.IoTConsensus");
+    try {
+      simpleEnv.initClusterEnvironment(1, 3);
+      List<DataNodeWrapper> dataNodes = simpleEnv.getDataNodeWrapperList();
+
+      int leaderIndex = -1;
+      try (Connection connection = simpleEnv.getConnection();
+          Statement statement = connection.createStatement()) {
+        // write the first data
+        statement.execute("INSERT INTO root.db1.d1 (time, s1) values (1,1)");
+        // find the RPC port of the leader of the data region
+        int port = -1;
+
+        try (ResultSet resultSet = statement.executeQuery("SHOW REGIONS")) {
+          while (resultSet.next()) {
+            String regionType = resultSet.getString("Type");
+            if ("DataRegion".equals(regionType)) {
+              String role = resultSet.getString("Role");
+              if ("Leader".equals(role)) {
+                port = resultSet.getInt("RpcPort");
+                break;
+              }
+            }
+          }
+        }
+
+        if (port == -1) {
+          fail("Leader not found");
+        }
+
+        for (int i = 0; i < dataNodes.size(); i++) {
+          if (dataNodes.get(i).getPort() == port) {
+            leaderIndex = i;
+            break;
+          }
+        }
+        if (leaderIndex == -1) {
+          fail("No DataNode found with the leader's RPC port " + port);
+        }
+      }
+
+      // stop a follower so that WAL written on the leader cannot be synced to it
+      DataNodeWrapper follower = dataNodes.get((leaderIndex + 1) % dataNodes.size());
+      LOGGER.info("Stopping data node {}", follower.getIpAndPortString());
+      follower.stop();
+
+      DataNodeWrapper leader = dataNodes.get(leaderIndex);
+      // write to the leader to generate wal that cannot be synced
+      try (Session session = new Session(leader.getIp(), leader.getPort())) {
+        session.open();
+
+        for (int i = 0; i < 5; i++) {
+          session.executeNonQueryStatement("INSERT INTO root.db1.d1 (time, s1) values (1,1)");
+        }
+      }
+
+      // wait for wal-delete thread to be scheduled
+      Thread.sleep(1000);
+
+      // stop the leader; Process.destroy() normally lets the JVM run its shutdown hooks
+      LOGGER.info("Stopping data node {}", leader.getIpAndPortString());
+      leader.getInstance().destroy();
+      // confirm the death of the leader without busy-waiting
+      if (!leader.getInstance().waitFor(30, TimeUnit.SECONDS)) {
+        fail("Leader does not exit after 30s");
+      }
+    } finally {
+      simpleEnv.cleanClusterEnvironment();
+    }
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
index 9fb29e7c612..d3125fae0f9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
@@ -77,7 +77,6 @@ public class DataNodeShutdownHook extends Thread {
         .equals(ConsensusFactory.RATIS_CONSENSUS)) {
       StorageEngine.getInstance().syncCloseAllProcessor();
     }
-    WALManager.getInstance().syncDeleteOutdatedFilesInWALNodes();
 
     // We did this work because the RatisConsensus recovery mechanism is different from other
     // consensus algorithms, which will replace the underlying storage engine based on its
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java
index 7d9f7bb4679..2c4244585e8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java
@@ -272,6 +272,7 @@ public class WALManager implements IService {
       shutdownThread(walDeleteThread, ThreadName.WAL_DELETE);
       walDeleteThread = null;
     }
+    deleteOutdatedFilesInWALNodes();
     clear();
   }
 

Reply via email to