This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rc/2.0.5
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 79c1512d0432aec58e9f6f09b66c4fdc98af81cd
Author: Caideyipi <[email protected]>
AuthorDate: Thu Aug 7 09:28:10 2025 +0800

    Pipe: Do not use the fork join pool in TerminateEvent (#16113)
    
    * fix
    
    * optimize
    
    (cherry picked from commit fccb90cc89d6bcf5a85cc8d516851ee67a18a3fb)
---
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  |  2 +-
 .../event/common/terminate/PipeTerminateEvent.java | 37 ++++++++++++++++++----
 ...istoricalDataRegionTsFileAndDeletionSource.java | 30 +++++++++++++++++-
 .../iotdb/commons/concurrent/ThreadName.java       |  1 +
 4 files changed, 62 insertions(+), 8 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index f8cbe0b45b9..e722a199220 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -142,7 +142,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
           0L,
           TimeUnit.SECONDS,
           new ArrayBlockingQueue<>(
-              IoTDBDescriptor.getInstance().getConfig().getSchemaThreadCount()),
+              IoTDBDescriptor.getInstance().getConfig().getPipeTaskThreadCount()),
           new IoTThreadFactory(ThreadName.PIPE_PARALLEL_EXECUTION_POOL.getName()),
           ThreadName.PIPE_PARALLEL_EXECUTION_POOL.getName(),
           new ThreadPoolExecutor.CallerRunsPolicy());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
index f85b26b6228..bf7b5a6f527 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
@@ -19,17 +19,23 @@
 
 package org.apache.iotdb.db.pipe.event.common.terminate;
 
+import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import org.apache.iotdb.db.pipe.agent.task.PipeDataNodeTask;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
  * The {@link PipeTerminateEvent} is an {@link EnrichedEvent} that controls the termination of pipe,
@@ -41,11 +47,26 @@ public class PipeTerminateEvent extends EnrichedEvent {
 
   private final int dataRegionId;
 
+  private final boolean shouldMark;
+
+  // Do not use CallerRunsPolicy here, to avoid deadlock
+  private static final ExecutorService terminateExecutor =
+      new WrappedThreadPoolExecutor(
+          0,
+          IoTDBDescriptor.getInstance().getConfig().getPipeTaskThreadCount(),
+          0L,
+          TimeUnit.SECONDS,
+          new ArrayBlockingQueue<>(
+              IoTDBDescriptor.getInstance().getConfig().getPipeTaskThreadCount()),
+          new IoTThreadFactory(ThreadName.PIPE_TERMINATE_EXECUTION_POOL.getName()),
+          ThreadName.PIPE_TERMINATE_EXECUTION_POOL.getName());
+
   public PipeTerminateEvent(
       final String pipeName,
       final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
-      final int dataRegionId) {
+      final int dataRegionId,
+      final boolean shouldMark) {
     super(
         pipeName,
         creationTime,
@@ -57,6 +78,7 @@ public class PipeTerminateEvent extends EnrichedEvent {
         Long.MIN_VALUE,
         Long.MAX_VALUE);
     this.dataRegionId = dataRegionId;
+    this.shouldMark = shouldMark;
   }
 
   @Override
@@ -87,7 +109,7 @@ public class PipeTerminateEvent extends EnrichedEvent {
       final long endTime) {
     // Should record PipeTaskMeta, for the terminateEvent shall report progress to
     // notify the pipeTask it's completed.
-    return new PipeTerminateEvent(pipeName, creationTime, pipeTaskMeta, dataRegionId);
+    return new PipeTerminateEvent(pipeName, creationTime, pipeTaskMeta, dataRegionId, shouldMark);
   }
 
   @Override
@@ -108,13 +130,16 @@ public class PipeTerminateEvent extends EnrichedEvent {
   @Override
   public void reportProgress() {
     // To avoid deadlock
-    CompletableFuture.runAsync(
-        () -> PipeDataNodeAgent.task().markCompleted(pipeName, dataRegionId));
+    if (shouldMark) {
+      terminateExecutor.submit(
+          () -> PipeDataNodeAgent.task().markCompleted(pipeName, dataRegionId));
+    }
   }
 
   @Override
   public String toString() {
-    return String.format("PipeTerminateEvent{dataRegionId=%s}", dataRegionId)
+    return String.format(
+            "PipeTerminateEvent{dataRegionId=%s, shouldMark=%s}", dataRegionId, shouldMark)
         + " - "
         + super.toString();
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
index af38d626f4e..2678c18d51e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
@@ -92,6 +92,12 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.E
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_PATH_VALUE;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_TIME_VALUE;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_START_TIME_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_DEFAULT_VALUE;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_QUERY_VALUE;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_DEFAULT_VALUE;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_STRICT_DEFAULT_VALUE;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_STRICT_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODS_DEFAULT_VALUE;
@@ -104,6 +110,8 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.S
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_END_TIME_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_LOOSE_RANGE_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_START_TIME_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODE_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODE_SNAPSHOT_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODE_STRICT_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODS_ENABLE_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODS_KEY;
@@ -150,6 +158,7 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
   private boolean shouldTransferModFile; // Whether to transfer mods
   protected String userName;
   protected boolean skipIfNoPrivileges = true;
+  private boolean shouldTerminatePipeOnAllHistoricalEventsConsumed;
   private boolean isTerminateSignalSent = false;
 
   private boolean isForwardingPipeRequests;
@@ -353,6 +362,20 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
                   listeningOptionPair.getRight());
     }
 
+    if (parameters.hasAnyAttributes(EXTRACTOR_MODE_SNAPSHOT_KEY, SOURCE_MODE_SNAPSHOT_KEY)) {
+      shouldTerminatePipeOnAllHistoricalEventsConsumed =
+          parameters.getBooleanOrDefault(
+              Arrays.asList(EXTRACTOR_MODE_SNAPSHOT_KEY, SOURCE_MODE_SNAPSHOT_KEY),
+              EXTRACTOR_MODE_SNAPSHOT_DEFAULT_VALUE);
+    } else {
+      final String extractorModeValue =
+          parameters.getStringOrDefault(
+              Arrays.asList(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY), EXTRACTOR_MODE_DEFAULT_VALUE);
+      shouldTerminatePipeOnAllHistoricalEventsConsumed =
+          extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_SNAPSHOT_VALUE)
+              || extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE);
+    }
+
     userName =
         parameters.getStringByKeys(
             PipeSourceConstant.EXTRACTOR_IOTDB_USER_KEY,
@@ -770,7 +793,12 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
 
   private Event supplyTerminateEvent() {
     final PipeTerminateEvent terminateEvent =
-        new PipeTerminateEvent(pipeName, creationTime, pipeTaskMeta, dataRegionId);
+        new PipeTerminateEvent(
+            pipeName,
+            creationTime,
+            pipeTaskMeta,
+            dataRegionId,
+            shouldTerminatePipeOnAllHistoricalEventsConsumed);
     if (!terminateEvent.increaseReferenceCount(
         PipeHistoricalDataRegionTsFileAndDeletionSource.class.getName())) {
       LOGGER.warn(
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 6b7ac2a7b6c..390e9f80e9b 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -152,6 +152,7 @@ public enum ThreadName {
   PIPE_RECEIVER_AIR_GAP_AGENT("Pipe-Receiver-Air-Gap-Agent"),
   PIPE_AIR_GAP_RECEIVER("Pipe-Air-Gap-Receiver"),
   PIPE_PARALLEL_EXECUTION_POOL("Pipe-Parallel-Execution-Pool"),
+  PIPE_TERMINATE_EXECUTION_POOL("Pipe-Terminate-Execution-Pool"),
   LOAD_DATATYPE_CONVERT_POOL("Load-Datatype-Convert-Pool"),
   SUBSCRIPTION_EXECUTOR_POOL("Subscription-Executor-Pool"),
   SUBSCRIPTION_RUNTIME_META_SYNCER("Subscription-Runtime-Meta-Syncer"),

Reply via email to