This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e4fc7582bc4 Pipe: Optimized the thread executor of the terminate event
(#17638)
e4fc7582bc4 is described below
commit e4fc7582bc4c03e2c0a7205dc66bc01d3b1bf1c5
Author: Caideyipi <[email protected]>
AuthorDate: Tue May 12 17:13:12 2026 +0800
Pipe: Optimized the thread executor of the terminate event (#17638)
---
.../event/common/terminate/PipeTerminateEvent.java | 31 +++++++++++++++-------
1 file changed, 21 insertions(+), 10 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
index 3b933b92b73..64209242cc4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
@@ -49,17 +49,28 @@ public class PipeTerminateEvent extends EnrichedEvent {
private final boolean shouldMark;
+ private static final int TERMINATE_EXECUTOR_THREAD_COUNT =
+ IoTDBDescriptor.getInstance().getConfig().getPipeTaskThreadCount();
+
+ private static final int TERMINATE_EXECUTOR_QUEUE_SIZE =
+ Math.max(1024, TERMINATE_EXECUTOR_THREAD_COUNT * 64);
+
// Do not use call run policy to avoid deadlock
- private static final ExecutorService terminateExecutor =
- new WrappedThreadPoolExecutor(
- 0,
- IoTDBDescriptor.getInstance().getConfig().getPipeTaskThreadCount(),
- 0L,
- TimeUnit.SECONDS,
- new ArrayBlockingQueue<>(
-
IoTDBDescriptor.getInstance().getConfig().getPipeTaskThreadCount()),
- new
IoTThreadFactory(ThreadName.PIPE_TERMINATE_EXECUTION_POOL.getName()),
- ThreadName.PIPE_TERMINATE_EXECUTION_POOL.getName());
+ private static final ExecutorService terminateExecutor =
createTerminateExecutor();
+
+ private static ExecutorService createTerminateExecutor() {
+ final WrappedThreadPoolExecutor executor =
+ new WrappedThreadPoolExecutor(
+ TERMINATE_EXECUTOR_THREAD_COUNT,
+ TERMINATE_EXECUTOR_THREAD_COUNT,
+ 60L,
+ TimeUnit.SECONDS,
+ new ArrayBlockingQueue<>(TERMINATE_EXECUTOR_QUEUE_SIZE),
+ new
IoTThreadFactory(ThreadName.PIPE_TERMINATE_EXECUTION_POOL.getName()),
+ ThreadName.PIPE_TERMINATE_EXECUTION_POOL.getName());
+ executor.allowCoreThreadTimeOut(true);
+ return executor;
+ }
public PipeTerminateEvent(
final String pipeName,