This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch cp_iot_1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c5127e7bf81dd2bb2e6cdb518dddecf8a17bec27
Author: Jiang Tian <[email protected]>
AuthorDate: Wed Jun 18 11:39:47 2025 +0800

    Fix DataNode getting stuck on shutdown with large unremovable WAL files (#15727)
    
    * Fix DataNode getting stuck on shutdown with large unremovable WAL files
    
    * spotless
    
    * add shutdown hook watcher
    
    * Fix LogDispatcher getting stuck on stop
    
    * add re-interrupt
---
 .../it/env/cluster/config/MppDataNodeConfig.java   |  12 ++
 .../it/env/cluster/node/AbstractNodeWrapper.java   |   4 +
 .../it/env/remote/config/RemoteDataNodeConfig.java |  10 ++
 .../apache/iotdb/itbase/env/DataNodeConfig.java    |   4 +
 .../iotdb/db/it/IoTDBCustomizedClusterIT.java      | 148 +++++++++++++++++++++
 .../consensus/iot/logdispatcher/LogDispatcher.java |   2 +-
 .../consensus/iot/logdispatcher/SyncStatus.java    |   5 +-
 .../iotdb/db/service/DataNodeShutdownHook.java     |  31 ++++-
 .../storageengine/dataregion/wal/WALManager.java   |   9 +-
 9 files changed, 220 insertions(+), 5 deletions(-)

diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
index 01636b7bf0b..9ac380503cd 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
@@ -100,4 +100,16 @@ public class MppDataNodeConfig extends MppBaseConfig implements DataNodeConfig {
     setProperty("cache_last_values_for_load", String.valueOf(cacheLastValuesForLoad));
     return this;
   }
+
+  @Override
+  public DataNodeConfig setWalThrottleSize(long walThrottleSize) {
+    setProperty("wal_throttle_threshold_in_byte", String.valueOf(walThrottleSize));
+    return this;
+  }
+
+  @Override
+  public DataNodeConfig setDeleteWalFilesPeriodInMs(long deleteWalFilesPeriodInMs) {
+    setProperty("delete_wal_files_period_in_ms", String.valueOf(deleteWalFilesPeriodInMs));
+    return this;
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
index f1e25f9e1d6..005db9050ff 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
@@ -758,4 +758,9 @@ public abstract class AbstractNodeWrapper implements BaseNodeWrapper {
       return -1;
     }
   }
+
+  /** Returns the {@link Process} backing this node wrapper. */
+  public Process getInstance() {
+    return instance;
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
index 80d9d98d23d..b2550ae689f 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
@@ -63,4 +63,14 @@ public class RemoteDataNodeConfig implements DataNodeConfig {
   public DataNodeConfig setCacheLastValuesForLoad(boolean cacheLastValuesForLoad) {
     return this;
   }
+
+  @Override
+  public DataNodeConfig setWalThrottleSize(long walThrottleSize) {
+    return this;
+  }
+
+  @Override
+  public DataNodeConfig setDeleteWalFilesPeriodInMs(long deleteWalFilesPeriodInMs) {
+    return this;
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
index c6112a0e639..c27eb369c0d 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
@@ -39,4 +39,8 @@ public interface DataNodeConfig {
   DataNodeConfig setLoadLastCacheStrategy(String strategyName);
 
   DataNodeConfig setCacheLastValuesForLoad(boolean cacheLastValuesForLoad);
+
+  DataNodeConfig setWalThrottleSize(long walThrottleSize);
+
+  DataNodeConfig setDeleteWalFilesPeriodInMs(long deleteWalFilesPeriodInMs);
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java
new file mode 100644
index 00000000000..5812e255d16
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it;
+
+import org.apache.iotdb.it.env.cluster.env.SimpleEnv;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Session;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import static org.junit.Assert.fail;
+
+/** Tests that may not be satisfied with the default cluster settings. */
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBCustomizedClusterIT {
+
+  private static final Logger logger = LoggerFactory.getLogger(IoTDBCustomizedClusterIT.class);
+
+  /**
+   * When the WAL size exceeds {@code walThrottleSize * 0.8}, the timed WAL-delete thread keeps
+   * trying to delete WAL files forever. This used to block the DataNode from exiting, because
+   * the WAL-delete task submitted by the ShutdownHook could never be executed. This test ensures
+   * that the DataNode no longer gets stuck in that situation.
+   */
+  @Test
+  public void testWalThrottleStuck()
+      throws SQLException,
+          IoTDBConnectionException,
+          StatementExecutionException,
+          InterruptedException {
+    SimpleEnv simpleEnv = new SimpleEnv();
+    simpleEnv
+        .getConfig()
+        .getDataNodeConfig()
+        .setWalThrottleSize(1)
+        .setDeleteWalFilesPeriodInMs(100);
+    simpleEnv
+        .getConfig()
+        .getCommonConfig()
+        .setDataReplicationFactor(3)
+        .setSchemaReplicationFactor(3)
+        .setSchemaRegionConsensusProtocolClass("org.apache.iotdb.consensus.ratis.RatisConsensus")
+        .setDataRegionConsensusProtocolClass("org.apache.iotdb.consensus.iot.IoTConsensus");
+    try {
+      simpleEnv.initClusterEnvironment(1, 3);
+
+      int leaderIndex = -1;
+      try (Connection connection = simpleEnv.getConnection();
+          Statement statement = connection.createStatement()) {
+        // write the first data
+        statement.execute("INSERT INTO root.db1.d1 (time, s1) values (1,1)");
+        // find the leader of the data region
+        int port = -1;
+        try (ResultSet resultSet = statement.executeQuery("SHOW REGIONS")) {
+          while (resultSet.next()) {
+            if ("DataRegion".equals(resultSet.getString("Type"))
+                && "Leader".equals(resultSet.getString("Role"))) {
+              port = resultSet.getInt("RpcPort");
+              break;
+            }
+          }
+        }
+
+        if (port == -1) {
+          fail("Leader not found");
+        }
+
+        for (int i = 0; i < simpleEnv.getDataNodeWrapperList().size(); i++) {
+          if (simpleEnv.getDataNodeWrapperList().get(i).getPort() == port) {
+            leaderIndex = i;
+            break;
+          }
+        }
+        if (leaderIndex == -1) {
+          fail("No DataNode matches the leader's RPC port " + port);
+        }
+      }
+
+      // stop a follower
+      int followerIndex = (leaderIndex + 1) % simpleEnv.getDataNodeWrapperList().size();
+      DataNodeWrapper follower = simpleEnv.getDataNodeWrapperList().get(followerIndex);
+      logger.info("Stopping data node {}", follower.getIpAndPortString());
+      follower.stop();
+
+      DataNodeWrapper leader = simpleEnv.getDataNodeWrapperList().get(leaderIndex);
+      // write to the leader to generate wal that cannot be synced
+      try (Session session = new Session(leader.getIp(), leader.getPort())) {
+        session.open();
+        for (int i = 0; i < 5; i++) {
+          session.executeNonQueryStatement("INSERT INTO root.db1.d1 (time, s1) values (1,1)");
+        }
+      }
+
+      // wait for wal-delete thread to be scheduled
+      Thread.sleep(1000);
+
+      // stop the leader without forcing it, so that its shutdown hook gets a chance to run
+      logger.info("Stopping data node {}", leader.getIpAndPortString());
+      leader.getInstance().destroy();
+      // confirm the death of the leader, polling instead of busy-spinning
+      long startTime = System.currentTimeMillis();
+      while (leader.isAlive()) {
+        if (System.currentTimeMillis() - startTime > 30000) {
+          fail("Leader does not exit after 30s");
+        }
+        Thread.sleep(100);
+      }
+    } finally {
+      simpleEnv.cleanClusterEnvironment();
+    }
+  }
+}
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index c3a0665be6a..e196df43209 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -105,8 +105,10 @@ public class LogDispatcher {
   public synchronized void stop() {
     if (!threads.isEmpty()) {
       threads.forEach(LogDispatcherThread::setStopped);
-      threads.forEach(LogDispatcherThread::processStopped);
       executorService.shutdownNow();
+      // Call processStopped() only after shutdownNow() has interrupted the dispatcher threads,
+      // so that a thread blocked in wait() inside SyncStatus.addNextBatch is released first.
+      threads.forEach(LogDispatcherThread::processStopped);
       int timeout = 10;
       try {
         if (!executorService.awaitTermination(timeout, TimeUnit.SECONDS)) {
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
index fe00939050e..c5a426d88b8 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
@@ -44,8 +44,13 @@ public class SyncStatus {
    * @throws InterruptedException
    */
   public synchronized void addNextBatch(Batch batch) throws InterruptedException {
     while (pendingBatches.size() >= config.getReplication().getMaxPendingBatchesNum()
         || !iotConsensusMemoryManager.reserve(batch.getMemorySize(), false)) {
+      // Surface interruption the same way wait() does instead of falling through, which would
+      // enqueue a batch whose memory was never reserved and lose the interrupt status.
+      if (Thread.interrupted()) {
+        throw new InterruptedException("Interrupted while waiting to add the next batch");
+      }
       wait();
     }
     pendingBatches.add(batch);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
index 1a6af1cf9b0..4d0f4a5a412 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
@@ -52,15 +52,47 @@ public class DataNodeShutdownHook extends Thread {
   private static final Logger logger = LoggerFactory.getLogger(DataNodeShutdownHook.class);
 
   private final TDataNodeLocation nodeLocation;
+  private Thread watcherThread;
 
   public DataNodeShutdownHook(TDataNodeLocation nodeLocation) {
     super(ThreadName.DATANODE_SHUTDOWN_HOOK.getName());
     this.nodeLocation = nodeLocation;
   }
 
+  /**
+   * Starts a daemon thread that logs the stack trace of the calling (shutdown hook) thread
+   * every 10 seconds until the watcher is interrupted, to help diagnose a stuck shutdown.
+   */
+  private void startWatcher() {
+    Thread hookThread = Thread.currentThread();
+    watcherThread =
+        new Thread(
+            () -> {
+              while (!Thread.interrupted()) {
+                try {
+                  Thread.sleep(10000);
+                  StackTraceElement[] stackTrace = hookThread.getStackTrace();
+                  StringBuilder stackTraceBuilder =
+                      new StringBuilder("Stack trace of shutdown hook:\n");
+                  for (StackTraceElement traceElement : stackTrace) {
+                    stackTraceBuilder.append(traceElement.toString()).append("\n");
+                  }
+                  logger.info(stackTraceBuilder.toString());
+                } catch (InterruptedException e) {
+                  Thread.currentThread().interrupt();
+                  return;
+                }
+              }
+            },
+            "ShutdownHookWatcher");
+    watcherThread.setDaemon(true);
+    watcherThread.start();
+  }
+
   @Override
   public void run() {
     logger.info("DataNode exiting...");
+    startWatcher();
     // Stop external rpc service firstly.
     ExternalRPCService.getInstance().stop();
 
@@ -77,7 +109,6 @@ public class DataNodeShutdownHook extends Thread {
         .equals(ConsensusFactory.RATIS_CONSENSUS)) {
       StorageEngine.getInstance().syncCloseAllProcessor();
     }
-    WALManager.getInstance().syncDeleteOutdatedFilesInWALNodes();
 
     // We did this work because the RatisConsensus recovery mechanism is different from other
     // consensus algorithms, which will replace the underlying storage engine based on its
@@ -141,6 +172,8 @@ public class DataNodeShutdownHook extends Thread {
         "DataNode exits. Jvm memory usage: {}",
         MemUtils.bytesCntToStr(
             Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()));
+
+    watcherThread.interrupt();
   }
 
   private void triggerSnapshotForAllDataRegion() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java
index 96d4931fa43..923ccda511e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java
@@ -186,6 +186,11 @@ public class WALManager implements IService {
             getThrottleThreshold());
       }
       firstLoop = false;
+      // Use isInterrupted() rather than interrupted() so the interrupt status is not cleared.
+      if (Thread.currentThread().isInterrupted()) {
+        logger.info("Timed wal delete thread is interrupted.");
+        return;
+      }
     }
   }
 
@@ -264,12 +269,15 @@ public class WALManager implements IService {
     if (config.getWalMode() == WALMode.DISABLE) {
       return;
     }
-
+    logger.info("Stopping WALManager");
     if (walDeleteThread != null) {
       shutdownThread(walDeleteThread, ThreadName.WAL_DELETE);
       walDeleteThread = null;
     }
+    logger.info("Deleting outdated files before exiting");
+    deleteOutdatedFilesInWALNodes();
     clear();
+    logger.info("WALManager stopped");
   }
 
   private void shutdownThread(ExecutorService thread, ThreadName threadName) {

Reply via email to