This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 7477ab9456b IoTV2: Add zombie tsFile writer checker as watch dog to prevent sync stuck. (#15317)
7477ab9456b is described below

commit 7477ab9456b87153ea29b63a04cd2af8d4dc24e3
Author: Peng Junzhi <[email protected]>
AuthorDate: Fri Apr 11 21:17:12 2025 +0800

    IoTV2: Add zombie tsFile writer checker as watch dog to prevent sync stuck. (#15317)
    
    * add watch dog for tsfile writer
    
    * add description
    
    * improve periodic job
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 18 +++++++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  | 19 ++++++++
 .../pipeconsensus/PipeConsensusReceiver.java       | 55 ++++++++++++++++++++++
 .../conf/iotdb-system.properties.template          | 14 ++++++
 .../iotdb/commons/concurrent/ThreadName.java       |  8 ++--
 5 files changed, 111 insertions(+), 3 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index dfceb0c6021..00310772704 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1062,6 +1062,8 @@ public class IoTDBConfig {
   // IoTConsensusV2 Config
   private int iotConsensusV2PipelineSize = 5;
   private String iotConsensusV2Mode = ConsensusFactory.IOT_CONSENSUS_V2_BATCH_MODE;
+  private long tsFileWriterCheckInterval = TimeUnit.MINUTES.toMillis(5);
+  private long tsFileWriterZombieThreshold = TimeUnit.MINUTES.toMillis(10);
   private String[] iotConsensusV2ReceiverFileDirs = new String[0];
   private String iotConsensusV2DeletionFileDir =
       systemDir
@@ -1188,6 +1190,22 @@ public class IoTDBConfig {
     this.iotConsensusV2PipelineSize = iotConsensusV2PipelineSize;
   }
 
+  public long getTsFileWriterCheckInterval() {
+    return tsFileWriterCheckInterval;
+  }
+
+  public void setTsFileWriterCheckInterval(long tsFileWriterCheckInterval) {
+    this.tsFileWriterCheckInterval = tsFileWriterCheckInterval;
+  }
+
+  public long getTsFileWriterZombieThreshold() {
+    return tsFileWriterZombieThreshold;
+  }
+
+  public void setTsFileWriterZombieThreshold(long tsFileWriterZombieThreshold) {
+    this.tsFileWriterZombieThreshold = tsFileWriterZombieThreshold;
+  }
+
   public void setMaxSizePerBatch(int maxSizePerBatch) {
     this.maxSizePerBatch = maxSizePerBatch;
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index b4b2dbb50c8..77ee188a7ec 100755
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -89,6 +89,7 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.ServiceLoader;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
 public class IoTDBDescriptor {
@@ -1151,6 +1152,24 @@ public class IoTDBDescriptor {
     if (deletionAheadLogBufferQueueCapacity > 0) {
       conf.setDeletionAheadLogBufferQueueCapacity(deletionAheadLogBufferQueueCapacity);
     }
+    conf.setTsFileWriterCheckInterval(
+        Long.parseLong(
+            properties.getProperty(
+                "zombie_tsfile_writer_check_interval",
+                ConfigurationFileUtils.getConfigurationDefaultValue(
+                    "zombie_tsfile_writer_check_interval"))));
+    if (conf.getTsFileWriterCheckInterval() <= 0) {
+      conf.setTsFileWriterCheckInterval(TimeUnit.MINUTES.toMillis(5));
+    }
+    conf.setTsFileWriterZombieThreshold(
+        Long.parseLong(
+            properties.getProperty(
+                "zombie_tsfile_writer_threshold",
+                ConfigurationFileUtils.getConfigurationDefaultValue(
+                    "zombie_tsfile_writer_threshold"))));
+    if (conf.getTsFileWriterZombieThreshold() <= 0) {
+      conf.setTsFileWriterZombieThreshold(TimeUnit.MINUTES.toMillis(10));
+    }
   }
 
   private void loadAuthorCache(TrimProperties properties) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index 7cb647786f5..f81cbdfe88f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -20,6 +20,9 @@
 package org.apache.iotdb.db.pipe.receiver.protocol.pipeconsensus;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
@@ -81,6 +84,8 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.TreeSet;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -104,6 +109,10 @@ public class PipeConsensusReceiver {
   private final ConsensusPipeName consensusPipeName;
   // Used to buffer TsFile when transfer TsFile asynchronously.
   private final PipeConsensusTsFileWriterPool pipeConsensusTsFileWriterPool;
+  private final ScheduledExecutorService scheduledTsFileWriterCheckerPool =
+      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+          ThreadName.PIPE_CONSENSUS_TSFILE_WRITER_CHECKER.getName());
+  private Future<?> tsFileWriterCheckerFuture;
   private final List<String> receiveDirs = new ArrayList<>();
   private final PipeConsensusReceiverMetrics pipeConsensusReceiverMetrics;
   private final FolderManager folderManager;
@@ -1068,6 +1077,11 @@ public class PipeConsensusReceiver {
     clearAllReceiverBaseDir();
     // remove metric
     MetricService.getInstance().removeMetricSet(pipeConsensusReceiverMetrics);
+    // stop the zombie tsfile writer checker and release its per-receiver thread
+    if (tsFileWriterCheckerFuture != null) {
+      tsFileWriterCheckerFuture.cancel(false);
+    }
+    scheduledTsFileWriterCheckerPool.shutdown();
     LOGGER.info(
         "PipeConsensus-PipeName-{}: Receiver exit: Receiver exited.", consensusPipeName.toString());
   }
@@ -1087,6 +1101,18 @@ public class PipeConsensusReceiver {
         tsFileWriter.rollToNextWritingPath();
         pipeConsensusTsFileWriterPool.add(tsFileWriter);
       }
+
+      tsFileWriterCheckerFuture =
+          ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+              scheduledTsFileWriterCheckerPool,
+              this::checkZombieTsFileWriter,
+              0,
+              IOTDB_CONFIG.getTsFileWriterCheckInterval(),
+              TimeUnit.MILLISECONDS);
+      LOGGER.info(
+          "Register {} with interval in milliseconds {} successfully.",
+          ThreadName.PIPE_CONSENSUS_TSFILE_WRITER_CHECKER.getName(),
+          IOTDB_CONFIG.getTsFileWriterCheckInterval());
     }
 
     @SuppressWarnings("java:S3655")
@@ -1127,6 +1153,31 @@ public class PipeConsensusReceiver {
       return tsFileWriter.get();
     }
 
+    private void checkZombieTsFileWriter() {
+      pipeConsensusTsFileWriterPool.stream()
+          .filter(PipeConsensusTsFileWriter::isUsed)
+          .forEach(
+              writer -> {
+                if (System.currentTimeMillis() - writer.lastUsedTs
+                    >= IOTDB_CONFIG.getTsFileWriterZombieThreshold()) {
+                  try {
+                    writer.closeSelf(consensusPipeName);
+                    writer.returnSelf(consensusPipeName);
+                    LOGGER.info(
+                        "PipeConsensus-PipeName-{}: tsfile writer-{} is cleaned up because no new requests were received for too long.",
+                        consensusPipeName,
+                        writer.index);
+                  } catch (IOException | DiskSpaceInsufficientException e) {
+                    LOGGER.warn(
+                        "PipeConsensus-PipeName-{}: receiver watch dog failed to return tsFileWriter-{}.",
+                        consensusPipeName.toString(),
+                        writer.index,
+                        e);
+                  }
+                }
+              });
+    }
+
     public void handleExit(ConsensusPipeName consensusPipeName) {
       pipeConsensusTsFileWriterPool.forEach(
           tsFileWriter -> {
@@ -1172,6 +1223,8 @@ public class PipeConsensusReceiver {
     private volatile boolean isUsed = false;
     // If isUsed is true, this variable will be set to the TCommitId of holderEvent
     private volatile TCommitId commitIdOfCorrespondingHolderEvent;
+    // Written in setUsed(true); read by checkZombieTsFileWriter on the checker thread.
+    private volatile long lastUsedTs;
 
     public PipeConsensusTsFileWriter(int index, ConsensusPipeName consensusPipeName) {
       this.index = index;
@@ -1263,6 +1316,11 @@ public class PipeConsensusReceiver {
 
     public void setUsed(boolean used) {
-      isUsed = used;
+      // Stamp lastUsedTs before publishing isUsed, so the zombie checker never
+      // sees isUsed == true paired with a stale timestamp from a previous use.
+      if (used) {
+        lastUsedTs = System.currentTimeMillis();
+      }
+      isUsed = used;
     }
 
     public void returnSelf(ConsensusPipeName consensusPipeName)
diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index 8d40a28d8af..a7f00234c06 100644
--- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -1939,6 +1939,20 @@ iot_consensus_v2_mode=batch
 # Datatype: int
 deletion_ahead_log_buffer_queue_capacity=500
 
+# Interval for checking zombie TsFile writers in IoTConsensusV2.
+# Time unit: milliseconds.
+# effectiveMode: restart
+# DataType: long
+zombie_tsfile_writer_check_interval=300000
+
+# Threshold for determining whether a TsFile writer in IoTConsensusV2 is a zombie.
+# The larger this value is, the longer synchronization may stay stuck when an unexpected situation occurs.
+# The smaller this value is, the more likely an active writer is misjudged as a zombie, which may cause consistency loss.
+# Time unit: milliseconds.
+# effectiveMode: restart
+# DataType: long
+zombie_tsfile_writer_threshold=600000
+
 ####################
 ### Procedure Configuration
 ####################
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 577b9bb795b..a6c42d1186c 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -103,8 +103,8 @@ public enum ThreadName {
   PIPE_CONSENSUS_RPC_SERVICE("PipeConsensusRPC-Service"),
   PIPE_CONSENSUS_RPC_PROCESSOR("PipeConsensusRPC-Processor"),
   ASYNC_DATANODE_PIPE_CONSENSUS_CLIENT_POOL("AsyncDataNodePipeConsensusServiceClientPool"),
-  PIPE_CONSENSUS_DELETION_SERIALIZE("WAL-Serialize"),
-  PIPE_CONSENSUS_DELETION_SYNC("WAL-Sync"),
+  PIPE_CONSENSUS_DELETION_SERIALIZE("DAL-Serialize"),
+  PIPE_CONSENSUS_TSFILE_WRITER_CHECKER("PipeConsensus-TsFileWriter-Checker"),
 
   // -------------------------- IoTConsensus --------------------------
   IOT_CONSENSUS_RPC_SERVICE("IoTConsensusRPC-Service"),
@@ -265,7 +265,9 @@ public enum ThreadName {
           Arrays.asList(
               PIPE_CONSENSUS_RPC_SERVICE,
               PIPE_CONSENSUS_RPC_PROCESSOR,
-              ASYNC_DATANODE_PIPE_CONSENSUS_CLIENT_POOL));
+              ASYNC_DATANODE_PIPE_CONSENSUS_CLIENT_POOL,
+              PIPE_CONSENSUS_DELETION_SERIALIZE,
+              PIPE_CONSENSUS_TSFILE_WRITER_CHECKER));
 
   private static final Set<ThreadName> ratisThreadNames =
       new HashSet<>(

Reply via email to