This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch hotfix/2.0.9.4-sjzt
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 643272787477859fcc17b35139c12e875ed58ed4
Author: 罗振羽 <[email protected]>
AuthorDate: Thu May 14 15:58:29 2026 +0000

    [TIMECHODB][TIMECHO] Fix SCP thread count and the deletion of historical files during the linking process.
    
    (cherry picked from commit 496e8a0ae52d0d88028f5cc8683a07a1d534fa85)
---
 .../manual/basic/IoTDBPipeTsFileSinkObjectIT.java  | 246 ++++++++++++++++++++-
 .../common/tsfile/PipeTsFileInsertionEvent.java    |   9 +-
 .../resource/tsfile/PipeTsFileResourceManager.java |  26 ++-
 ...istoricalDataRegionTsFileAndDeletionSource.java |  12 +-
 .../dataregion/modification/ModificationFile.java  |  37 ++++
 .../plugin/sink/tsfile/ScpRemoteFileTransfer.java  |  22 +-
 .../plugin/sink/tsfile/ScpSshClientManager.java    |  58 +++++
 7 files changed, 369 insertions(+), 41 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeTsFileSinkObjectIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeTsFileSinkObjectIT.java
index 52f50b078d8..afb53732fa0 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeTsFileSinkObjectIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeTsFileSinkObjectIT.java
@@ -55,7 +55,11 @@ import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 @RunWith(IoTDBTestRunner.class)
 @Category({MultiClusterIT2DualTableManualBasic.class})
@@ -67,10 +71,12 @@ public class IoTDBPipeTsFileSinkObjectIT extends 
AbstractPipeTableModelDualManua
   private static final String OBJECT_TABLE_NAME = "factory_metrics";
 
   private static final int OBJECT_MULTI_WEEK_DEVICE_COUNT = 5;
+  private static final int OBJECT_INCREMENTAL_TSFILE_COUNT = 100;
 
   private static final long HOUR_MS = 3600 * 1000L;
   private static final long DAY_MS = 24L * HOUR_MS;
   private static final long WEEK_MS = 7L * DAY_MS;
+  private static final long EXPORT_SCAN_SLEEP_MS = 500L;
 
   /** Base time aligned so each device sits in a distinct week bucket. */
   private static final long OBJECT_BASE_TIME = 1600000000000L;
@@ -225,6 +231,209 @@ public class IoTDBPipeTsFileSinkObjectIT extends 
AbstractPipeTableModelDualManua
     }
   }
 
+  @Test
+  public void 
testPipeTsFileLocalSinkObjectHundredGeneratedTsFilesLoadToReceiverWithoutMods()
+      throws Exception {
+    executeIncrementalExportLoadRoundTrip(false);
+  }
+
+  @Test
+  public void 
testPipeTsFileLocalSinkObjectHundredGeneratedTsFilesLoadToReceiverWithMods()
+      throws Exception {
+    executeIncrementalExportLoadRoundTrip(true);
+  }
+
+  private void executeIncrementalExportLoadRoundTrip(final boolean withMods) 
throws Exception {
+    final String database = "db1";
+    final String pipeName =
+        withMods
+            ? "p_incremental_generated_obj_with_mods"
+            : "p_incremental_generated_obj_without_mods";
+    final Map<String, List<Long>> expectedTimesByDevice = new 
LinkedHashMap<>();
+    final List<File> sourceTsFiles = new ArrayList<>();
+
+    for (int i = 0; i < OBJECT_INCREMENTAL_TSFILE_COUNT; i++) {
+      final String deviceId = String.format("device_%03d", i + 1);
+      final File tsFile =
+          new File(
+              sourceTsDir,
+              withMods
+                  ? String.format("incremental_object_with_mods_%03d.tsfile", 
i + 1)
+                  : 
String.format("incremental_object_without_mods_%03d.tsfile", i + 1));
+      final long startTime = OBJECT_BASE_TIME + (long) i * DAY_MS;
+      final long endTime = startTime + HOUR_MS;
+
+      try (StandardObjectTableModelTsFileGenerator generator =
+          new StandardObjectTableModelTsFileGenerator(tsFile)) {
+        generator.writeDeviceData(OBJECT_TABLE_NAME, deviceId, startTime, 
endTime, HOUR_MS);
+        if (withMods) {
+          generator.generateDeletion(OBJECT_TABLE_NAME, deviceId, endTime, 
endTime);
+          expectedTimesByDevice.put(
+              deviceId, generateExpectedTimes(startTime, endTime, HOUR_MS, 
endTime, endTime));
+        } else {
+          expectedTimesByDevice.put(deviceId, generateExpectedTimes(startTime, 
endTime, HOUR_MS));
+        }
+      }
+      sourceTsFiles.add(tsFile);
+    }
+
+    try (ITableSession sender = senderEnv.getTableSessionConnection()) {
+      sender.executeNonQueryStatement(String.format("CREATE DATABASE IF NOT 
EXISTS %s", database));
+      sender.executeNonQueryStatement(String.format("USE \"%s\"", database));
+      for (File tsFile : sourceTsFiles) {
+        sender.executeNonQueryStatement(String.format("LOAD '%s'", 
tsFile.getAbsolutePath()));
+      }
+      TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+
+      sender.executeNonQueryStatement(
+          String.format(
+              "CREATE PIPE %s "
+                  + "WITH SOURCE ("
+                  + "'source.capture.table'='true', "
+                  + "'source.database-name'='%s', "
+                  + "'source.table-name'='%s', "
+                  + "'source.inclusion'='data.insert', "
+                  + "'source.mods.enable'='%s', "
+                  + "'source.history.enable'='true', "
+                  + "'source.realtime.enable'='false' "
+                  + ") "
+                  + "WITH SINK ("
+                  + "'sink'='tsfile-local-sink', "
+                  + "'sink.local.target-path'='%s', "
+                  + "'sink.batch.max-delay-seconds'='1', "
+                  + "'sink.batch.size-bytes'='1048576'"
+                  + ")",
+              pipeName, database, OBJECT_TABLE_NAME, withMods, targetDir));
+    }
+
+    try {
+      try (ITableSession receiver = receiverEnv.getTableSessionConnection()) {
+        receiver.executeNonQueryStatement(
+            String.format("CREATE DATABASE IF NOT EXISTS %s", database));
+        receiver.executeNonQueryStatement(String.format("USE \"%s\"", 
database));
+        loadExportedTsFilesIncrementallyUntilExpectedOnReceiver(
+            receiver, new File(targetDir), database, expectedTimesByDevice, 
180_000L);
+      }
+    } finally {
+      try (ITableSession sender = senderEnv.getTableSessionConnection()) {
+        sender.executeNonQueryStatement("DROP PIPE " + pipeName);
+      }
+    }
+
+    if (!withMods) {
+      final List<File> modsFiles = new ArrayList<>();
+      findFilesBySuffix(new File(targetDir), ModificationFile.FILE_SUFFIX, 
modsFiles);
+      Assert.assertTrue(
+          "No companion mods file should be exported for the pure object 
TsFiles.",
+          modsFiles.isEmpty());
+    }
+  }
+
+  private void loadExportedTsFilesIncrementallyUntilExpectedOnReceiver(
+      final ITableSession receiver,
+      final File exportRoot,
+      final String database,
+      final Map<String, List<Long>> expectedTimesByDevice,
+      final long timeoutMs)
+      throws Exception {
+    final long expectedTotalRows = 
calculateExpectedRowCount(expectedTimesByDevice);
+    final long deadline = System.currentTimeMillis() + timeoutMs;
+    final Set<String> loadedTsFilePaths = new HashSet<>();
+
+    long previousDiscoveredTsFileCount = -1;
+    Throwable lastValidationFailure = null;
+
+    while (System.currentTimeMillis() < deadline) {
+      final List<File> exportedTsFiles = new ArrayList<>();
+      findTsFiles(exportRoot, exportedTsFiles);
+      exportedTsFiles.sort(Comparator.comparing(File::getAbsolutePath));
+
+      boolean loadedAnyThisRound = false;
+      for (File tsFile : exportedTsFiles) {
+        final String tsFilePath = tsFile.getAbsolutePath();
+        if (loadedTsFilePaths.contains(tsFilePath)) {
+          continue;
+        }
+
+        try {
+          receiver.executeNonQueryStatement(
+              String.format("LOAD '%s' WITH ('database-name'='%s')", 
tsFilePath, database));
+          loadedTsFilePaths.add(tsFilePath);
+          loadedAnyThisRound = true;
+        } catch (Exception e) {
+          LOGGER.info("Receiver LOAD will retry later for exported TsFile {}", 
tsFilePath, e);
+        }
+      }
+
+      if (loadedAnyThisRound) {
+        TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
+      }
+
+      final long actualRowCount = queryRowCountOrNegative(receiver, 
OBJECT_TABLE_NAME);
+      if (actualRowCount == expectedTotalRows) {
+        try {
+          for (Map.Entry<String, List<Long>> entry : 
expectedTimesByDevice.entrySet()) {
+            assertDeviceObjectBytesMatchGenerator(
+                receiver, OBJECT_TABLE_NAME, entry.getKey(), entry.getValue());
+          }
+          return;
+        } catch (Throwable t) {
+          lastValidationFailure = t;
+        }
+      }
+
+      final long discoveredTsFileCount = exportedTsFiles.size();
+      if (!loadedAnyThisRound && discoveredTsFileCount == 
previousDiscoveredTsFileCount) {
+        Thread.sleep(EXPORT_SCAN_SLEEP_MS);
+      }
+      previousDiscoveredTsFileCount = discoveredTsFileCount;
+    }
+
+    final List<File> exportedTsFiles = new ArrayList<>();
+    findTsFiles(exportRoot, exportedTsFiles);
+    exportedTsFiles.sort(Comparator.comparing(File::getAbsolutePath));
+
+    final List<String> pendingTsFiles = new ArrayList<>();
+    for (File tsFile : exportedTsFiles) {
+      final String tsFilePath = tsFile.getAbsolutePath();
+      if (!loadedTsFilePaths.contains(tsFilePath)) {
+        pendingTsFiles.add(tsFilePath);
+      }
+    }
+
+    final long actualRowCount = queryRowCountOrNegative(receiver, 
OBJECT_TABLE_NAME);
+    final String baseMessage =
+        String.format(
+            "Timeout waiting receiver data to match expected rows. 
expected=%d, actual=%d, "
+                + "loadedTsFiles=%d, discoveredTsFiles=%d, pendingTsFiles=%s",
+            expectedTotalRows,
+            actualRowCount,
+            loadedTsFilePaths.size(),
+            exportedTsFiles.size(),
+            pendingTsFiles);
+    if (lastValidationFailure != null) {
+      throw new AssertionError(baseMessage, lastValidationFailure);
+    }
+    Assert.fail(baseMessage);
+  }
+
+  private static long calculateExpectedRowCount(
+      final Map<String, List<Long>> expectedTimesByDevice) {
+    long totalRowCount = 0;
+    for (List<Long> expectedTimes : expectedTimesByDevice.values()) {
+      totalRowCount += expectedTimes.size();
+    }
+    return totalRowCount;
+  }
+
+  private static long queryRowCountOrNegative(final ITableSession session, 
final String table) {
+    try {
+      return queryRowCount(session, table);
+    } catch (Exception e) {
+      return -1;
+    }
+  }
+
   private static List<Long> generateExpectedTimes(
       final long startTime, final long endTime, final long interval) {
     final List<Long> times = new ArrayList<>();
@@ -234,6 +443,22 @@ public class IoTDBPipeTsFileSinkObjectIT extends 
AbstractPipeTableModelDualManua
     return times;
   }
 
+  private static List<Long> generateExpectedTimes(
+      final long startTime,
+      final long endTime,
+      final long interval,
+      final long deleteStartTime,
+      final long deleteEndTime) {
+    final List<Long> times = new ArrayList<>();
+    for (long t = startTime; t <= endTime; t += interval) {
+      if (t >= deleteStartTime && t <= deleteEndTime) {
+        continue;
+      }
+      times.add(t);
+    }
+    return times;
+  }
+
   /**
    * Polls until at least one {@code .tsfile} appears under {@code root} 
(recursive) or {@code
    * timeoutMs} elapses.
@@ -427,14 +652,15 @@ public class IoTDBPipeTsFileSinkObjectIT extends 
AbstractPipeTableModelDualManua
             + root.getAbsolutePath());
   }
 
-  private static void loadAllTsFilesUnderDir(final ITableSession session, 
final File root)
-      throws Exception {
+  private static void loadAllTsFilesUnderDir(
+      final ITableSession session, final File root, final String database) 
throws Exception {
     final List<File> tsfiles = new ArrayList<>();
     findTsFiles(root, tsfiles);
     Assert.assertFalse(tsfiles.isEmpty());
     tsfiles.sort(Comparator.comparing(File::getAbsolutePath));
     for (File f : tsfiles) {
-      session.executeNonQueryStatement(String.format("LOAD '%s'", 
f.getAbsolutePath()));
+      session.executeNonQueryStatement(
+          String.format("LOAD '%s' WITH ('database-name'='%s')", 
f.getAbsolutePath(), database));
     }
   }
 
@@ -461,6 +687,20 @@ public class IoTDBPipeTsFileSinkObjectIT extends 
AbstractPipeTableModelDualManua
     }
   }
 
+  private static void findFilesBySuffix(File dir, String suffix, List<File> 
matchedFiles) {
+    File[] files = dir.listFiles();
+    if (files == null) {
+      return;
+    }
+    for (File f : files) {
+      if (f.isDirectory()) {
+        findFilesBySuffix(f, suffix, matchedFiles);
+      } else if (f.getName().endsWith(suffix)) {
+        matchedFiles.add(f);
+      }
+    }
+  }
+
   private static void deleteDirectoryQuietly(Path dirPath) {
     if (!Files.exists(dirPath)) {
       return;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 670e7c5360a..843908882e5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -370,7 +370,11 @@ public class PipeTsFileInsertionEvent extends 
PipeInsertionEvent
   public boolean internallyIncreaseResourceReferenceCount(final String 
holderMessage) {
     extractTime = System.nanoTime();
     final List<ModEntry> linkedObjectColumnModEntries = new ArrayList<>();
+    final File firstName = tsFile;
     try {
+      if (isGeneratedByHistoricalExtractor) {
+        tsFile = 
PipeTsFileResourceManager.getHardlinkOrCopiedFileInPipeDir(tsFile, pipeName);
+      }
       if (Objects.nonNull(pipeName)) {
         final boolean shouldLinkObjectFiles = !Objects.equals(hasObjectData, 
Boolean.FALSE);
         final Iterator<String> pathIterator =
@@ -383,8 +387,9 @@ public class PipeTsFileInsertionEvent extends 
PipeInsertionEvent
 
         PipeDataNodeResourceManager.object().setTsFileClosed(resource, 
pipeName);
       }
-      final File firstName = tsFile;
-      tsFile = 
PipeDataNodeResourceManager.tsfile().increaseFileReference(tsFile, true, 
pipeName);
+
+      tsFile =
+          
PipeDataNodeResourceManager.tsfile().increaseFileReference(firstName, true, 
pipeName);
       if (isWithMod) {
         if (modFile != null) {
           modFile =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
index 644beb1ac40..0589411327a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
@@ -163,6 +163,7 @@ public class PipeTsFileResourceManager {
 
     try (ModificationFile modificationFile =
         new ModificationFile(ModificationFile.getExclusiveMods(pipeTsFile), 
false)) {
+      modificationFile.clear();
       modificationFile.write(toWrite);
       modsFile = modificationFile.getFile();
     }
@@ -172,12 +173,20 @@ public class PipeTsFileResourceManager {
       if (Objects.nonNull(pipeName)) {
         hardlinkOrCopiedFileToPipeTsFileResourceMap
             .computeIfAbsent(pipeName, k -> new ConcurrentHashMap<>())
-            .put(modsFile.getPath(), new PipeTsFileResource(modsFile));
+            .compute(
+                modsFile.getPath(),
+                (path, existingResource) -> {
+                  if (existingResource == null) {
+                    return new PipeTsFileResource(modsFile);
+                  } else {
+                    existingResource.increaseReferenceCount();
+                    return existingResource;
+                  }
+                });
       }
     } finally {
       segmentLock.unlock(modsFile);
     }
-    increasePublicReference(modsFile, pipeName, false);
     return modsFile;
   }
 
@@ -393,24 +402,21 @@ public class PipeTsFileResourceManager {
         : hardlinkOrCopiedFileToTsFilePublicResourceMap;
   }
 
-  public void pinTsFileResource(
-      final TsFileResource resource, final boolean withMods, final @Nullable 
String pipeName)
+  public void pinTsFileResource(final TsFileResource resource, final @Nullable 
String pipeName)
       throws IOException {
     increaseFileReference(resource.getTsFile(), true, pipeName);
-    if (withMods && resource.getExclusiveModFile().exists()) {
+    // Create hard links as long as the mods exist, ensuring that the Object 
can be used normally.
+    if (resource.getExclusiveModFile().exists()) {
       increaseFileReference(resource.getExclusiveModFile().getFile(), false, 
pipeName);
     }
   }
 
-  public void unpinTsFileResource(
-      final TsFileResource resource,
-      final boolean shouldTransferModFile,
-      final @Nullable String pipeName)
+  public void unpinTsFileResource(final TsFileResource resource, final 
@Nullable String pipeName)
       throws IOException {
     decreaseFileReference(
         getHardlinkOrCopiedFileInPipeDir(resource.getTsFile(), pipeName), 
pipeName);
 
-    if (shouldTransferModFile && resource.exclusiveModFileExists()) {
+    if (resource.exclusiveModFileExists()) {
       decreaseFileReference(
           
getHardlinkOrCopiedFileInPipeDir(resource.getExclusiveModFile().getFile(), 
pipeName),
           pipeName);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
index 0440000a140..77607071bb9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
@@ -640,8 +640,7 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
                 // Pin the resource, in case the file is removed by compaction 
or anything.
                 // Will unpin it after the PipeTsFileInsertionEvent is created 
and pinned.
                 try {
-                  PipeDataNodeResourceManager.tsfile()
-                      .pinTsFileResource(resource, shouldTransferModFile, 
pipeName);
+                  
PipeDataNodeResourceManager.tsfile().pinTsFileResource(resource, pipeName);
                   return false;
                 } catch (final IOException e) {
                   LOGGER.warn("Pipe: failed to pin TsFileResource {}", 
resource.getTsFilePath(), e);
@@ -866,8 +865,7 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
       return true;
     } finally {
       try {
-        PipeDataNodeResourceManager.tsfile()
-            .unpinTsFileResource(resource, shouldTransferModFile, pipeName);
+        PipeDataNodeResourceManager.tsfile().unpinTsFileResource(resource, 
pipeName);
       } catch (final IOException e) {
         LOGGER.warn(
             "Pipe {}@{}: failed to unpin skipped historical TsFileResource, 
original path: {}",
@@ -955,8 +953,7 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
       return isReferenceCountIncreased ? event : null;
     } finally {
       try {
-        PipeDataNodeResourceManager.tsfile()
-            .unpinTsFileResource(resource, shouldTransferModFile, pipeName);
+        PipeDataNodeResourceManager.tsfile().unpinTsFileResource(resource, 
pipeName);
       } catch (final IOException e) {
         LOGGER.warn(
             "Pipe {}@{}: failed to unpin TsFileResource after creating event, 
original path: {}",
@@ -1048,8 +1045,7 @@ public class 
PipeHistoricalDataRegionTsFileAndDeletionSource
             if (resource instanceof TsFileResource) {
               try {
                 PipeDataNodeResourceManager.tsfile()
-                    .unpinTsFileResource(
-                        (TsFileResource) resource, shouldTransferModFile, 
pipeName);
+                    .unpinTsFileResource((TsFileResource) resource, pipeName);
               } catch (final IOException e) {
                 LOGGER.warn(
                     "Pipe {}@{}: failed to unpin TsFileResource after dropping 
pipe, original path: {}",
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java
index 03f44c3c295..dc24ee955c4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java
@@ -163,6 +163,43 @@ public class ModificationFile implements AutoCloseable {
     updateModFileMetric(updateFileNum, size);
   }
 
+  public void clear() throws IOException {
+    lock.writeLock().lock();
+    try {
+      if (!fileExists) {
+        return;
+      }
+      long sizeBeforeClear = getFileLength();
+      if (fileOutputStream != null) {
+        fileOutputStream.close();
+        fileOutputStream = null;
+      }
+      if (channel != null) {
+        channel.truncate(0);
+        channel.close();
+        channel = null;
+      } else {
+        try (FileChannel tempChannel = FileChannel.open(file.toPath(), 
APPEND)) {
+          tempChannel.truncate(0);
+        }
+      }
+
+      if (cascadeFiles != null) {
+        for (ModificationFile cascadeFile : cascadeFiles) {
+          cascadeFile.clear();
+        }
+      }
+
+      if (updateMetrics && !removed) {
+        FileMetrics.getInstance().increaseModFileSize(-sizeBeforeClear);
+      }
+
+      fileExists = false;
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
   private void updateModFileMetric(int num, long size) {
     if (!removed && updateMetrics) {
       FileMetrics.getInstance().increaseModFileNum(num);
diff --git 
a/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/ScpRemoteFileTransfer.java
 
b/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/ScpRemoteFileTransfer.java
index 87556ded0cd..e05b62a793b 100644
--- 
a/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/ScpRemoteFileTransfer.java
+++ 
b/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/ScpRemoteFileTransfer.java
@@ -116,7 +116,6 @@ class ScpRemoteFileTransfer implements RemoteFileTransfer {
   private final ExecutorService objectUploadExecutor;
   private final BlockingQueue<PooledWorkerSession> idleWorkerSessions;
 
-  private SshClient client;
   private ClientSession session;
 
   ScpRemoteFileTransfer(PipeParameters params) {
@@ -516,22 +515,13 @@ class ScpRemoteFileTransfer implements RemoteFileTransfer 
{
     }
   }
 
-  private SshClient getOrCreateClient() {
-    if (client == null || !client.isStarted()) {
-      synchronized (this) {
-        if (client == null || !client.isStarted()) {
-          System.setProperty("org.apache.sshd.security.provider.BC.enabled", 
"false");
-          client = SshClient.setUpDefaultClient();
-          client.start();
-        }
-      }
-    }
-    return client;
+  private SshClient getSharedClient() throws IOException {
+    return ScpSshClientManager.getClient();
   }
 
   private ClientSession createAuthenticatedSession() throws IOException {
     final ClientSession createdSession =
-        getOrCreateClient().connect(user, host, 
port).verify(CONNECT_TIMEOUT_MS).getSession();
+        getSharedClient().connect(user, host, 
port).verify(CONNECT_TIMEOUT_MS).getSession();
     createdSession.addPasswordIdentity(password != null ? password : "");
     createdSession.auth().verify(CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
     return createdSession;
@@ -606,10 +596,6 @@ class ScpRemoteFileTransfer implements RemoteFileTransfer {
   private synchronized void invalidateSession() {
     invalidateMainSession();
     invalidateIdleWorkerSessions();
-    if (client != null && client.isStarted()) {
-      client.stop();
-      client = null;
-    }
   }
 
   private void invalidateIdleWorkerSessions() {
@@ -636,7 +622,7 @@ class ScpRemoteFileTransfer implements RemoteFileTransfer {
   }
 
   @Override
-  public synchronized void close() {
+  public synchronized void close() throws IOException {
     objectUploadExecutor.shutdown();
     try {
       if (!objectUploadExecutor.awaitTermination(
diff --git 
a/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/ScpSshClientManager.java
 
b/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/ScpSshClientManager.java
new file mode 100644
index 00000000000..d31227d0c05
--- /dev/null
+++ 
b/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/ScpSshClientManager.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.plugin.sink.tsfile;
+
+import org.apache.sshd.client.SshClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+class ScpSshClientManager {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ScpSshClientManager.class);
+
+  private static volatile SshClient client;
+
+  private ScpSshClientManager() {}
+
+  static SshClient getClient() throws IOException {
+    if (client == null || !client.isStarted()) {
+      synchronized (ScpSshClientManager.class) {
+        if (client == null || !client.isStarted()) {
+          client = createClient();
+          LOGGER.info("Created static shared SCP SSH client");
+        }
+      }
+    }
+    return client;
+  }
+
+  private static SshClient createClient() throws IOException {
+    try {
+      System.setProperty("org.apache.sshd.security.provider.BC.enabled", 
"false");
+      final SshClient sshClient = SshClient.setUpDefaultClient();
+      sshClient.start();
+      return sshClient;
+    } catch (Exception e) {
+      throw new IOException("Failed to create shared SCP SSH client", e);
+    }
+  }
+}

Reply via email to