This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch hotfix/2.0.9.4-sjzt
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 441b43c0bf330ac55c5543982c22c69dc3bb5f4a
Author: 陈哲涵 <[email protected]>
AuthorDate: Wed Apr 29 06:17:00 2026 +0000

    [TIMECHODB] Fixed object pipe bugs
    
    (cherry picked from commit 1bb588274ef22914dfa0afdbbfb4c0c1dc0cb5af)
---
 .../common/tsfile/PipeTsFileInsertionEvent.java    |   6 +-
 .../sink/protocol/tsfile/PipeTsFileLocalSink.java  |  67 ++++++++--
 .../sink/util/builder/TsFileNameGenerator.java     |  63 +++++++++
 .../sink/util/builder/TsFileNameGeneratorTest.java | 146 +++++++++++++++++++++
 .../plugin/sink/tsfile/PipeTsFileRemoteSink.java   |  70 ++++++++--
 .../plugin/sink/tsfile/ScpRemoteFileTransfer.java  |  16 ++-
 6 files changed, 337 insertions(+), 31 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 19abb31b28b..803269377a3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -373,13 +373,9 @@ public class PipeTsFileInsertionEvent extends PipeInsertionEvent
         final Iterator<String> pathIterator = objectPathIterator();
         final int linked =
             PipeDataNodeResourceManager.object().linkObjectFiles(resource, pathIterator, pipeName);
+        hasObjectData = linked > 0;
         if (linked > 0) {
-          if (hasObjectData == null) {
-            hasObjectData = true;
-          }
           PipeDataNodeResourceManager.object().increaseReference(resource, pipeName);
-        } else if (hasObjectData == null) {
-          hasObjectData = false;
         }
 
         PipeDataNodeResourceManager.object().setTsFileClosed(resource, pipeName);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/PipeTsFileLocalSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/PipeTsFileLocalSink.java
index f1627af7a9e..943d0efd147 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/PipeTsFileLocalSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/PipeTsFileLocalSink.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.sink.protocol.tsfile;
 
 import org.apache.iotdb.commons.pipe.sink.protocol.PipeBatchMetricsSettable;
+import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
@@ -69,10 +70,9 @@ public class PipeTsFileLocalSink implements PipeSink, PipeBatchMetricsSettable {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PipeTsFileLocalSink.class);
 
-  private final TsFileNameGenerator tsFileNameGenerator = new TsFileNameGenerator();
-
   private FileTransfer fileTransfer;
   private PipeTabletEventTsFileBatch eventTsFileBatch;
+  private List<Pair<String, Pair<File, File>>> sealedBatchedTsFiles;
 
   private Histogram tsFileBatchSizeHistogram = new DoNothingHistogram();
   private Histogram tsFileBatchTimeIntervalHistogram = new DoNothingHistogram();
@@ -164,18 +164,27 @@ public class PipeTsFileLocalSink implements PipeSink, PipeBatchMetricsSettable {
       return;
     }
     if (tsFileInsertionEvent instanceof PipeTsFileInsertionEvent) {
-      final File tsFile = tsFileInsertionEvent.getTsFile();
+      final PipeTsFileInsertionEvent event = (PipeTsFileInsertionEvent) tsFileInsertionEvent;
+      if (!event.waitForTsFileClose()) {
+        LOGGER.warn(
+            "Pipe skipping temporary TsFile which shouldn't be transferred: {}", event.getTsFile());
+        return;
+      }
+
+      final File tsFile = event.getTsFile();
       if (tsFile != null && tsFile.exists()) {
         fileTransfer.transferFile(
             tsFile,
             PipeObjectPathUtil.resolveLinkedObjectDirectory(
-                ((PipeTsFileInsertionEvent) tsFileInsertionEvent).getTsFileResource(),
-                ((PipeTsFileInsertionEvent) tsFileInsertionEvent).getPipeName()),
-            tsFileNameGenerator.nextFileName());
+                event.getTsFileResource(), event.getPipeName()),
+            TsFileNameGenerator.targetNameForEvent(event));
       }
     } else {
-      fileTransfer.transferFile(
-          tsFileInsertionEvent.getTsFile(), null, tsFileNameGenerator.nextFileName());
+      final File tsFile = tsFileInsertionEvent.getTsFile();
+      if (tsFile != null && tsFile.exists()) {
+        fileTransfer.transferFile(
+            tsFile, null, TsFileNameGenerator.targetNameForEvent(tsFileInsertionEvent));
+      }
     }
   }
 
@@ -192,8 +201,14 @@ public class PipeTsFileLocalSink implements PipeSink, PipeBatchMetricsSettable {
 
   @Override
   public void close() throws Exception {
+    cleanupSealedBatchedTsFiles();
+    if (eventTsFileBatch != null) {
+      eventTsFileBatch.close();
+      eventTsFileBatch = null;
+    }
     if (fileTransfer != null) {
       fileTransfer.close();
+      fileTransfer = null;
     }
   }
 
@@ -238,7 +253,7 @@ public class PipeTsFileLocalSink implements PipeSink, PipeBatchMetricsSettable {
     if (!eventTsFileBatch.shouldEmit() || eventTsFileBatch.isEmpty()) {
       return;
     }
-    final List<Pair<String, Pair<File, File>>> list = eventTsFileBatch.sealTsFiles();
+    final List<Pair<String, Pair<File, File>>> list = getOrSealBatchedTsFiles();
     for (final Pair<String, Pair<File, File>> sealed : list) {
       final Pair<File, File> tsFileAndObjectDir = sealed.getRight();
       if (tsFileAndObjectDir == null) {
@@ -247,10 +262,42 @@ public class PipeTsFileLocalSink implements PipeSink, PipeBatchMetricsSettable {
       final File tsFile = tsFileAndObjectDir.getLeft();
       final File objectDir = tsFileAndObjectDir.getRight();
       if (tsFile != null && tsFile.exists()) {
-        fileTransfer.transferFile(tsFile, objectDir, tsFileNameGenerator.nextFileName());
+        fileTransfer.transferFile(
+            tsFile, objectDir, TsFileNameGenerator.targetNameForGeneratedFile(tsFile));
       }
     }
     eventTsFileBatch.decreaseEventsReferenceCount(PipeTsFileLocalSink.class.getName(), true);
+    cleanupSealedBatchedTsFiles();
     eventTsFileBatch.onSuccess();
   }
+
+  private List<Pair<String, Pair<File, File>>> getOrSealBatchedTsFiles() throws Exception {
+    if (sealedBatchedTsFiles == null) {
+      sealedBatchedTsFiles = eventTsFileBatch.sealTsFiles();
+    }
+    return sealedBatchedTsFiles;
+  }
+
+  private void cleanupSealedBatchedTsFiles() {
+    if (sealedBatchedTsFiles == null) {
+      return;
+    }
+
+    for (final Pair<String, Pair<File, File>> sealed : sealedBatchedTsFiles) {
+      if (sealed == null || sealed.getRight() == null) {
+        continue;
+      }
+
+      final File tsFile = sealed.getRight().getLeft();
+      final File objectDir = sealed.getRight().getRight();
+      if (tsFile != null && tsFile.exists()) {
+        FileUtils.deleteFileOrDirectory(tsFile, true);
+      }
+      if (objectDir != null && objectDir.exists()) {
+        FileUtils.deleteFileOrDirectory(objectDir, true);
+      }
+    }
+
+    sealedBatchedTsFiles = null;
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/TsFileNameGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/TsFileNameGenerator.java
index b6623056c9a..5b696bdb5ee 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/TsFileNameGenerator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/TsFileNameGenerator.java
@@ -19,10 +19,20 @@
 
 package org.apache.iotdb.db.pipe.sink.util.builder;
 
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+
+import org.apache.tsfile.common.constant.TsFileConstant;
+
+import java.io.File;
 import java.util.UUID;
+import java.util.regex.Pattern;
 
 public class TsFileNameGenerator {
 
+  private static final Pattern NON_FILE_NAME_CHAR_PATTERN = Pattern.compile("[^A-Za-z0-9._-]");
+
   private final String runId;
   private long lastTimestamp;
   private long indexSeq;
@@ -49,4 +59,57 @@ public class TsFileNameGenerator {
   public String getRunId() {
     return runId;
   }
+
+  public static String targetNameForEvent(final TsFileInsertionEvent event) {
+    if (event == null) {
+      return "tsfile";
+    }
+
+    if (event instanceof EnrichedEvent) {
+      final EnrichedEvent enrichedEvent = (EnrichedEvent) event;
+      final CommitterKey committerKey = enrichedEvent.getCommitterKey();
+      if (committerKey != null && enrichedEvent.getCommitId() != EnrichedEvent.NO_COMMIT_ID) {
+        return sanitizeFileName(
+            stripTsFileSuffix(event.getTsFile())
+                + "_"
+                + committerKey.stringify()
+                + "_"
+                + enrichedEvent.getCommitId());
+      }
+    }
+
+    return targetNameForFile(event.getTsFile());
+  }
+
+  public static String targetNameForFile(final File tsFile) {
+    if (tsFile == null) {
+      return "tsfile";
+    }
+
+    final String normalizedPath = tsFile.toPath().toAbsolutePath().normalize().toString();
+    return sanitizeFileName(
+        stripTsFileSuffix(tsFile) + "_" + Integer.toUnsignedString(normalizedPath.hashCode()));
+  }
+
+  public static String targetNameForGeneratedFile(final File tsFile) {
+    return sanitizeFileName(stripTsFileSuffix(tsFile));
+  }
+
+  private static String stripTsFileSuffix(final File tsFile) {
+    return stripTsFileSuffix(tsFile == null ? null : tsFile.getName());
+  }
+
+  private static String stripTsFileSuffix(final String tsFileName) {
+    if (tsFileName == null || tsFileName.isEmpty()) {
+      return "tsfile";
+    }
+    return tsFileName.endsWith(TsFileConstant.TSFILE_SUFFIX)
+        ? tsFileName.substring(0, tsFileName.length() - TsFileConstant.TSFILE_SUFFIX.length())
+        : tsFileName;
+  }
+
+  private static String sanitizeFileName(final String rawName) {
+    final String sanitized = NON_FILE_NAME_CHAR_PATTERN.matcher(rawName).replaceAll("_");
+    return sanitized.isEmpty() ? "tsfile" : sanitized;
+  }
 }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/util/builder/TsFileNameGeneratorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/util/builder/TsFileNameGeneratorTest.java
new file mode 100644
index 00000000000..dd12077abbe
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/util/builder/TsFileNameGeneratorTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink.util.builder;
+
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Collections;
+
+public class TsFileNameGeneratorTest {
+
+  @Test
+  public void testTargetNameForEventUsesStableCommitIdentity() {
+    final TestTsFileEvent event = new TestTsFileEvent(new File("source.tsfile"));
+    event.setCommitterKeyAndCommitId(new CommitterKey("pipe name/1", 100L, 7, 2), 11L);
+
+    Assert.assertEquals(
+        "source_pipe_name_1_7_100_2_11", TsFileNameGenerator.targetNameForEvent(event));
+  }
+
+  @Test
+  public void testTargetNameForFileDistinguishesDifferentPaths() {
+    final File first = new File("target/first/same.tsfile");
+    final File second = new File("target/second/same.tsfile");
+
+    Assert.assertNotEquals(
+        TsFileNameGenerator.targetNameForFile(first),
+        TsFileNameGenerator.targetNameForFile(second));
+  }
+
+  @Test
+  public void testTargetNameForGeneratedFileStripsTsFileSuffix() {
+    Assert.assertEquals(
+        "tb_1_2_3", TsFileNameGenerator.targetNameForGeneratedFile(new File("tb_1_2_3.tsfile")));
+  }
+
+  private static class TestTsFileEvent extends EnrichedEvent implements TsFileInsertionEvent {
+
+    private final File tsFile;
+
+    private TestTsFileEvent(final File tsFile) {
+      super(
+          "pipe",
+          1L,
+          (PipeTaskMeta) null,
+          (TreePattern) null,
+          (TablePattern) null,
+          null,
+          null,
+          null,
+          true,
+          Long.MIN_VALUE,
+          Long.MAX_VALUE);
+      this.tsFile = tsFile;
+    }
+
+    @Override
+    public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
+      return Collections.emptyList();
+    }
+
+    @Override
+    public File getTsFile() {
+      return tsFile;
+    }
+
+    @Override
+    public boolean internallyIncreaseResourceReferenceCount(final String holderMessage) {
+      return true;
+    }
+
+    @Override
+    public boolean internallyDecreaseResourceReferenceCount(final String holderMessage) {
+      return true;
+    }
+
+    @Override
+    public ProgressIndex getProgressIndex() {
+      return MinimumProgressIndex.INSTANCE;
+    }
+
+    @Override
+    public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
+        final String pipeName,
+        final long creationTime,
+        final PipeTaskMeta pipeTaskMeta,
+        final TreePattern treePattern,
+        final TablePattern tablePattern,
+        final String userId,
+        final String userName,
+        final String cliHostname,
+        final boolean skipIfNoPrivileges,
+        final long startTime,
+        final long endTime) {
+      return this;
+    }
+
+    @Override
+    public boolean isGeneratedByPipe() {
+      return false;
+    }
+
+    @Override
+    public boolean mayEventTimeOverlappedWithTimeRange() {
+      return true;
+    }
+
+    @Override
+    public boolean mayEventPathsOverlappedWithPattern() {
+      return true;
+    }
+
+    @Override
+    public void close() {
+      // do nothing
+    }
+  }
+}
diff --git a/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/PipeTsFileRemoteSink.java b/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/PipeTsFileRemoteSink.java
index f876295275b..de4f7fb89a2 100644
--- a/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/PipeTsFileRemoteSink.java
+++ b/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/PipeTsFileRemoteSink.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.pipe.plugin.sink.tsfile;
 
 import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
 import org.apache.iotdb.commons.pipe.sink.protocol.PipeBatchMetricsSettable;
+import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
@@ -40,7 +41,6 @@ import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
-import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
 
 import org.apache.tsfile.utils.Pair;
@@ -72,9 +72,9 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SIN
 public class PipeTsFileRemoteSink implements PipeSink, PipeBatchMetricsSettable {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PipeTsFileRemoteSink.class);
-  private final TsFileNameGenerator tsFileNameGenerator = new TsFileNameGenerator();
   private RemoteFileTransfer remoteFileTransfer;
   private PipeTabletEventTsFileBatch eventTsFileBatch;
+  private List<Pair<String, Pair<File, File>>> sealedBatchedTsFiles;
 
   private Histogram tsFileBatchSizeHistogram = new DoNothingHistogram();
   private Histogram tsFileBatchTimeIntervalHistogram = new DoNothingHistogram();
@@ -146,23 +146,28 @@ public class PipeTsFileRemoteSink implements PipeSink, PipeBatchMetricsSettable
     if (tsFileInsertionEvent instanceof PipeTsFileInsertionEvent) {
       final PipeTsFileInsertionEvent event = (PipeTsFileInsertionEvent) tsFileInsertionEvent;
       if (!event.waitForTsFileClose()) {
-        throw new PipeException(
-            "Timeout waiting for tsfile close before sink transfer: " + tsFileInsertionEvent);
+        LOGGER.warn(
+            "Pipe skipping temporary TsFile which shouldn't be transferred: {}", event.getTsFile());
+        return;
       }
+
       final TsFileResource tsFileResource = event.getTsFileResource();
-      if (tsFileResource == null) {
-        throw new PipeException("TsFile resource is null, event: " + tsFileInsertionEvent);
-      }
-      final File tsFile = tsFileResource.getTsFile();
+      final File tsFile = event.getTsFile();
       if (tsFile != null && tsFile.exists()) {
         remoteFileTransfer.transferFile(
             tsFile,
-            PipeObjectPathUtil.resolveLinkedObjectDirectory(tsFileResource, event.getPipeName()),
-            tsFileNameGenerator.nextFileName());
+            tsFileResource == null
+                ? null
+                : PipeObjectPathUtil.resolveLinkedObjectDirectory(
+                    tsFileResource, event.getPipeName()),
+            TsFileNameGenerator.targetNameForEvent(event));
       }
     } else {
-      remoteFileTransfer.transferFile(
-          tsFileInsertionEvent.getTsFile(), null, tsFileNameGenerator.nextFileName());
+      final File tsFile = tsFileInsertionEvent.getTsFile();
+      if (tsFile != null && tsFile.exists()) {
+        remoteFileTransfer.transferFile(
+            tsFile, null, TsFileNameGenerator.targetNameForEvent(tsFileInsertionEvent));
+      }
     }
   }
 
@@ -207,6 +212,11 @@ public class PipeTsFileRemoteSink implements PipeSink, PipeBatchMetricsSettable
 
   @Override
   public void close() throws Exception {
+    cleanupSealedBatchedTsFiles();
+    if (eventTsFileBatch != null) {
+      eventTsFileBatch.close();
+      eventTsFileBatch = null;
+    }
     if (remoteFileTransfer != null) {
       remoteFileTransfer.close();
       remoteFileTransfer = null;
@@ -254,7 +264,7 @@ public class PipeTsFileRemoteSink implements PipeSink, PipeBatchMetricsSettable
     if (!eventTsFileBatch.shouldEmit() || eventTsFileBatch.isEmpty()) {
       return;
     }
-    final List<Pair<String, Pair<File, File>>> list = eventTsFileBatch.sealTsFiles();
+    final List<Pair<String, Pair<File, File>>> list = getOrSealBatchedTsFiles();
     for (final Pair<String, Pair<File, File>> sealed : list) {
       final Pair<File, File> tsFileAndObjectDir = sealed.getRight();
       if (tsFileAndObjectDir == null) {
@@ -263,10 +273,42 @@ public class PipeTsFileRemoteSink implements PipeSink, PipeBatchMetricsSettable
       final File tsFile = tsFileAndObjectDir.getLeft();
       final File objectDir = tsFileAndObjectDir.getRight();
       if (tsFile != null && tsFile.exists()) {
-        remoteFileTransfer.transferFile(tsFile, objectDir, tsFileNameGenerator.nextFileName());
+        remoteFileTransfer.transferFile(
+            tsFile, objectDir, TsFileNameGenerator.targetNameForGeneratedFile(tsFile));
       }
     }
     eventTsFileBatch.decreaseEventsReferenceCount(PipeTsFileRemoteSink.class.getName(), true);
+    cleanupSealedBatchedTsFiles();
     eventTsFileBatch.onSuccess();
   }
+
+  private List<Pair<String, Pair<File, File>>> getOrSealBatchedTsFiles() throws Exception {
+    if (sealedBatchedTsFiles == null) {
+      sealedBatchedTsFiles = eventTsFileBatch.sealTsFiles();
+    }
+    return sealedBatchedTsFiles;
+  }
+
+  private void cleanupSealedBatchedTsFiles() {
+    if (sealedBatchedTsFiles == null) {
+      return;
+    }
+
+    for (final Pair<String, Pair<File, File>> sealed : sealedBatchedTsFiles) {
+      if (sealed == null || sealed.getRight() == null) {
+        continue;
+      }
+
+      final File tsFile = sealed.getRight().getLeft();
+      final File objectDir = sealed.getRight().getRight();
+      if (tsFile != null && tsFile.exists()) {
+        FileUtils.deleteFileOrDirectory(tsFile, true);
+      }
+      if (objectDir != null && objectDir.exists()) {
+        FileUtils.deleteFileOrDirectory(objectDir, true);
+      }
+    }
+
+    sealedBatchedTsFiles = null;
+  }
 }
diff --git a/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/ScpRemoteFileTransfer.java b/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/ScpRemoteFileTransfer.java
index c99808be19a..c47545902cf 100644
--- a/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/ScpRemoteFileTransfer.java
+++ b/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/ScpRemoteFileTransfer.java
@@ -182,8 +182,20 @@ class ScpRemoteFileTransfer implements RemoteFileTransfer {
   private void executeRemoteCommand(ClientSession s, String command) throws IOException {
     try (ChannelExec channel = s.createExecChannel(command)) {
       channel.open().verify(CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
-      channel.waitFor(WAIT_FOR_CLOSED, CONNECT_TIMEOUT_MS);
-    } catch (Exception ignore) {
+      final Set<ClientChannelEvent> events = channel.waitFor(WAIT_FOR_CLOSED, CONNECT_TIMEOUT_MS);
+      if (!events.contains(ClientChannelEvent.CLOSED)) {
+        throw new IOException("Remote command timed out: " + command);
+      }
+
+      final Integer exitStatus = channel.getExitStatus();
+      if (exitStatus != null && exitStatus != 0) {
+        throw new IOException(
+            String.format("Remote command failed with exit code %s: %s", exitStatus, command));
+      }
+    } catch (IOException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new IOException("Failed to execute remote command: " + command, e);
     }
   }
 

Reply via email to