This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch patch-2094 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 764f761379267878996370a871c2f71e433e8946 Author: 罗振羽 <[email protected]> AuthorDate: Thu May 14 07:38:58 2026 +0000 [TIMECHODB]refactor(pipe): use thread pool factory for scp uploads (cherry picked from commit ec5f4537de036ebfb924ab078c85e76fae889c34) --- .../plugin/sink/tsfile/ScpRemoteFileTransfer.java | 36 +++++----------------- 1 file changed, 7 insertions(+), 29 deletions(-) diff --git a/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/ScpRemoteFileTransfer.java b/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/ScpRemoteFileTransfer.java index 25005f3f931..1a27f026c87 100644 --- a/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/ScpRemoteFileTransfer.java +++ b/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/ScpRemoteFileTransfer.java @@ -19,6 +19,7 @@ package org.apache.iotdb.pipe.plugin.sink.tsfile; +import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; @@ -48,11 +49,10 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; @@ -113,7 +113,7 @@ class ScpRemoteFileTransfer implements RemoteFileTransfer { private final int objectUploadWaitingQueueSize; private final long objectUploadThreadKeepAliveSeconds; private final RateLimiter transferRateLimiter; - private final ThreadPoolExecutor objectUploadExecutor; + private final ExecutorService objectUploadExecutor; private final BlockingQueue<PooledWorkerSession> idleWorkerSessions; private SshClient client; @@ -191,11 +191,7 @@ class ScpRemoteFileTransfer implements RemoteFileTransfer { DEFAULT_OBJECT_UPLOAD_THREAD_KEEP_ALIVE_SECONDS); } this.idleWorkerSessions = new LinkedBlockingQueue<>(objectUploadParallelism); - this.objectUploadExecutor = - createObjectUploadExecutor( - objectUploadParallelism, - objectUploadWaitingQueueSize, - objectUploadThreadKeepAliveSeconds); + this.objectUploadExecutor = createObjectUploadExecutor(objectUploadParallelism); final double bytesPerSecond = params.getDoubleOrDefault( @@ -445,27 +441,9 @@ class ScpRemoteFileTransfer implements RemoteFileTransfer { } } - private static BlockingQueue<Runnable> createObjectUploadQueue(final int waitingQueueSize) { - return waitingQueueSize == 0 - ? new SynchronousQueue<>() - : new LinkedBlockingQueue<>(waitingQueueSize); - } - - private static ThreadPoolExecutor createObjectUploadExecutor( - final int maximumParallelism, final int waitingQueueSize, final long keepAliveSeconds) { - final ThreadPoolExecutor executor = - new ThreadPoolExecutor( - maximumParallelism, - maximumParallelism, - keepAliveSeconds, - TimeUnit.SECONDS, - createObjectUploadQueue(waitingQueueSize), - runnable -> - new Thread( - runnable, - "pipe-scp-object-transfer-" + OBJECT_UPLOAD_THREAD_COUNTER.incrementAndGet())); - executor.allowCoreThreadTimeOut(true); - return executor; + private static ExecutorService createObjectUploadExecutor(final int maximumParallelism) { + return IoTDBThreadPoolFactory.newFixedThreadPool( + maximumParallelism, "pipe-scp-object-transfer"); } private static final class PooledWorkerSession {
