This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch rc/2.0.5 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 79c1512d0432aec58e9f6f09b66c4fdc98af81cd Author: Caideyipi <[email protected]> AuthorDate: Thu Aug 7 09:28:10 2025 +0800 Pipe: Do not use the fork join pool in TerminateEvent (#16113) * fix * optimize (cherry picked from commit fccb90cc89d6bcf5a85cc8d516851ee67a18a3fb) --- .../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 2 +- .../event/common/terminate/PipeTerminateEvent.java | 37 ++++++++++++++++++---- ...istoricalDataRegionTsFileAndDeletionSource.java | 30 +++++++++++++++++- .../iotdb/commons/concurrent/ThreadName.java | 1 + 4 files changed, 62 insertions(+), 8 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index f8cbe0b45b9..e722a199220 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java @@ -142,7 +142,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>( - IoTDBDescriptor.getInstance().getConfig().getSchemaThreadCount()), + IoTDBDescriptor.getInstance().getConfig().getPipeTaskThreadCount()), new IoTThreadFactory(ThreadName.PIPE_PARALLEL_EXECUTION_POOL.getName()), ThreadName.PIPE_PARALLEL_EXECUTION_POOL.getName(), new ThreadPoolExecutor.CallerRunsPolicy()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java index f85b26b6228..bf7b5a6f527 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java @@ -19,17 +19,23 @@ package org.apache.iotdb.db.pipe.event.common.terminate; +import org.apache.iotdb.commons.concurrent.IoTThreadFactory; +import org.apache.iotdb.commons.concurrent.ThreadName; +import org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor; import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.pipe.agent.task.PipeDataNodeTask; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; /** * The {@link PipeTerminateEvent} is an {@link EnrichedEvent} that controls the termination of pipe, @@ -41,11 +47,26 @@ public class PipeTerminateEvent extends EnrichedEvent { private final int dataRegionId; + private final boolean shouldMark; + + // Do not use call run policy to avoid deadlock + private static final ExecutorService terminateExecutor = + new WrappedThreadPoolExecutor( + 0, + IoTDBDescriptor.getInstance().getConfig().getPipeTaskThreadCount(), + 0L, + TimeUnit.SECONDS, + new ArrayBlockingQueue<>( + IoTDBDescriptor.getInstance().getConfig().getPipeTaskThreadCount()), + new IoTThreadFactory(ThreadName.PIPE_TERMINATE_EXECUTION_POOL.getName()), + ThreadName.PIPE_TERMINATE_EXECUTION_POOL.getName()); + public PipeTerminateEvent( final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final int dataRegionId) { + final int dataRegionId, + final boolean shouldMark) { super( pipeName, creationTime, @@ -57,6 +78,7 @@ public class PipeTerminateEvent extends EnrichedEvent { Long.MIN_VALUE, Long.MAX_VALUE); this.dataRegionId = dataRegionId; + this.shouldMark = shouldMark; } @Override @@ -87,7 +109,7 @@ public class PipeTerminateEvent extends EnrichedEvent { final long endTime) { // Should record PipeTaskMeta, for the terminateEvent shall report progress to // notify the pipeTask it's completed. - return new PipeTerminateEvent(pipeName, creationTime, pipeTaskMeta, dataRegionId); + return new PipeTerminateEvent(pipeName, creationTime, pipeTaskMeta, dataRegionId, shouldMark); } @Override @@ -108,13 +130,16 @@ public class PipeTerminateEvent extends EnrichedEvent { @Override public void reportProgress() { // To avoid deadlock - CompletableFuture.runAsync( - () -> PipeDataNodeAgent.task().markCompleted(pipeName, dataRegionId)); + if (shouldMark) { + terminateExecutor.submit( + () -> PipeDataNodeAgent.task().markCompleted(pipeName, dataRegionId)); + } } @Override public String toString() { - return String.format("PipeTerminateEvent{dataRegionId=%s}", dataRegionId) + return String.format( + "PipeTerminateEvent{dataRegionId=%s, shouldMark=%s}", dataRegionId, shouldMark) + " - " + super.toString(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java index af38d626f4e..2678c18d51e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java @@ -92,6 +92,12 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.E import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_PATH_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_TIME_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_START_TIME_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_QUERY_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_STRICT_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_STRICT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODS_DEFAULT_VALUE; @@ -104,6 +110,8 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.S import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_END_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_LOOSE_RANGE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_START_TIME_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODE_SNAPSHOT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODE_STRICT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODS_ENABLE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODS_KEY; @@ -150,6 +158,7 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource private boolean shouldTransferModFile; // Whether to transfer mods protected String userName; protected boolean skipIfNoPrivileges = true; + private boolean shouldTerminatePipeOnAllHistoricalEventsConsumed; private boolean isTerminateSignalSent = false; private boolean isForwardingPipeRequests; @@ -353,6 +362,20 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource listeningOptionPair.getRight()); } + if (parameters.hasAnyAttributes(EXTRACTOR_MODE_SNAPSHOT_KEY, SOURCE_MODE_SNAPSHOT_KEY)) { + shouldTerminatePipeOnAllHistoricalEventsConsumed = + parameters.getBooleanOrDefault( + Arrays.asList(EXTRACTOR_MODE_SNAPSHOT_KEY, SOURCE_MODE_SNAPSHOT_KEY), + EXTRACTOR_MODE_SNAPSHOT_DEFAULT_VALUE); + } else { + final String extractorModeValue = + parameters.getStringOrDefault( + Arrays.asList(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY), EXTRACTOR_MODE_DEFAULT_VALUE); + shouldTerminatePipeOnAllHistoricalEventsConsumed = + extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_SNAPSHOT_VALUE) + || extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE); + } + userName = parameters.getStringByKeys( PipeSourceConstant.EXTRACTOR_IOTDB_USER_KEY, @@ -770,7 +793,12 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource private Event supplyTerminateEvent() { final PipeTerminateEvent terminateEvent = - new PipeTerminateEvent(pipeName, creationTime, pipeTaskMeta, dataRegionId); + new PipeTerminateEvent( + pipeName, + creationTime, + pipeTaskMeta, + dataRegionId, + shouldTerminatePipeOnAllHistoricalEventsConsumed); if (!terminateEvent.increaseReferenceCount( PipeHistoricalDataRegionTsFileAndDeletionSource.class.getName())) { LOGGER.warn( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java index 6b7ac2a7b6c..390e9f80e9b 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java @@ -152,6 +152,7 @@ public enum ThreadName { PIPE_RECEIVER_AIR_GAP_AGENT("Pipe-Receiver-Air-Gap-Agent"), PIPE_AIR_GAP_RECEIVER("Pipe-Air-Gap-Receiver"), PIPE_PARALLEL_EXECUTION_POOL("Pipe-Parallel-Execution-Pool"), + PIPE_TERMINATE_EXECUTION_POOL("Pipe-Terminate-Execution-Pool"), LOAD_DATATYPE_CONVERT_POOL("Load-Datatype-Convert-Pool"), SUBSCRIPTION_EXECUTOR_POOL("Subscription-Executor-Pool"), SUBSCRIPTION_RUNTIME_META_SYNCER("Subscription-Runtime-Meta-Syncer"),
