This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 7477ab9456b IoTV2: Add zombie tsFile writer checker as watch dog to
prevent sync stuck. (#15317)
7477ab9456b is described below
commit 7477ab9456b87153ea29b63a04cd2af8d4dc24e3
Author: Peng Junzhi <[email protected]>
AuthorDate: Fri Apr 11 21:17:12 2025 +0800
IoTV2: Add zombie tsFile writer checker as watch dog to prevent sync stuck.
(#15317)
* add watch dog for tsfile writer
* add description
* improve periodic job
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 18 +++++++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 19 ++++++++
.../pipeconsensus/PipeConsensusReceiver.java | 55 ++++++++++++++++++++++
.../conf/iotdb-system.properties.template | 14 ++++++
.../iotdb/commons/concurrent/ThreadName.java | 8 ++--
5 files changed, 111 insertions(+), 3 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index dfceb0c6021..00310772704 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1062,6 +1062,8 @@ public class IoTDBConfig {
// IoTConsensusV2 Config
private int iotConsensusV2PipelineSize = 5;
private String iotConsensusV2Mode =
ConsensusFactory.IOT_CONSENSUS_V2_BATCH_MODE;
+ // Delay in milliseconds between two consecutive zombie TsFile writer checks (IoTConsensusV2).
+ private long tsFileWriterCheckInterval = TimeUnit.MINUTES.toMillis(5);
+ // Milliseconds a TsFile writer may stay in use since it was last marked used before the
+ // IoTConsensusV2 receiver treats it as a zombie and reclaims it.
+ private long tsFileWriterZombieThreshold = TimeUnit.MINUTES.toMillis(10);
private String[] iotConsensusV2ReceiverFileDirs = new String[0];
private String iotConsensusV2DeletionFileDir =
systemDir
@@ -1188,6 +1190,22 @@ public class IoTDBConfig {
this.iotConsensusV2PipelineSize = iotConsensusV2PipelineSize;
}
+ /** Returns the delay in milliseconds between two zombie TsFile writer checks. */
+ public long getTsFileWriterCheckInterval() {
+ return tsFileWriterCheckInterval;
+ }
+
+ /** Sets the delay in milliseconds between two zombie TsFile writer checks. */
+ public void setTsFileWriterCheckInterval(long tsFileWriterCheckInterval) {
+ this.tsFileWriterCheckInterval = tsFileWriterCheckInterval;
+ }
+
+ /**
+ * Returns how long (ms) a TsFile writer may stay in use since it was last marked used before it
+ * is treated as a zombie.
+ */
+ public long getTsFileWriterZombieThreshold() {
+ return tsFileWriterZombieThreshold;
+ }
+
+ /** Sets the zombie threshold for TsFile writers, in milliseconds. */
+ public void setTsFileWriterZombieThreshold(long tsFileWriterZombieThreshold) {
+ this.tsFileWriterZombieThreshold = tsFileWriterZombieThreshold;
+ }
+
public void setMaxSizePerBatch(int maxSizePerBatch) {
this.maxSizePerBatch = maxSizePerBatch;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index b4b2dbb50c8..77ee188a7ec 100755
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -89,6 +89,7 @@ import java.util.Optional;
import java.util.Properties;
import java.util.ServiceLoader;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
public class IoTDBDescriptor {
@@ -1151,6 +1152,24 @@ public class IoTDBDescriptor {
if (deletionAheadLogBufferQueueCapacity > 0) {
conf.setDeletionAheadLogBufferQueueCapacity(deletionAheadLogBufferQueueCapacity);
}
+ // Both settings are millisecond durations declared as long in the template, so parse them as
+ // long; Integer.parseInt would reject any value above Integer.MAX_VALUE ms.
+ conf.setTsFileWriterCheckInterval(
+ Long.parseLong(
+ properties.getProperty(
+ "zombie_tsfile_writer_check_interval",
+ ConfigurationFileUtils.getConfigurationDefaultValue(
+ "zombie_tsfile_writer_check_interval"))));
+ // Non-positive values are invalid for a scheduling delay; fall back to the built-in default.
+ if (conf.getTsFileWriterCheckInterval() <= 0) {
+ conf.setTsFileWriterCheckInterval(TimeUnit.MINUTES.toMillis(5));
+ }
+ conf.setTsFileWriterZombieThreshold(
+ Long.parseLong(
+ properties.getProperty(
+ "zombie_tsfile_writer_threshold",
+ ConfigurationFileUtils.getConfigurationDefaultValue(
+ "zombie_tsfile_writer_threshold"))));
+ if (conf.getTsFileWriterZombieThreshold() <= 0) {
+ conf.setTsFileWriterZombieThreshold(TimeUnit.MINUTES.toMillis(10));
+ }
}
private void loadAuthorCache(TrimProperties properties) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index 7cb647786f5..f81cbdfe88f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -20,6 +20,9 @@
package org.apache.iotdb.db.pipe.receiver.protocol.pipeconsensus;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
@@ -81,6 +84,8 @@ import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.TreeSet;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -104,6 +109,10 @@ public class PipeConsensusReceiver {
private final ConsensusPipeName consensusPipeName;
// Used to buffer TsFile when transfer TsFile asynchronously.
private final PipeConsensusTsFileWriterPool pipeConsensusTsFileWriterPool;
+ // Per-receiver single-thread scheduler that runs the zombie TsFile writer checker.
+ private final ScheduledExecutorService scheduledTsFileWriterCheckerPool =
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+ ThreadName.PIPE_CONSENSUS_TSFILE_WRITER_CHECKER.getName());
+ // Handle of the periodic checker task; cancelled and cleared when the receiver exits.
+ private Future<?> tsFileWriterCheckerFuture;
private final List<String> receiveDirs = new ArrayList<>();
private final PipeConsensusReceiverMetrics pipeConsensusReceiverMetrics;
private final FolderManager folderManager;
@@ -1068,6 +1077,11 @@ public class PipeConsensusReceiver {
clearAllReceiverBaseDir();
// remove metric
MetricService.getInstance().removeMetricSet(pipeConsensusReceiverMetrics);
+ // cancel periodic task and stop the checker thread: the scheduler is created per receiver
+ // instance, so without shutdown() its thread would outlive the exited receiver.
+ if (tsFileWriterCheckerFuture != null) {
+ tsFileWriterCheckerFuture.cancel(false);
+ tsFileWriterCheckerFuture = null;
+ }
+ // Graceful: a check already in progress may finish; no new runs are started.
+ scheduledTsFileWriterCheckerPool.shutdown();
LOGGER.info(
"PipeConsensus-PipeName-{}: Receiver exit: Receiver exited.",
consensusPipeName.toString());
}
@@ -1087,6 +1101,18 @@ public class PipeConsensusReceiver {
tsFileWriter.rollToNextWritingPath();
pipeConsensusTsFileWriterPool.add(tsFileWriter);
}
+
+ // Start the zombie-writer watchdog immediately, then re-run it after each configured delay.
+ tsFileWriterCheckerFuture =
+ ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+ scheduledTsFileWriterCheckerPool,
+ this::checkZombieTsFileWriter,
+ 0,
+ IOTDB_CONFIG.getTsFileWriterCheckInterval(),
+ TimeUnit.MILLISECONDS);
+ LOGGER.info(
+ "Register {} with interval in milliseconds {} successfully.",
+ ThreadName.PIPE_CONSENSUS_TSFILE_WRITER_CHECKER.getName(),
+ IOTDB_CONFIG.getTsFileWriterCheckInterval());
}
@SuppressWarnings("java:S3655")
@@ -1127,6 +1153,31 @@ public class PipeConsensusReceiver {
return tsFileWriter.get();
}
+ /**
+ * Watchdog task: closes and returns every in-use TsFile writer whose last setUsed(true) is at
+ * least the configured zombie threshold in the past, so a stalled transfer cannot hold a writer
+ * forever.
+ *
+ * <p>NOTE(review): lastUsedTs is only stamped by setUsed(true). If one long TsFile transfer
+ * keeps a writer busy past the threshold without re-marking it, the writer is reclaimed here.
+ * Confirm that the request path refreshes the timestamp.
+ *
+ * <p>NOTE(review): this runs on the checker thread while request threads may still use the
+ * same writer. No lock is visible around closeSelf/returnSelf; confirm that racing them is safe.
+ */
+ private void checkZombieTsFileWriter() {
+ // Read the clock and threshold once so every writer in this scan shares the same cutoff.
+ final long now = System.currentTimeMillis();
+ final long zombieThreshold = IOTDB_CONFIG.getTsFileWriterZombieThreshold();
+ pipeConsensusTsFileWriterPool.stream()
+ .filter(PipeConsensusTsFileWriter::isUsed)
+ .filter(writer -> now - writer.lastUsedTs >= zombieThreshold)
+ .forEach(
+ writer -> {
+ try {
+ writer.closeSelf(consensusPipeName);
+ writer.returnSelf(consensusPipeName);
+ LOGGER.info(
+ "PipeConsensus-PipeName-{}: tsfile writer-{} is cleaned up because no new requests were received for too long.",
+ consensusPipeName,
+ writer.index);
+ } catch (IOException | DiskSpaceInsufficientException e) {
+ // Logged only and not rethrown, so the scan continues with the remaining writers.
+ LOGGER.warn(
+ "PipeConsensus-PipeName-{}: receiver watch dog failed to return tsFileWriter-{}.",
+ consensusPipeName.toString(),
+ writer.index,
+ e);
+ }
+ });
+ }
+
public void handleExit(ConsensusPipeName consensusPipeName) {
pipeConsensusTsFileWriterPool.forEach(
tsFileWriter -> {
@@ -1172,6 +1223,7 @@ public class PipeConsensusReceiver {
private volatile boolean isUsed = false;
// If isUsed is true, this variable will be set to the TCommitId of
holderEvent
private volatile TCommitId commitIdOfCorrespondingHolderEvent;
+ // Wall-clock millis of the last setUsed(true). Written by request threads and read by the
+ // zombie checker thread, hence volatile; this also rules out torn 64-bit reads (JLS 17.7).
+ private volatile long lastUsedTs;
public PipeConsensusTsFileWriter(int index, ConsensusPipeName
consensusPipeName) {
this.index = index;
@@ -1263,6 +1315,9 @@ public class PipeConsensusReceiver {
public void setUsed(boolean used) {
+ // Stamp the time BEFORE publishing the volatile flag. The checker reads isUsed first, so this
+ // ordering guarantees it never pairs isUsed == true with a stale (initially 0) lastUsedTs,
+ // which would make a freshly borrowed writer look like a zombie.
+ if (used) {
+ lastUsedTs = System.currentTimeMillis();
+ }
isUsed = used;
}
public void returnSelf(ConsensusPipeName consensusPipeName)
diff --git
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index 8d40a28d8af..a7f00234c06 100644
---
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -1939,6 +1939,20 @@ iot_consensus_v2_mode=batch
# Datatype: int
deletion_ahead_log_buffer_queue_capacity=500
+# Interval for checking zombie TsFile writers in IoTConsensusV2.
+# Time unit is milliseconds.
+# effectiveMode: restart
+# DataType: long
+zombie_tsfile_writer_check_interval=300000
+
+# Threshold for determining whether a TsFile writer is a zombie in IoTConsensusV2.
+# The larger this value is, the longer synchronization may stay stuck when an unexpected situation occurs.
+# The smaller this value is, the greater the chance of misjudging an active writer as a zombie, which may cause consistency loss.
+# Time unit is milliseconds.
+# effectiveMode: restart
+# DataType: long
+zombie_tsfile_writer_threshold=600000
+
####################
### Procedure Configuration
####################
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 577b9bb795b..a6c42d1186c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -103,8 +103,8 @@ public enum ThreadName {
PIPE_CONSENSUS_RPC_SERVICE("PipeConsensusRPC-Service"),
PIPE_CONSENSUS_RPC_PROCESSOR("PipeConsensusRPC-Processor"),
ASYNC_DATANODE_PIPE_CONSENSUS_CLIENT_POOL("AsyncDataNodePipeConsensusServiceClientPool"),
- PIPE_CONSENSUS_DELETION_SERIALIZE("WAL-Serialize"),
- PIPE_CONSENSUS_DELETION_SYNC("WAL-Sync"),
+ PIPE_CONSENSUS_DELETION_SERIALIZE("DAL-Serialize"),
+ PIPE_CONSENSUS_TSFILE_WRITER_CHECKER("PipeConsensus-TsFileWriter-Checker"),
// -------------------------- IoTConsensus --------------------------
IOT_CONSENSUS_RPC_SERVICE("IoTConsensusRPC-Service"),
@@ -265,7 +265,9 @@ public enum ThreadName {
Arrays.asList(
PIPE_CONSENSUS_RPC_SERVICE,
PIPE_CONSENSUS_RPC_PROCESSOR,
- ASYNC_DATANODE_PIPE_CONSENSUS_CLIENT_POOL));
+ ASYNC_DATANODE_PIPE_CONSENSUS_CLIENT_POOL,
+ PIPE_CONSENSUS_DELETION_SERIALIZE,
+ PIPE_CONSENSUS_TSFILE_WRITER_CHECKER));
private static final Set<ThreadName> ratisThreadNames =
new HashSet<>(