This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch hotfix/2.0.9.4-sjzt
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 764f761379267878996370a871c2f71e433e8946
Author: 罗振羽 <[email protected]>
AuthorDate: Thu May 14 07:38:58 2026 +0000

    [TIMECHODB]refactor(pipe): use thread pool factory for scp uploads
    
    (cherry picked from commit ec5f4537de036ebfb924ab078c85e76fae889c34)
---
 .../plugin/sink/tsfile/ScpRemoteFileTransfer.java  | 36 +++++-----------------
 1 file changed, 7 insertions(+), 29 deletions(-)

diff --git a/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/ScpRemoteFileTransfer.java b/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/ScpRemoteFileTransfer.java
index 25005f3f931..1a27f026c87 100644
--- a/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/ScpRemoteFileTransfer.java
+++ b/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/ScpRemoteFileTransfer.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.pipe.plugin.sink.tsfile;
 
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 
@@ -48,11 +49,10 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Stream;
@@ -113,7 +113,7 @@ class ScpRemoteFileTransfer implements RemoteFileTransfer {
   private final int objectUploadWaitingQueueSize;
   private final long objectUploadThreadKeepAliveSeconds;
   private final RateLimiter transferRateLimiter;
-  private final ThreadPoolExecutor objectUploadExecutor;
+  private final ExecutorService objectUploadExecutor;
   private final BlockingQueue<PooledWorkerSession> idleWorkerSessions;
 
   private SshClient client;
@@ -191,11 +191,7 @@ class ScpRemoteFileTransfer implements RemoteFileTransfer {
           DEFAULT_OBJECT_UPLOAD_THREAD_KEEP_ALIVE_SECONDS);
     }
     this.idleWorkerSessions = new LinkedBlockingQueue<>(objectUploadParallelism);
-    this.objectUploadExecutor =
-        createObjectUploadExecutor(
-            objectUploadParallelism,
-            objectUploadWaitingQueueSize,
-            objectUploadThreadKeepAliveSeconds);
+    this.objectUploadExecutor = createObjectUploadExecutor(objectUploadParallelism);
 
     final double bytesPerSecond =
         params.getDoubleOrDefault(
@@ -445,27 +441,9 @@ class ScpRemoteFileTransfer implements RemoteFileTransfer {
     }
   }
 
-  private static BlockingQueue<Runnable> createObjectUploadQueue(final int waitingQueueSize) {
-    return waitingQueueSize == 0
-        ? new SynchronousQueue<>()
-        : new LinkedBlockingQueue<>(waitingQueueSize);
-  }
-
-  private static ThreadPoolExecutor createObjectUploadExecutor(
-      final int maximumParallelism, final int waitingQueueSize, final long keepAliveSeconds) {
-    final ThreadPoolExecutor executor =
-        new ThreadPoolExecutor(
-            maximumParallelism,
-            maximumParallelism,
-            keepAliveSeconds,
-            TimeUnit.SECONDS,
-            createObjectUploadQueue(waitingQueueSize),
-            runnable ->
-                new Thread(
-                    runnable,
-                    "pipe-scp-object-transfer-" + OBJECT_UPLOAD_THREAD_COUNTER.incrementAndGet()));
-    executor.allowCoreThreadTimeOut(true);
-    return executor;
+  private static ExecutorService createObjectUploadExecutor(final int maximumParallelism) {
+    return IoTDBThreadPoolFactory.newFixedThreadPool(
+        maximumParallelism, "pipe-scp-object-transfer");
   }
 
   private static final class PooledWorkerSession {

Reply via email to