This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch hotfix/2.0.9.4-sjzt
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e95a45a9a3bb4b66c21ebaf7964617f052cf7f23
Author: 罗振羽 <[email protected]>
AuthorDate: Wed May 6 19:52:23 2026 +0800

    [TIMECHODB] fix(pipe): transfer exclusive mod in tsfile sinks and hard-link 
local files
    
    (cherry picked from commit 212c49f7debd1d2b610be518a794fbc8e09f8855)
---
 .../IoTDBPipeTableModelObjectImportExportIT.java   | 682 +++++++++++++++++++++
 .../manual/basic/IoTDBPipeTsFileSinkObjectIT.java  |  77 ++-
 .../common/tsfile/PipeTsFileInsertionEvent.java    |  87 ++-
 .../common/tsfile/TsFileObjectPathIterator.java    |  19 +-
 .../tsfile/parser/TsFileInsertionEventParser.java  |   3 +
 .../parser/TsFileInsertionEventParserProvider.java |  13 +-
 .../table/TsFileInsertionEventTableParser.java     |  99 ++-
 ...ileInsertionEventTableParserTabletIterator.java |  18 +-
 .../tsfile/parser/util/ModsOperationUtil.java      |  52 +-
 .../resource/tsfile/PipeTsFileResourceManager.java |  47 ++
 .../db/pipe/sink/protocol/tsfile/FileTransfer.java |  33 +-
 .../sink/protocol/tsfile/LocalFileTransfer.java    |  80 ++-
 .../sink/protocol/tsfile/PipeTsFileLocalSink.java  |   9 +-
 .../load/util/LoadObjectFileUtil.java              |  50 +-
 .../tsfile/parser/util/ModsOperationUtilTest.java  |  47 ++
 .../plugin/sink/tsfile/PipeTsFileRemoteSink.java   |   9 +-
 .../plugin/sink/tsfile/RemoteFileTransfer.java     |   3 +-
 .../plugin/sink/tsfile/ScpRemoteFileTransfer.java  |  28 +-
 18 files changed, 1262 insertions(+), 94 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeTableModelObjectImportExportIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeTableModelObjectImportExportIT.java
new file mode 100644
index 00000000000..5485c0ff1fb
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeTableModelObjectImportExportIT.java
@@ -0,0 +1,682 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.dual.tablemodel.manual.basic;
+
+import org.apache.iotdb.db.it.utils.StandardObjectTableModelTsFileGenerator;
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.isession.ITableSession;
+import org.apache.iotdb.isession.SessionDataSet;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2DualTableManualBasic;
+import 
org.apache.iotdb.pipe.it.dual.tablemodel.manual.AbstractPipeTableModelDualManualIT;
+
+import org.apache.tsfile.enums.ColumnCategory;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.write.record.Tablet;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2DualTableManualBasic.class})
+public class IoTDBPipeTableModelObjectImportExportIT extends 
AbstractPipeTableModelDualManualIT {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(IoTDBPipeTableModelObjectImportExportIT.class);
+
+  // Database used on both the sender and the receiver side.
+  private static final String TEST_DATABASE = "db1";
+
+  // Table written by the generated multi-week tsfile in the round-trip test.
+  private static final String OBJECT_TABLE_NAME = "factory_metrics";
+
+  // Number of devices written into the generated multi-week tsfile.
+  private static final int OBJECT_MULTI_WEEK_DEVICE_COUNT = 5;
+
+  // Rows inserted into t1 (timestamps start at 1) by the source/sink scope tests.
+  private static final int SOURCE_SINK_INSERT_ROWS = 100;
+
+  // The delete window starts this many timestamps after the first row and spans DELETE_LEN rows.
+  private static final int SOURCE_SINK_DELETE_FROM_FIRST_OFFSET = 20;
+  private static final int SOURCE_SINK_DELETE_LEN = 25;
+
+  private static final long EXPECTED_ROWS_AFTER_DELETE =
+      SOURCE_SINK_INSERT_ROWS - SOURCE_SINK_DELETE_LEN;
+
+  // Inclusive lower bound of the deleted time window (first timestamp is 1).
+  private static final long SENDER_DELETE_LO = 1L + 
SOURCE_SINK_DELETE_FROM_FIRST_OFFSET;
+
+  // Inclusive upper bound of the deleted time window.
+  private static final long SENDER_DELETE_HI = SENDER_DELETE_LO + 
SOURCE_SINK_DELETE_LEN - 1;
+
+  private static final long HOUR_MS = 3600 * 1000L;
+  private static final long DAY_MS = 24L * HOUR_MS;
+  private static final long WEEK_MS = 7L * DAY_MS;
+
+  // First timestamp of the first device in the generated multi-week tsfile.
+  private static final long OBJECT_BASE_TIME = 1600000000000L;
+
+  // Temp directory the tsfile-local-sink exports into; created in setUp().
+  private String targetDir;
+  // Temp directory holding the generated source tsfile; created in setUp().
+  private String sourceTsDir;
+
+  /** How narrowly the pipe source is restricted; consumed by buildObjectSourceClause. */
+  private enum TableSourceScope {
+    // No 'source.database-name' / 'source.table-name' option is emitted.
+    ALL,
+    // Only 'source.database-name' is emitted.
+    DATABASE_ONLY,
+    // Both 'source.database-name' and 'source.table-name' are emitted.
+    DATABASE_AND_TABLE
+  }
+
+  /** Runs the parent set-up, then creates fresh temp directories for the export and the source. */
+  @Override
+  @Before
+  public void setUp() {
+    super.setUp();
+
+    try {
+      final Path exportRoot = Files.createTempDirectory("pipe_table_tsfile_source_sink_it");
+      targetDir = exportRoot.toAbsolutePath().toString();
+
+      final Path sourceRoot = Files.createTempDirectory("pipe_table_tsfile_source_sink_it_src");
+      sourceTsDir = sourceRoot.toAbsolutePath().toString();
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to create temp directory", e);
+    }
+  }
+
+  /** Deletes the export and source temp directories, skipping any that was never assigned. */
+  @After
+  public void cleanUpTempDirectories() {
+    for (final String dir : new String[] {targetDir, sourceTsDir}) {
+      if (dir == null) {
+        continue;
+      }
+      deleteRecursivelyQuietly(Paths.get(dir));
+    }
+  }
+
+  /**
+   * Builds the OBJECT payload that the tests write for, and expect back at, the given timestamp.
+   *
+   * <p>The text is encoded explicitly as UTF-8 so the bytes do not depend on the JVM's default
+   * charset.
+   *
+   * @param timestamp row timestamp embedded in the payload text
+   * @return the payload bytes
+   */
+  private static byte[] buildExpectedObjectPayload(final long timestamp) {
+    final String payload = "Industrial_Grade_Payload_Verification_For_Timestamp_" + timestamp;
+    return payload.getBytes(StandardCharsets.UTF_8);
+  }
+
+  /** History-only export with deletes; the source is not restricted by database or table. */
+  @Test
+  public void testHistoricalExportWithDeletesForAllScope() throws Exception {
+    final boolean realtimeFirst = false;
+    final boolean historyEnable = true;
+    final boolean realtimeEnable = false;
+    verifyHistoricalObjectExportWithDeletes(
+        "p_scope_all_hist_mods",
+        TableSourceScope.ALL,
+        realtimeFirst,
+        historyEnable,
+        realtimeEnable,
+        EXPECTED_ROWS_AFTER_DELETE);
+  }
+
+  /** History-only export with deletes; the source is restricted to the test database. */
+  @Test
+  public void testHistoricalExportWithDeletesForDatabaseScope() throws Exception {
+    final boolean realtimeFirst = false;
+    final boolean historyEnable = true;
+    final boolean realtimeEnable = false;
+    verifyHistoricalObjectExportWithDeletes(
+        "p_scope_db_hist_mods",
+        TableSourceScope.DATABASE_ONLY,
+        realtimeFirst,
+        historyEnable,
+        realtimeEnable,
+        EXPECTED_ROWS_AFTER_DELETE);
+  }
+
+  /** History-only export with deletes; the source is restricted to the test database and t1. */
+  @Test
+  public void testHistoricalExportWithDeletesForDatabaseAndTableScope() throws Exception {
+    final boolean realtimeFirst = false;
+    final boolean historyEnable = true;
+    final boolean realtimeEnable = false;
+    verifyHistoricalObjectExportWithDeletes(
+        "p_scope_dbt_hist_mods",
+        TableSourceScope.DATABASE_AND_TABLE,
+        realtimeFirst,
+        historyEnable,
+        realtimeEnable,
+        EXPECTED_ROWS_AFTER_DELETE);
+  }
+
+  /** History-only export without deletes; the source is restricted to the test database and t1. */
+  @Test
+  public void testHistoricalExportWithoutDeletesForDatabaseAndTableScope() throws Exception {
+    final boolean historyEnable = true;
+    final boolean realtimeEnable = false;
+    verifyHistoricalObjectExportWithoutDeletes(
+        "p_scope_dbt_hist_nomods",
+        TableSourceScope.DATABASE_AND_TABLE,
+        historyEnable,
+        realtimeEnable);
+  }
+
+  /** History-only export without deletes; the source is not restricted by database or table. */
+  @Test
+  public void testHistoricalExportWithoutDeletesForAllScope() throws Exception {
+    final boolean historyEnable = true;
+    final boolean realtimeEnable = false;
+    verifyHistoricalObjectExportWithoutDeletes(
+        "p_scope_all_hist_nomods", TableSourceScope.ALL, historyEnable, realtimeEnable);
+  }
+
+  /** History-only export without deletes; the source is restricted to the test database. */
+  @Test
+  public void testHistoricalExportWithoutDeletesForDatabaseScope() throws Exception {
+    final boolean historyEnable = true;
+    final boolean realtimeEnable = false;
+    verifyHistoricalObjectExportWithoutDeletes(
+        "p_scope_db_hist_nomods", TableSourceScope.DATABASE_ONLY, historyEnable, realtimeEnable);
+  }
+
+  /**
+   * Round-trips a generated multi-device object tsfile: LOAD it on the sender, export it through
+   * a history-only pipe into the tsfile-local-sink, LOAD the exported files on the receiver and
+   * compare each device's timestamps and object payloads.
+   *
+   * @throws Exception if a statement fails or no stable export appears before the timeout
+   */
+  @Test
+  public void testObjectRoundTripFromMultiWeekGeneratedTsFile() throws 
Exception {
+    final File tsFile = new File(sourceTsDir, 
"five_devices_multi_week.tsfile");
+    final List<List<Long>> expectedTimesPerDevice = new ArrayList<>();
+
+    // Device i gets its own week: [weekStart, weekStart + 6 days], stepped by DAY_MS.
+    try (StandardObjectTableModelTsFileGenerator generator =
+        new StandardObjectTableModelTsFileGenerator(tsFile)) {
+      for (int i = 0; i < OBJECT_MULTI_WEEK_DEVICE_COUNT; i++) {
+        final String deviceId = String.format("device_%02d", i + 1);
+        final long weekStart = OBJECT_BASE_TIME + (long) i * WEEK_MS;
+        final long weekEnd = weekStart + 6 * DAY_MS;
+        generator.writeDeviceData(OBJECT_TABLE_NAME, deviceId, weekStart, 
weekEnd, DAY_MS);
+        expectedTimesPerDevice.add(buildExpectedTimestamps(weekStart, weekEnd, 
DAY_MS));
+      }
+    }
+
+    // Sender side: load the generated file, then export it with a history-only pipe.
+    try (ITableSession sender = senderEnv.getTableSessionConnection()) {
+      sender.executeNonQueryStatement(
+          String.format("CREATE DATABASE IF NOT EXISTS %s", TEST_DATABASE));
+      useDatabase(sender, TEST_DATABASE);
+      sender.executeNonQueryStatement(
+          String.format(
+              "LOAD '%s' WITH ('database-name'='%s')", 
tsFile.getAbsolutePath(), TEST_DATABASE));
+      TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+
+      sender.executeNonQueryStatement(
+          String.format(
+              "CREATE PIPE p_src_sink_multi_week WITH SOURCE ("
+                  + "'source.capture.table'='true', "
+                  + "'source.database-name'='%s', "
+                  + "'source.table-name'='%s', "
+                  + "'source.inclusion'='data.insert', "
+                  + "'source.history.enable'='true', "
+                  + "'source.realtime.enable'='false' "
+                  + ") WITH SINK ("
+                  + "%s"
+                  + ")",
+              TEST_DATABASE, OBJECT_TABLE_NAME, buildTsFileLocalSinkClause()));
+
+      // Blocks until the exported .tsfile set is unchanged between two polls, or fails at 60 s.
+      waitForExportedTsFile(new File(targetDir), 60_000);
+      sender.executeNonQueryStatement("DROP PIPE p_src_sink_multi_week");
+      sender.executeNonQueryStatement(String.format("DROP DATABASE %s", 
TEST_DATABASE));
+    }
+
+    final List<File> exportedTsFiles = new ArrayList<>();
+    collectTsFilesRecursively(new File(targetDir), exportedTsFiles);
+    Assert.assertFalse(exportedTsFiles.isEmpty());
+
+    // Receiver side: load every exported file, then verify device by device.
+    try (ITableSession receiver = receiverEnv.getTableSessionConnection()) {
+      receiver.executeNonQueryStatement(
+          String.format("CREATE DATABASE IF NOT EXISTS %s", TEST_DATABASE));
+      useDatabase(receiver, TEST_DATABASE);
+      for (File f : exportedTsFiles) {
+        receiver.executeNonQueryStatement(
+            String.format(
+                "LOAD '%s' WITH ('database-name'='%s')", f.getAbsolutePath(), 
TEST_DATABASE));
+      }
+      TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
+
+      for (int i = 0; i < OBJECT_MULTI_WEEK_DEVICE_COUNT; i++) {
+        final String deviceId = String.format("device_%02d", i + 1);
+        assertGeneratedObjectPayloadsByDevice(
+            receiver, OBJECT_TABLE_NAME, deviceId, 
expectedTimesPerDevice.get(i));
+      }
+    }
+  }
+
+  /**
+   * Inserts {@link #SOURCE_SINK_INSERT_ROWS} object rows into t1 on the sender, deletes the
+   * window [{@link #SENDER_DELETE_LO}, {@link #SENDER_DELETE_HI}], exports through a
+   * tsfile-local-sink pipe, loads the exported files on the receiver and verifies the row count,
+   * the emptiness of the deleted window and the object payloads.
+   *
+   * @param pipeName name of the pipe to create and drop
+   * @param scope database/table restriction applied to the pipe source
+   * @param realtimeFirst if true the pipe is created before the data is written, otherwise after
+   * @param historyEnable value emitted for 'source.history.enable'
+   * @param realtimeEnable value emitted for 'source.realtime.enable'
+   * @param expectedRowsAfterLoad expected row count of t1 on the receiver after LOAD
+   * @throws Exception if a statement fails or the export does not stabilise before the timeout
+   */
+  private void verifyHistoricalObjectExportWithDeletes(
+      final String pipeName,
+      final TableSourceScope scope,
+      final boolean realtimeFirst,
+      final boolean historyEnable,
+      final boolean realtimeEnable,
+      final long expectedRowsAfterLoad)
+      throws Exception {
+    // The statement depends only on the arguments and targetDir, so build it once for both paths.
+    final String createPipeSql =
+        String.format(
+            "CREATE PIPE %s WITH SOURCE (%s) WITH SINK (%s)",
+            pipeName,
+            buildObjectSourceClause(scope, true, historyEnable, realtimeEnable, true),
+            buildTsFileLocalSinkClause());
+
+    try (ITableSession sender = senderEnv.getTableSessionConnection()) {
+      createSenderObjectTable(sender);
+
+      if (realtimeFirst) {
+        sender.executeNonQueryStatement(createPipeSql);
+        // Fixed pause between pipe creation and the first write.
+        Thread.sleep(2000);
+      }
+
+      insertObjectRowsWithContiguousTimestamps(sender, "t1", 1L, SOURCE_SINK_INSERT_ROWS);
+      TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+      // Delete with the shared constants so this window and the receiver-side assertions below
+      // cannot drift apart.
+      sender.executeNonQueryStatement(
+          String.format(
+              "DELETE FROM t1 WHERE time >= %d AND time <= %d",
+              SENDER_DELETE_LO, SENDER_DELETE_HI));
+      TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+
+      if (!realtimeFirst) {
+        sender.executeNonQueryStatement(createPipeSql);
+      }
+
+      waitForExportedTsFilesWithMods(new File(targetDir), 120_000);
+      sender.executeNonQueryStatement("DROP PIPE IF EXISTS " + pipeName);
+      sender.executeNonQueryStatement("DROP TABLE IF EXISTS t1");
+    }
+
+    try (ITableSession receiver = receiverEnv.getTableSessionConnection()) {
+      createReceiverDatabaseForLoad(receiver);
+      loadExportedTsFiles(receiver, new File(targetDir));
+      TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
+      Assert.assertEquals(expectedRowsAfterLoad, queryTableRowCount(receiver, "t1"));
+      assertNoRowsExistInDeletedTimeRange(receiver, SENDER_DELETE_LO, SENDER_DELETE_HI);
+      assertObjectColumnMatchesExpectedPayload(
+          receiver, buildExpectedTimestampsAfterSenderDelete(1L, SOURCE_SINK_INSERT_ROWS));
+    }
+  }
+
+  /**
+   * Inserts {@link #SOURCE_SINK_INSERT_ROWS} object rows into t1 on the sender, exports them
+   * through a tsfile-local-sink pipe (no delete capture), loads the exported files on the
+   * receiver and verifies the row count and every object payload.
+   *
+   * @param pipeName name of the pipe to create and drop
+   * @param scope database/table restriction applied to the pipe source
+   * @param historyEnable value emitted for 'source.history.enable'
+   * @param realtimeEnable value emitted for 'source.realtime.enable'
+   * @throws Exception if a statement fails or the export does not stabilise before the timeout
+   */
+  private void verifyHistoricalObjectExportWithoutDeletes(
+      final String pipeName,
+      final TableSourceScope scope,
+      final boolean historyEnable,
+      final boolean realtimeEnable)
+      throws Exception {
+    // A realtime-only pipe has to exist before the write; every other combination is created
+    // after the data has been flushed.
+    final boolean createPipeBeforeInsert = realtimeEnable && !historyEnable;
+    final File exportRoot = new File(targetDir);
+
+    try (ITableSession sender = senderEnv.getTableSessionConnection()) {
+      createSenderObjectTable(sender);
+
+      if (createPipeBeforeInsert) {
+        final String createEarly =
+            String.format(
+                "CREATE PIPE %s WITH SOURCE (%s) WITH SINK (%s)",
+                pipeName,
+                buildObjectSourceClause(scope, false, historyEnable, realtimeEnable, true),
+                buildTsFileLocalSinkClause());
+        sender.executeNonQueryStatement(createEarly);
+        Thread.sleep(2000);
+      }
+
+      insertObjectRowsWithContiguousTimestamps(sender, "t1", 1L, SOURCE_SINK_INSERT_ROWS);
+      TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+
+      if (!createPipeBeforeInsert) {
+        final String createLate =
+            String.format(
+                "CREATE PIPE %s WITH SOURCE (%s) WITH SINK (%s)",
+                pipeName,
+                buildObjectSourceClause(scope, false, historyEnable, realtimeEnable, true),
+                buildTsFileLocalSinkClause());
+        sender.executeNonQueryStatement(createLate);
+      }
+
+      waitForExportedTsFile(exportRoot, 120_000);
+      sender.executeNonQueryStatement("DROP PIPE IF EXISTS " + pipeName);
+      sender.executeNonQueryStatement("DROP TABLE IF EXISTS t1");
+    }
+
+    try (ITableSession receiver = receiverEnv.getTableSessionConnection()) {
+      createReceiverDatabaseForLoad(receiver);
+      loadExportedTsFiles(receiver, new File(targetDir));
+      TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
+      final long loadedRows = queryTableRowCount(receiver, "t1");
+      Assert.assertEquals(SOURCE_SINK_INSERT_ROWS, loadedRows);
+      final List<Long> expectedTimes = buildContiguousTimestamps(1L, SOURCE_SINK_INSERT_ROWS);
+      assertObjectColumnMatchesExpectedPayload(receiver, expectedTimes);
+    }
+  }
+
+  /**
+   * Returns {@code rowCount} consecutive timestamps, ascending, beginning at
+   * {@code startInclusive}.
+   */
+  private static List<Long> buildContiguousTimestamps(
+      final long startInclusive, final int rowCount) {
+    final List<Long> timestamps = new ArrayList<>(rowCount);
+    long next = startInclusive;
+    for (int remaining = rowCount; remaining > 0; remaining--) {
+      timestamps.add(next);
+      next++;
+    }
+    return timestamps;
+  }
+
+  /**
+   * Returns the timestamps of {@code rowCount} contiguous rows starting at
+   * {@code firstTimeInclusive}, leaving out those inside the sender delete window
+   * [{@link #SENDER_DELETE_LO}, {@link #SENDER_DELETE_HI}].
+   */
+  private static List<Long> buildExpectedTimestampsAfterSenderDelete(
+      final long firstTimeInclusive, final int rowCount) {
+    final long lastTimeInclusive = firstTimeInclusive + rowCount - 1;
+    final List<Long> survivors = new ArrayList<>();
+    for (long ts = firstTimeInclusive; ts <= lastTimeInclusive; ts++) {
+      final boolean deleted = ts >= SENDER_DELETE_LO && ts <= SENDER_DELETE_HI;
+      if (deleted) {
+        continue;
+      }
+      survivors.add(ts);
+    }
+    return survivors;
+  }
+
+  /**
+   * Asserts that t1 holds no row whose time lies in [{@code loInclusive}, {@code hiInclusive}].
+   *
+   * @throws Exception if switching database or running the count query fails
+   */
+  private static void assertNoRowsExistInDeletedTimeRange(
+      final ITableSession session, final long loInclusive, final long hiInclusive)
+      throws Exception {
+    useDatabase(session, TEST_DATABASE);
+    final String countInWindow =
+        String.format(
+            "SELECT COUNT(*) FROM t1 WHERE time >= %d AND time <= %d", loInclusive, hiInclusive);
+    try (SessionDataSet result = session.executeQueryStatement(countInWindow)) {
+      final SessionDataSet.DataIterator rows = result.iterator();
+      Assert.assertTrue(rows.next());
+      final long rowsInWindow = rows.getLong(1);
+      Assert.assertEquals(
+          "Rows in sender-deleted time window must not exist on receiver after LOAD",
+          0L,
+          rowsInWindow);
+    }
+  }
+
+  /**
+   * Reads t1 in ascending time order and asserts that it holds exactly the timestamps in
+   * {@code expectedTimesAsc}, each with the payload produced by buildExpectedObjectPayload.
+   *
+   * @throws Exception if switching database or running the query fails
+   */
+  private static void assertObjectColumnMatchesExpectedPayload(
+      final ITableSession session, final List<Long> expectedTimesAsc) throws Exception {
+    useDatabase(session, TEST_DATABASE);
+    final String query = "SELECT time, READ_OBJECT(file) FROM t1 ORDER BY time ASC";
+    try (SessionDataSet result = session.executeQueryStatement(query)) {
+      final SessionDataSet.DataIterator rows = result.iterator();
+      int seen = 0;
+      for (; rows.next(); seen++) {
+        Assert.assertTrue("More rows than expected", seen < expectedTimesAsc.size());
+        final long expectedTs = expectedTimesAsc.get(seen);
+        Assert.assertEquals("time column", expectedTs, rows.getLong(1));
+        final byte[] actualPayload = rows.getBlob(2).getValues();
+        final byte[] expectedPayload = buildExpectedObjectPayload(expectedTs);
+        Assert.assertArrayEquals(
+            "OBJECT column (file) mismatch at time " + expectedTs, expectedPayload, actualPayload);
+      }
+      Assert.assertEquals(expectedTimesAsc.size(), seen);
+    }
+  }
+
+  /** Creates the test database and table t1 (STRING tag id, OBJECT field file) if missing. */
+  private void createSenderObjectTable(final ITableSession sender) throws Exception {
+    final String createDatabase = "CREATE DATABASE IF NOT EXISTS " + TEST_DATABASE;
+    sender.executeNonQueryStatement(createDatabase);
+    useDatabase(sender, TEST_DATABASE);
+
+    final String createTable = "CREATE TABLE IF NOT EXISTS t1 (id STRING TAG, file OBJECT FIELD)";
+    sender.executeNonQueryStatement(createTable);
+  }
+
+  /** Creates the test database on the receiver if missing and switches the session to it. */
+  private void createReceiverDatabaseForLoad(final ITableSession receiver) throws Exception {
+    final String createDatabase = "CREATE DATABASE IF NOT EXISTS " + TEST_DATABASE;
+    receiver.executeNonQueryStatement(createDatabase);
+    useDatabase(receiver, TEST_DATABASE);
+  }
+
+  /** Issues {@code USE <database>} on the given session. */
+  private static void useDatabase(final ITableSession session, final String database)
+      throws Exception {
+    final String useStatement = "USE " + database;
+    session.executeNonQueryStatement(useStatement);
+  }
+
+  /**
+   * Assembles the comma-separated option list placed inside {@code WITH SOURCE (...)}.
+   *
+   * @param scope which of 'source.database-name' / 'source.table-name' are emitted
+   * @param modsEnable adds data.delete to the inclusion list and sets 'source.mods.enable'
+   * @param historyEnable value emitted for 'source.history.enable'
+   * @param realtimeEnable value emitted for 'source.realtime.enable'
+   * @param captureTable value emitted for 'source.capture.table'
+   * @return the option list, without the surrounding parentheses
+   */
+  private String buildObjectSourceClause(
+      final TableSourceScope scope,
+      final boolean modsEnable,
+      final boolean historyEnable,
+      final boolean realtimeEnable,
+      final boolean captureTable) {
+    final List<String> options = new ArrayList<>();
+    options.add("'source.capture.table'='" + captureTable + "'");
+
+    switch (scope) {
+      case DATABASE_AND_TABLE:
+        options.add("'source.database-name'='" + TEST_DATABASE + "'");
+        options.add("'source.table-name'='t1'");
+        break;
+      case DATABASE_ONLY:
+        options.add("'source.database-name'='" + TEST_DATABASE + "'");
+        break;
+      case ALL:
+      default:
+        break;
+    }
+
+    final String inclusion = modsEnable ? "data.insert,data.delete" : "data.insert";
+    options.add("'source.inclusion'='" + inclusion + "'");
+    if (modsEnable) {
+      options.add("'source.mods.enable'='true'");
+    }
+    options.add("'source.history.enable'='" + historyEnable + "'");
+    options.add("'source.realtime.enable'='" + realtimeEnable + "'");
+
+    return String.join(", ", options);
+  }
+
+  /** Returns the tsfile-local-sink option list, exporting into {@code targetDir}. */
+  private String buildTsFileLocalSinkClause() {
+    final String sinkAndTarget =
+        "'sink'='tsfile-local-sink', 'sink.local.target-path'='" + targetDir + "', ";
+    final String batching =
+        "'sink.batch.max-delay-seconds'='1', 'sink.batch.size-bytes'='10485760'";
+    return sinkAndTarget + batching;
+  }
+
+  /**
+   * Returns {@code startTime, startTime + interval, ...} for as long as the value does not
+   * exceed {@code endTime}.
+   */
+  private static List<Long> buildExpectedTimestamps(
+      final long startTime, final long endTime, final long interval) {
+    final List<Long> timestamps = new ArrayList<>();
+    long current = startTime;
+    while (current <= endTime) {
+      timestamps.add(current);
+      current += interval;
+    }
+    return timestamps;
+  }
+
+  /**
+   * Waits until the set of .tsfile files under {@code root} is unchanged between two polls, and
+   * fails the test once {@code timeoutMs} has elapsed.
+   */
+  private static void waitForExportedTsFile(final File root, final long timeoutMs)
+      throws Exception {
+    final String onTimeout = "Timeout waiting for exported .tsfile under " + root.getAbsolutePath();
+    waitForStableExportedTsFiles(root, timeoutMs, onTimeout);
+  }
+
+  /**
+   * Reads one device's rows in ascending time order and asserts that both the timestamps and the
+   * object bytes match the generated pattern {@code AutoGenerated|Table=..|ID=..|Time=..}.
+   *
+   * @param session session to query
+   * @param tableName table holding the generated rows
+   * @param deviceId value of the id column to filter on
+   * @param expectedTimes expected timestamps, ascending
+   * @throws Exception if switching database or running the query fails
+   */
+  private static void assertGeneratedObjectPayloadsByDevice(
+      final ITableSession session,
+      final String tableName,
+      final String deviceId,
+      final List<Long> expectedTimes)
+      throws Exception {
+    useDatabase(session, TEST_DATABASE);
+    final String sql =
+        String.format(
+            "SELECT time, READ_OBJECT(sensor_obj) FROM %s WHERE id='%s' ORDER BY time ASC",
+            tableName, deviceId);
+
+    try (SessionDataSet result = session.executeQueryStatement(sql)) {
+      final SessionDataSet.DataIterator rows = result.iterator();
+      int seen = 0;
+      for (; rows.next(); seen++) {
+        Assert.assertTrue("More rows than expected for " + deviceId, seen < expectedTimes.size());
+        final long actualTime = rows.getLong(1);
+        final byte[] actualBytes = rows.getBlob(2).getValues();
+        final long expectedTime = expectedTimes.get(seen);
+        Assert.assertEquals("Time mismatch at index " + seen, expectedTime, actualTime);
+
+        final String expectedText =
+            String.format("AutoGenerated|Table=%s|ID=%s|Time=%d", tableName, deviceId, expectedTime);
+        Assert.assertArrayEquals(
+            "Object byte content mismatch at time " + actualTime,
+            expectedText.getBytes(StandardCharsets.UTF_8),
+            actualBytes);
+      }
+      Assert.assertEquals("Total row count mismatch for " + deviceId, expectedTimes.size(), seen);
+    }
+  }
+
+  /** Inserts {@code rowCount} object rows with timestamps startTs .. startTs + rowCount - 1. */
+  private void insertObjectRowsWithContiguousTimestamps(
+      final ITableSession session, final String tableName, final long startTs, final int rowCount)
+      throws Exception {
+    insertObjectRowsInTimeRange(session, tableName, startTs, startTs + rowCount - 1);
+  }
+
+  /**
+   * Inserts one row per timestamp in [startTs, endTs] into the given table. Every row has tag
+   * id = "device1" and an OBJECT field holding buildExpectedObjectPayload(ts). Rows are sent in
+   * batches: the tablet is inserted and reset whenever it is full, and any remainder is inserted
+   * at the end.
+   *
+   * @param session session used for the inserts
+   * @param tableName target table
+   * @param startTs first timestamp, inclusive
+   * @param endTs last timestamp, inclusive
+   * @throws Exception if an insert fails
+   */
+  private void insertObjectRowsInTimeRange(
+      final ITableSession session, final String tableName, final long startTs, 
final long endTs)
+      throws Exception {
+    List<String> columnNames = Arrays.asList("id", "file");
+    List<TSDataType> dataTypes = Arrays.asList(TSDataType.STRING, 
TSDataType.OBJECT);
+    List<ColumnCategory> columnCategories = Arrays.asList(ColumnCategory.TAG, 
ColumnCategory.FIELD);
+
+    // NOTE(review): the trailing 100 is presumably the tablet's max row number - confirm
+    // against the Tablet constructor.
+    Tablet tablet = new Tablet(tableName, columnNames, dataTypes, 
columnCategories, 100);
+
+    for (long ts = startTs; ts <= endTs; ts++) {
+      // The next free row is the current row size.
+      int rowIndex = tablet.getRowSize();
+      tablet.addTimestamp(rowIndex, ts);
+      tablet.addValue(rowIndex, 0, "device1");
+
+      byte[] dynamicObjectBytes = buildExpectedObjectPayload(ts);
+      // NOTE(review): the meaning of the (true, 0) arguments of this OBJECT overload is not
+      // visible here - confirm against the Tablet API.
+      tablet.addValue(rowIndex, 1, true, 0, dynamicObjectBytes);
+
+      // Flush a full batch and start a new one.
+      if (tablet.getRowSize() == tablet.getMaxRowNumber()) {
+        session.insert(tablet);
+        tablet.reset();
+      }
+    }
+
+    // Send whatever is left in the last, partially filled batch.
+    if (tablet.getRowSize() > 0) {
+      session.insert(tablet);
+    }
+  }
+
+  /**
+   * Waits for the exported .tsfile set under {@code root} to stop changing, or fails the test
+   * after {@code timeoutMs}. Despite the name, no check for companion mod files is made here;
+   * the wait is the same as {@code waitForExportedTsFile}.
+   */
+  private static void waitForExportedTsFilesWithMods(final File root, final long timeoutMs)
+      throws Exception {
+    waitForExportedTsFile(root, timeoutMs);
+  }
+
+  /**
+   * Polls {@code root} once per second and returns as soon as two consecutive ready snapshots
+   * are equal. Fails the test with {@code timeoutMessage} once {@code timeoutMs} has elapsed.
+   */
+  private static void waitForStableExportedTsFiles(
+      final File root, final long timeoutMs, final String timeoutMessage) throws Exception {
+    final long deadline = System.currentTimeMillis() + timeoutMs;
+    ExportedTsFileSnapshot lastReady = null;
+    while (System.currentTimeMillis() < deadline) {
+      final ExportedTsFileSnapshot latest = ExportedTsFileSnapshot.capture(root);
+      if (latest.isReady() && latest.equals(lastReady)) {
+        return;
+      }
+      // A non-ready poll resets the comparison baseline.
+      lastReady = latest.isReady() ? latest : null;
+      Thread.sleep(1000);
+    }
+    Assert.fail(timeoutMessage);
+  }
+
+  /**
+   * Point-in-time view of the .tsfile files under a directory, used to detect when an export
+   * has stopped changing. Two snapshots are equal when their readiness and their per-file
+   * fingerprints (absolute path, length, last-modified time) match.
+   */
+  private static final class ExportedTsFileSnapshot {
+
+    private final List<String> fingerprints;
+    private final boolean ready;
+
+    private ExportedTsFileSnapshot(final List<String> fingerprints, final boolean ready) {
+      this.fingerprints = fingerprints;
+      this.ready = ready;
+    }
+
+    /** Scans {@code root}; the snapshot is ready only when at least one .tsfile was found. */
+    private static ExportedTsFileSnapshot capture(final File root) {
+      final List<File> found = new ArrayList<>();
+      collectTsFilesRecursively(root, found);
+
+      final boolean anyFound = !found.isEmpty();
+      final List<String> prints = new ArrayList<>();
+      if (anyFound) {
+        // Sort by path so the fingerprint order is the same for every scan.
+        found.sort(Comparator.comparing(File::getAbsolutePath));
+        for (final File tsFile : found) {
+          prints.add(buildFileFingerprint(tsFile));
+        }
+      }
+      return new ExportedTsFileSnapshot(prints, anyFound);
+    }
+
+    private boolean isReady() {
+      return ready;
+    }
+
+    /** Joins absolute path, length and last-modified time with ':' separators. */
+    private static String buildFileFingerprint(final File file) {
+      final StringBuilder print = new StringBuilder();
+      print.append(file.getAbsolutePath());
+      print.append(":").append(file.length());
+      print.append(":").append(file.lastModified());
+      return print.toString();
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+      if (obj == this) {
+        return true;
+      }
+      if (obj instanceof ExportedTsFileSnapshot) {
+        final ExportedTsFileSnapshot other = (ExportedTsFileSnapshot) obj;
+        return other.ready == ready && fingerprints.equals(other.fingerprints);
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return 31 * fingerprints.hashCode() + Boolean.hashCode(ready);
+    }
+  }
+
+  /**
+   * LOADs every .tsfile found under {@code root} into the test database, in absolute-path order.
+   * Fails the test if no file is found.
+   */
+  private static void loadExportedTsFiles(final ITableSession session, final File root)
+      throws Exception {
+    useDatabase(session, TEST_DATABASE);
+
+    final List<File> exported = new ArrayList<>();
+    collectTsFilesRecursively(root, exported);
+    Assert.assertFalse(exported.isEmpty());
+
+    exported.sort(Comparator.comparing(File::getAbsolutePath));
+    for (final File tsFile : exported) {
+      final String loadSql =
+          String.format(
+              "LOAD '%s' WITH ('database-name'='%s')", tsFile.getAbsolutePath(), TEST_DATABASE);
+      session.executeNonQueryStatement(loadSql);
+    }
+  }
+
+  /** Returns the result of {@code SELECT COUNT(*)} on {@code table} in the test database. */
+  private static long queryTableRowCount(final ITableSession session, final String table)
+      throws Exception {
+    useDatabase(session, TEST_DATABASE);
+    final String countSql = "SELECT COUNT(*) FROM " + table;
+    try (SessionDataSet result = session.executeQueryStatement(countSql)) {
+      final SessionDataSet.DataIterator rows = result.iterator();
+      Assert.assertTrue(rows.next());
+      final long rowCount = rows.getLong(1);
+      return rowCount;
+    }
+  }
+
+  /**
+   * Appends every file below {@code dir} whose name ends with ".tsfile" to {@code tsfiles}.
+   * Nothing is added when {@code dir} cannot be listed.
+   */
+  private static void collectTsFilesRecursively(File dir, List<File> tsfiles) {
+    final File[] entries = dir.listFiles();
+    if (entries == null) {
+      return;
+    }
+    for (final File entry : entries) {
+      if (entry.isDirectory()) {
+        collectTsFilesRecursively(entry, tsfiles);
+        continue;
+      }
+      if (entry.getName().endsWith(".tsfile")) {
+        tsfiles.add(entry);
+      }
+    }
+  }
+
+  /**
+   * Best-effort recursive delete of {@code dirPath}. An {@link IOException} from the walk or
+   * from an individual delete is logged as a warning rather than propagated.
+   *
+   * @param dirPath root of the tree to remove; nothing happens if it does not exist
+   */
+  private static void deleteRecursivelyQuietly(Path dirPath) {
+    if (!Files.exists(dirPath)) {
+      return;
+    }
+    // Files.walk keeps directory handles open until the stream is closed, so close it
+    // explicitly with try-with-resources.
+    try (java.util.stream.Stream<Path> paths = Files.walk(dirPath)) {
+      // Reverse order visits children before their parent directories.
+      paths
+          .sorted(Comparator.reverseOrder())
+          .forEach(
+              path -> {
+                try {
+                  Files.deleteIfExists(path);
+                } catch (IOException e) {
+                  LOGGER.warn("Failed to delete path {}", path, e);
+                }
+              });
+    } catch (IOException e) {
+      LOGGER.warn("Failed to cleanup temp directory {}", dirPath, e);
+    }
+  }
+}
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeTsFileSinkObjectIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeTsFileSinkObjectIT.java
index 0fa98a8a9c4..471f242f75c 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeTsFileSinkObjectIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeTsFileSinkObjectIT.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.pipe.it.dual.tablemodel.manual.basic;
 import org.apache.iotdb.db.it.utils.StandardObjectTableModelTsFileGenerator;
 import org.apache.iotdb.db.it.utils.TestUtils;
 import org.apache.iotdb.db.utils.ObjectTypeUtils;
+import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
 import org.apache.iotdb.isession.ITableSession;
 import org.apache.iotdb.isession.SessionDataSet;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
@@ -66,6 +67,7 @@ public class IoTDBPipeTsFileSinkObjectIT extends 
AbstractPipeTableModelDualManua
   private static final String OBJECT_TABLE_NAME = "factory_metrics";
 
   private static final int OBJECT_MULTI_WEEK_DEVICE_COUNT = 5;
+
   private static final long HOUR_MS = 3600 * 1000L;
   private static final long DAY_MS = 24L * HOUR_MS;
   private static final long WEEK_MS = 7L * DAY_MS;
@@ -118,21 +120,21 @@ public class IoTDBPipeTsFileSinkObjectIT extends 
AbstractPipeTableModelDualManua
       session.executeNonQueryStatement(
           "CREATE TABLE IF NOT EXISTS t1 (id STRING TAG, file OBJECT FIELD)");
 
-      insertObjectData(session, "t1", 1, 250);
+      insertObjectData(session, "t1", 1, 900);
       TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
 
       session.executeNonQueryStatement(
           String.format(
               "CREATE PIPE p1 "
-                  + "WITH EXTRACTOR ("
-                  + "'extractor.capture.table'='true', "
-                  + "'extractor.database-name'='db1', "
-                  + "'extractor.table-name'='t1', "
-                  + "'extractor.inclusion'='data.insert', "
-                  + "'extractor.history.enable'='true', "
-                  + "'extractor.realtime.enable'='true' "
+                  + "WITH SOURCE ("
+                  + "'source.capture.table'='true', "
+                  + "'source.database-name'='db1', "
+                  + "'source.table-name'='t1', "
+                  + "'source.inclusion'='data.insert', "
+                  + "'source.history.enable'='true', "
+                  + "'source.realtime.enable'='true' "
                   + ") "
-                  + "WITH CONNECTOR ("
+                  + "WITH SINK ("
                   + "'sink'='tsfile-local-sink', "
                   + "'sink.local.target-path'='%s', "
                   + "'sink.batch.max-delay-seconds'='1', "
@@ -140,12 +142,12 @@ public class IoTDBPipeTsFileSinkObjectIT extends 
AbstractPipeTableModelDualManua
                   + ")",
               targetDir));
 
-      waitForAndVerifyExportedObjects(250, 1, 250, 0, 251, 500);
+      waitForAndVerifyExportedObjects(900, 1, 900, 0, 901, 2400);
 
-      insertObjectData(session, "t1", 251, 500);
+      insertObjectData(session, "t1", 901, 2400);
       TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
 
-      waitForAndVerifyExportedObjects(250, 1, 250, 250, 251, 500);
+      waitForAndVerifyExportedObjects(900, 1, 900, 1500, 901, 2400);
 
       session.executeNonQueryStatement("DROP PIPE p1");
     }
@@ -191,7 +193,7 @@ public class IoTDBPipeTsFileSinkObjectIT extends 
AbstractPipeTableModelDualManua
                   + "'source.history.enable'='true', "
                   + "'source.realtime.enable'='true' "
                   + ") "
-                  + "WITH CONNECTOR ("
+                  + "WITH SINK ("
                   + "'sink'='tsfile-local-sink', "
                   + "'sink.local.target-path'='%s', "
                   + "'sink.batch.max-delay-seconds'='1', "
@@ -396,6 +398,55 @@ public class IoTDBPipeTsFileSinkObjectIT extends 
AbstractPipeTableModelDualManua
         "Realtime Object count mismatch after Pipe sync", 
expectedRealtimeCount, realtimeFound);
   }
 
+  private static void waitForExportedTsFilesWithMods(final File root, final 
long timeoutMs)
+      throws Exception {
+    final long deadline = System.currentTimeMillis() + timeoutMs;
+    while (System.currentTimeMillis() < deadline) {
+      final List<File> tsfiles = new ArrayList<>();
+      findTsFiles(root, tsfiles);
+      if (!tsfiles.isEmpty()) {
+        boolean allMods = true;
+        for (File tf : tsfiles) {
+          final File mod = new File(tf.getParent(), tf.getName() + 
ModificationFile.FILE_SUFFIX);
+          if (!mod.isFile()) {
+            allMods = false;
+            break;
+          }
+        }
+        if (allMods) {
+          Thread.sleep(2000);
+          return;
+        }
+      }
+      Thread.sleep(800);
+    }
+    Assert.fail(
+        "Timeout waiting for .tsfile and companion "
+            + ModificationFile.FILE_SUFFIX
+            + " under "
+            + root.getAbsolutePath());
+  }
+
+  private static void loadAllTsFilesUnderDir(final ITableSession session, 
final File root)
+      throws Exception {
+    final List<File> tsfiles = new ArrayList<>();
+    findTsFiles(root, tsfiles);
+    Assert.assertFalse(tsfiles.isEmpty());
+    tsfiles.sort(Comparator.comparing(File::getAbsolutePath));
+    for (File f : tsfiles) {
+      session.executeNonQueryStatement(String.format("LOAD '%s'", 
f.getAbsolutePath()));
+    }
+  }
+
+  private static long queryRowCount(final ITableSession session, final String 
table)
+      throws Exception {
+    try (SessionDataSet ds = session.executeQueryStatement("SELECT COUNT(*) 
FROM " + table)) {
+      final SessionDataSet.DataIterator it = ds.iterator();
+      Assert.assertTrue(it.next());
+      return it.getLong(1);
+    }
+  }
+
   private static void findTsFiles(File dir, List<File> tsfiles) {
     File[] files = dir.listFiles();
     if (files == null) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 803269377a3..260b3716d3e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -48,6 +48,7 @@ import 
org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
 import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexAndFlushManager;
 import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
 import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
+import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
@@ -368,22 +369,44 @@ public class PipeTsFileInsertionEvent extends 
PipeInsertionEvent
   @Override
   public boolean internallyIncreaseResourceReferenceCount(final String 
holderMessage) {
     extractTime = System.nanoTime();
+    final List<ModEntry> linkedObjectColumnModEntries = new ArrayList<>();
     try {
       if (Objects.nonNull(pipeName)) {
-        final Iterator<String> pathIterator = objectPathIterator();
+        // In the real tsfile transfer path, the tsfile event is expected to 
observe a sealed
+        // resource here. The UNCLOSED case is mainly introduced by unit tests 
that manually mock
+        // an unsealed resource to verify realtime behavior before close.
+        final boolean shouldLinkObjectFiles =
+            !Objects.equals(hasObjectData, Boolean.FALSE) && 
resource.isClosed();
+        final Iterator<String> pathIterator =
+            shouldLinkObjectFiles
+                ? new TsFileObjectPathIterator(this, 
linkedObjectColumnModEntries)
+                : Collections.emptyIterator();
         final int linked =
             PipeDataNodeResourceManager.object().linkObjectFiles(resource, 
pathIterator, pipeName);
-        hasObjectData = linked > 0;
         if (linked > 0) {
+          hasObjectData = true;
           PipeDataNodeResourceManager.object().increaseReference(resource, 
pipeName);
+        } else if (shouldLinkObjectFiles) {
+          hasObjectData = false;
         }
 
-        PipeDataNodeResourceManager.object().setTsFileClosed(resource, 
pipeName);
+        if (resource.isClosed()) {
+          PipeDataNodeResourceManager.object().setTsFileClosed(resource, 
pipeName);
+        }
       }
       tsFile = 
PipeDataNodeResourceManager.tsfile().increaseFileReference(tsFile, true, 
pipeName);
       if (isWithMod) {
-        modFile =
-            
PipeDataNodeResourceManager.tsfile().increaseFileReference(modFile, false, 
pipeName);
+        if (modFile != null) {
+          modFile =
+              
PipeDataNodeResourceManager.tsfile().increaseFileReference(modFile, false, 
pipeName);
+        }
+      } else {
+        if (pipeName != null && !linkedObjectColumnModEntries.isEmpty()) {
+          modFile =
+              PipeDataNodeResourceManager.tsfile()
+                  .writeModEntriesForPipeTsFile(linkedObjectColumnModEntries, 
tsFile, pipeName);
+          isWithMod = true;
+        }
       }
       return true;
     } catch (final Exception e) {
@@ -408,7 +431,7 @@ public class PipeTsFileInsertionEvent extends 
PipeInsertionEvent
         PipeDataNodeResourceManager.object().decreaseReference(resource, 
pipeName);
       }
       PipeDataNodeResourceManager.tsfile().decreaseFileReference(tsFile, 
pipeName);
-      if (isWithMod) {
+      if (isWithMod && modFile != null) {
         PipeDataNodeResourceManager.tsfile().decreaseFileReference(modFile, 
pipeName);
       }
       close();
@@ -869,7 +892,11 @@ public class PipeTsFileInsertionEvent extends 
PipeInsertionEvent
    */
   private TsFileInsertionEventParser initEventParser(final boolean 
objectPathsOnly) {
     try {
-      eventParser.compareAndSet(null, 
createParserProvider().provide(isWithMod, objectPathsOnly));
+      final boolean collectObjectColumnModEntries = objectPathsOnly && 
!isWithMod;
+      eventParser.compareAndSet(
+          null,
+          createParserProvider()
+              .provide(isWithMod, objectPathsOnly, 
collectObjectColumnModEntries));
       return eventParser.get();
     } catch (final Exception e) {
       close();
@@ -924,13 +951,33 @@ public class PipeTsFileInsertionEvent extends 
PipeInsertionEvent
   /** Release the resource of {@link TsFileInsertionEventParser}. */
   @Override
   public void close() {
-    eventParser.getAndUpdate(
-        parser -> {
-          if (Objects.nonNull(parser)) {
-            parser.close();
-          }
-          return null;
-        });
+    closeParser();
+  }
+
+  public void closeParser() {
+    closeParserInternal(null);
+  }
+
+  public List<ModEntry> drainGeneratedObjectColumnModEntriesAndCloseParser() {
+    final List<ModEntry> modEntries = new ArrayList<>();
+    closeParserInternal(modEntries);
+    return modEntries;
+  }
+
+  public void drainGeneratedObjectColumnModEntriesAndCloseParserTo(
+      final List<ModEntry> modEntries) {
+    closeParserInternal(modEntries);
+  }
+
+  private void closeParserInternal(final List<ModEntry> modEntries) {
+    final TsFileInsertionEventParser parser = eventParser.getAndSet(null);
+    if (Objects.isNull(parser)) {
+      return;
+    }
+    if (modEntries != null) {
+      parser.drainGeneratedObjectColumnModEntriesTo(modEntries);
+    }
+    parser.close();
   }
 
   /////////////////////////// Object ///////////////////////////
@@ -975,7 +1022,7 @@ public class PipeTsFileInsertionEvent extends 
PipeInsertionEvent
         this.eventParser);
   }
 
-  private static class PipeTsFileInsertionEventResource extends 
PipeEventResource {
+  private class PipeTsFileInsertionEventResource extends PipeEventResource {
 
     private final File tsFile;
     private final boolean isWithMod;
@@ -1015,17 +1062,11 @@ public class PipeTsFileInsertionEvent extends 
PipeInsertionEvent
           PipeDataNodeResourceManager.object().decreaseReference(resource, 
pipeName);
         }
         PipeDataNodeResourceManager.tsfile().decreaseFileReference(tsFile, 
pipeName);
-        if (isWithMod) {
+        if (isWithMod && modFile != null) {
           PipeDataNodeResourceManager.tsfile().decreaseFileReference(modFile, 
pipeName);
         }
 
-        eventParser.getAndUpdate(
-            parser -> {
-              if (Objects.nonNull(parser)) {
-                parser.close();
-              }
-              return null;
-            });
+        PipeTsFileInsertionEvent.this.close();
       } catch (final Exception e) {
         LOGGER.warn("Decrease reference count for TsFile {} error.", 
tsFile.getPath(), e);
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileObjectPathIterator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileObjectPathIterator.java
index 139641c0a04..c7a78243149 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileObjectPathIterator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileObjectPathIterator.java
@@ -20,15 +20,18 @@
 package org.apache.iotdb.db.pipe.event.common.tsfile;
 
 import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
+import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Objects;
 
@@ -38,6 +41,7 @@ public final class TsFileObjectPathIterator implements 
Iterator<String> {
 
   private final Iterator<TabletInsertionEvent> tabletEventIterator;
   private final PipeTsFileInsertionEvent tsfileEvent;
+  @Nullable private final List<ModEntry> modEntriesSink;
 
   private Iterator<String> currentPathIterator;
 
@@ -45,11 +49,18 @@ public final class TsFileObjectPathIterator implements 
Iterator<String> {
   private String nextPath = null;
   private boolean isClosed = false;
 
-  public TsFileObjectPathIterator(@Nonnull PipeTsFileInsertionEvent 
tsFileEvent) {
+  public TsFileObjectPathIterator(@Nonnull final PipeTsFileInsertionEvent 
tsFileEvent) {
+    this(tsFileEvent, null);
+  }
+
+  TsFileObjectPathIterator(
+      @Nonnull final PipeTsFileInsertionEvent tsFileEvent,
+      @Nullable final List<ModEntry> modEntriesSink) {
     Objects.requireNonNull(tsFileEvent, "tsFileEvent cannot be null");
 
     this.tabletEventIterator = 
tsFileEvent.toTabletInsertionEvents(true).iterator();
     this.tsfileEvent = tsFileEvent;
+    this.modEntriesSink = modEntriesSink;
 
     this.currentPathIterator = Collections.emptyIterator();
   }
@@ -139,7 +150,11 @@ public final class TsFileObjectPathIterator implements 
Iterator<String> {
       hasCachedNext = false;
       currentPathIterator = Collections.emptyIterator();
       if (tsfileEvent != null) {
-        tsfileEvent.close();
+        if (modEntriesSink != null) {
+          
tsfileEvent.drainGeneratedObjectColumnModEntriesAndCloseParserTo(modEntriesSink);
+        } else {
+          tsfileEvent.closeParser();
+        }
       }
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java
index f64e3d9dd21..117513e726c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java
@@ -45,6 +45,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.List;
 
 public abstract class TsFileInsertionEventParser implements AutoCloseable {
 
@@ -155,6 +156,8 @@ public abstract class TsFileInsertionEventParser implements 
AutoCloseable {
    */
   public abstract Iterable<TabletInsertionEvent> toTabletInsertionEvents();
 
+  public void drainGeneratedObjectColumnModEntriesTo(final List<ModEntry> 
modEntries) {}
+
   /**
    * Record parse start time when hasNext() is called for the first time and 
returns true. Should be
    * called in Iterator.hasNext() when it's the first call.
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java
index 8a09c744932..83b305f0d29 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java
@@ -87,7 +87,7 @@ public class TsFileInsertionEventParserProvider {
    */
   public TsFileInsertionEventParser provide(final boolean isWithMod)
       throws IOException, IllegalPathException {
-    return this.provide(isWithMod, false);
+    return this.provide(isWithMod, false, false);
   }
 
   /**
@@ -101,6 +101,14 @@ public class TsFileInsertionEventParserProvider {
    */
   public TsFileInsertionEventParser provide(final boolean isWithMod, final 
boolean objectPathsOnly)
       throws IOException, IllegalPathException {
+    return provide(isWithMod, objectPathsOnly, false);
+  }
+
+  public TsFileInsertionEventParser provide(
+      final boolean isWithMod,
+      final boolean objectPathsOnly,
+      final boolean collectObjectColumnModEntries)
+      throws IOException, IllegalPathException {
     if (pipeName != null) {
       PipeTsFileToTabletsMetrics.getInstance()
           .markTsFileToTabletInvocation(pipeName + "_" + creationTime);
@@ -118,7 +126,8 @@ public class TsFileInsertionEventParserProvider {
           entity,
           sourceEvent,
           isWithMod,
-          objectPathsOnly);
+          objectPathsOnly,
+          collectObjectColumnModEntries);
     }
 
     // Use scan container to save memory
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
index 362af4d3fbe..cd21ab297f2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
@@ -33,6 +33,9 @@ import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.util.ModsOperationUti
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
 import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
+import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
+import 
org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry;
+import org.apache.iotdb.db.utils.ModificationUtils;
 import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -42,12 +45,25 @@ import org.apache.tsfile.write.record.Tablet;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Objects;
+import java.util.Set;
 
 public class TsFileInsertionEventTableParser extends 
TsFileInsertionEventParser {
 
+  private final boolean collectObjectColumnModEntries;
+
+  private final List<ModEntry> originalModEntries;
+
+  private final Map<String, Set<String>> tableObjectMeasurements = new 
HashMap<>();
+
   private final long startTime;
   private final long endTime;
   private final TablePattern tablePattern;
@@ -69,7 +85,8 @@ public class TsFileInsertionEventTableParser extends 
TsFileInsertionEventParser
       final IAuditEntity entity,
       final PipeInsertionEvent sourceEvent,
       final boolean isWithMod,
-      final boolean objectPathsOnly)
+      final boolean objectPathsOnly,
+      final boolean collectObjectColumnModEntries)
       throws IOException {
     super(
         tsFile,
@@ -87,12 +104,22 @@ public class TsFileInsertionEventTableParser extends 
TsFileInsertionEventParser
         objectPathsOnly,
         isWithMod);
 
+    this.collectObjectColumnModEntries = collectObjectColumnModEntries;
     this.isWithMod = isWithMod;
     try {
-      currentModifications =
-          isWithMod
-              ? ModsOperationUtil.loadModificationsFromTsFile(tsFile)
-              : PatternTreeMapFactory.getModsPatternTreeMap();
+      final boolean loadModificationsFromTsFile =
+          this.isWithMod || this.collectObjectColumnModEntries;
+      if (this.collectObjectColumnModEntries) {
+        originalModEntries = 
ModsOperationUtil.readAllModificationsFromTsFile(tsFile);
+        currentModifications =
+            
ModsOperationUtil.buildModificationsPatternTreeMap(originalModEntries);
+      } else {
+        originalModEntries = Collections.emptyList();
+        currentModifications =
+            loadModificationsFromTsFile
+                ? ModsOperationUtil.loadModificationsFromTsFile(tsFile)
+                : PatternTreeMapFactory.getModsPatternTreeMap();
+      }
       allocatedMemoryBlockForModifications =
           PipeDataNodeResourceManager.memory()
               
.forceAllocateForTabletWithRetry(currentModifications.ramBytesUsed());
@@ -150,7 +177,60 @@ public class TsFileInsertionEventTableParser extends 
TsFileInsertionEventParser
         entity,
         sourceEvent,
         isWithMod,
-        objectPathsOnly);
+        objectPathsOnly,
+        false);
+  }
+
+  private void recordTableObjectMeasurements(
+      final String tableName, final List<String> objectMeasurements) {
+    if (tableName == null
+        || tableName.isEmpty()
+        || objectMeasurements == null
+        || objectMeasurements.isEmpty()) {
+      return;
+    }
+    tableObjectMeasurements
+        .computeIfAbsent(tableName, ignored -> new LinkedHashSet<>())
+        .addAll(objectMeasurements);
+  }
+
+  private List<ModEntry> generateObjectColumnModEntries(List<ModEntry> 
generatedEntries) {
+    if (!collectObjectColumnModEntries
+        || originalModEntries.isEmpty()
+        || tableObjectMeasurements.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    for (final ModEntry modEntry : originalModEntries) {
+      if (!(modEntry instanceof TableDeletionEntry)) {
+        continue;
+      }
+
+      final TableDeletionEntry tableDeletionEntry = (TableDeletionEntry) 
modEntry;
+      final Set<String> objectMeasurements =
+          tableObjectMeasurements.get(tableDeletionEntry.getTableName());
+      if (objectMeasurements == null || objectMeasurements.isEmpty()) {
+        continue;
+      }
+
+      ModEntry entry =
+          ModsOperationUtil.buildObjectColumnDeletionEntries(
+              tableDeletionEntry, objectMeasurements);
+      if (entry != null) {
+        generatedEntries.add(entry);
+      }
+    }
+
+    return generatedEntries.isEmpty()
+        ? Collections.emptyList()
+        : ModificationUtils.sortAndMerge(generatedEntries);
+  }
+
+  @Override
+  public void drainGeneratedObjectColumnModEntriesTo(final List<ModEntry> 
modEntries) {
+    if (modEntries != null) {
+      modEntries.addAll(generateObjectColumnModEntries(new ArrayList<>()));
+    }
   }
 
   @Override
@@ -181,7 +261,12 @@ public class TsFileInsertionEventTableParser extends 
TsFileInsertionEventParser
                               currentModifications,
                               startTime,
                               endTime,
-                              objectPathsOnly);
+                              objectPathsOnly,
+                              collectObjectColumnModEntries,
+                              collectObjectColumnModEntries && objectPathsOnly
+                                  ? TsFileInsertionEventTableParser.this
+                                      ::recordTableObjectMeasurements
+                                  : null);
                     }
                     final boolean hasNext = tabletIterator.hasNext();
                     if (hasNext && !parseStartTimeRecorded) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
index c458f85851c..97c4ca74414 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
@@ -59,6 +59,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.function.BiConsumer;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
@@ -82,6 +83,10 @@ public class TsFileInsertionEventTableParserTabletIterator 
implements Iterator<T
 
   private final boolean objectPathsOnly;
 
+  private final boolean collectObjectColumnModEntries;
+
+  private final BiConsumer<String, List<String>> tableObjectMeasurementsSink;
+
   // mods entry
   private final PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> 
modifications;
 
@@ -104,6 +109,7 @@ public class TsFileInsertionEventTableParserTabletIterator 
implements Iterator<T
   private List<ColumnCategory> columnTypes;
   private List<String> measurementList;
   private List<TSDataType> dataTypeList;
+  private List<String> objectMeasurementList;
   private int deviceIdSize;
 
   private List<ModsOperationUtil.ModsInfo> modsInfoList;
@@ -124,12 +130,16 @@ public class 
TsFileInsertionEventTableParserTabletIterator implements Iterator<T
       final PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> 
modifications,
       final long startTime,
       final long endTime,
-      final boolean objectPathsOnly)
+      final boolean objectPathsOnly,
+      final boolean collectObjectColumnModEntries,
+      final BiConsumer<String, List<String>> tableObjectMeasurementsSink)
       throws IOException {
 
     this.startTime = startTime;
     this.endTime = endTime;
     this.modifications = modifications;
+    this.collectObjectColumnModEntries = collectObjectColumnModEntries;
+    this.tableObjectMeasurementsSink = tableObjectMeasurementsSink;
 
     this.reader = tsFileSequenceReader;
     this.metadataQuerier = new MetadataQuerierByFileImpl(reader);
@@ -278,6 +288,7 @@ public class TsFileInsertionEventTableParserTabletIterator 
implements Iterator<T
               dataTypeList = new ArrayList<>();
               columnTypes = new ArrayList<>();
               measurementList = new ArrayList<>();
+              objectMeasurementList = new ArrayList<>();
 
               for (int i = 0; i < columnSchemaSize; i++) {
                 final IMeasurementSchema schema = 
tableSchema.getColumnSchemas().get(i);
@@ -290,9 +301,14 @@ public class TsFileInsertionEventTableParserTabletIterator 
implements Iterator<T
                     columnTypes.add(ColumnCategory.TAG);
                     measurementList.add(measurementName);
                     dataTypeList.add(schema.getType());
+                  } else if (schema.getType() == TSDataType.OBJECT) {
+                    objectMeasurementList.add(measurementName);
                   }
                 }
               }
+              if (collectObjectColumnModEntries && tableObjectMeasurementsSink 
!= null) {
+                tableObjectMeasurementsSink.accept(tableName, 
objectMeasurementList);
+              }
               deviceIdSize = dataTypeList.size();
               state = State.INIT_CHUNK_METADATA;
               break;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/util/ModsOperationUtil.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/util/ModsOperationUtil.java
index 66fed43fead..b8ac32db1bc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/util/ModsOperationUtil.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/util/ModsOperationUtil.java
@@ -20,19 +20,23 @@
 package org.apache.iotdb.db.pipe.event.common.tsfile.parser.util;
 
 import org.apache.iotdb.commons.path.PatternTreeMap;
+import 
org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate;
 import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
 import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
+import 
org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry;
 import org.apache.iotdb.db.utils.ModificationUtils;
 import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
 import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.read.common.TimeRange;
 
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -53,20 +57,58 @@ public class ModsOperationUtil {
    */
   public static PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>
       loadModificationsFromTsFile(File tsFile) {
-    PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> 
modifications =
-        PatternTreeMapFactory.getModsPatternTreeMap();
+    return 
buildModificationsPatternTreeMap(readAllModificationsFromTsFile(tsFile));
+  }
 
+  public static List<ModEntry> readAllModificationsFromTsFile(final File 
tsFile) {
     try {
-      ModificationFile.readAllModifications(tsFile, true)
-          .forEach(
-              modification -> 
modifications.append(modification.keyOfPatternTree(), modification));
+      return ModificationFile.readAllModifications(tsFile, true);
     } catch (Exception e) {
       throw new PipeException("Failed to load modifications from TsFile: " + 
tsFile.getPath(), e);
     }
+  }
 
+  public static PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>
+      buildModificationsPatternTreeMap(final List<ModEntry> modEntries) {
+    final PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> 
modifications =
+        PatternTreeMapFactory.getModsPatternTreeMap();
+    for (final ModEntry modEntry : modEntries) {
+      modifications.append(modEntry.keyOfPatternTree(), modEntry);
+    }
     return modifications;
   }
 
+  public static ModEntry buildObjectColumnDeletionEntries(
+      final TableDeletionEntry tableDeletionEntry, final Set<String> 
objectMeasurementNames) {
+    if (tableDeletionEntry == null
+        || objectMeasurementNames == null
+        || objectMeasurementNames.isEmpty()) {
+      return null;
+    }
+
+    final DeletionPredicate predicate = tableDeletionEntry.getPredicate();
+    final List<String> targetMeasurements;
+    if (predicate.getMeasurementNames().isEmpty()) {
+      targetMeasurements = new ArrayList<>(objectMeasurementNames);
+    } else {
+      targetMeasurements =
+          predicate.getMeasurementNames().stream()
+              .filter(objectMeasurementNames::contains)
+              .collect(Collectors.toList());
+    }
+
+    if (targetMeasurements.isEmpty()) {
+      return null;
+    }
+
+    return new TableDeletionEntry(
+        new DeletionPredicate(
+            predicate.getTableName(), predicate.getIdPredicate(), 
targetMeasurements),
+        new TimeRange(
+            tableDeletionEntry.getTimeRange().getMin(),
+            tableDeletionEntry.getTimeRange().getMax()));
+  }
+
   /**
    * Check if data in the specified time range is completely deleted by mods 
Different logic for
    * tree model and table model
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
index c84504fa52b..644beb1ac40 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
@@ -23,6 +23,8 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
+import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 
 import org.apache.tsfile.enums.TSDataType;
@@ -35,6 +37,7 @@ import javax.annotation.Nullable;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -134,6 +137,50 @@ public class PipeTsFileResourceManager {
     return resultFile;
   }
 
+  /**
+   * Resolves the pipe-directory TsFile path for {@code tsFile}, derives the companion exclusive
+   * mods file via {@link ModificationFile#getExclusiveMods(File)}, creates parent directories if
+   * needed, writes {@code entries} with {@link ModificationFile}, and returns that
+   * mods file.
+   *
+   * @param entries modification entries to append (may be empty; {@code null} means empty)
+   * @param tsFile original or pipe-hardlinked TsFile; used only to compute 
the mods path under pipe
+   *     dir
+   * @param pipeName pipe name for {@link 
#getHardlinkOrCopiedFileInPipeDir(File, String)}
+   * @return the written mods file under the pipe TsFile directory
+   * @throws IOException if the directory cannot be created or write fails
+   */
+  public File writeModEntriesForPipeTsFile(
+      final List<ModEntry> entries, final File tsFile, final @Nullable String 
pipeName)
+      throws IOException {
+    final List<ModEntry> toWrite = entries != null ? entries : 
Collections.emptyList();
+    final File pipeTsFile = getHardlinkOrCopiedFileInPipeDir(tsFile, pipeName);
+    final File parent = pipeTsFile.getParentFile();
+    if (parent != null && !parent.exists() && !parent.mkdirs()) {
+      throw new IOException("Failed to create directory: " + parent.getPath());
+    }
+    final File modsFile;
+
+    try (ModificationFile modificationFile =
+        new ModificationFile(ModificationFile.getExclusiveMods(pipeTsFile), 
false)) {
+      modificationFile.write(toWrite);
+      modsFile = modificationFile.getFile();
+    }
+
+    segmentLock.lock(modsFile);
+    try {
+      if (Objects.nonNull(pipeName)) {
+        hardlinkOrCopiedFileToPipeTsFileResourceMap
+            .computeIfAbsent(pipeName, k -> new ConcurrentHashMap<>())
+            .put(modsFile.getPath(), new PipeTsFileResource(modsFile));
+      }
+    } finally {
+      segmentLock.unlock(modsFile);
+    }
+    increasePublicReference(modsFile, pipeName, false);
+    return modsFile;
+  }
+
   private boolean increaseReferenceIfExists(
       final File file, final @Nullable String pipeName, final boolean 
isTsFile) throws IOException {
     segmentLock.lock(file);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/FileTransfer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/FileTransfer.java
index ab264f17b16..aba24a513cf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/FileTransfer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/FileTransfer.java
@@ -23,34 +23,31 @@ import java.io.File;
 import java.io.IOException;
 
 /**
- * FileTransfer - File transfer interface
+ * Abstraction for transferring exported TsFiles and their companion artifacts 
to a sink target.
  *
- * <p>Abstractions for different file transfer methods (e.g. local copy, SCP). 
Implementations must:
- *
- * <ul>
- *   <li>{@link #handshake()} - Test connection / reachability of the transfer 
target
- *   <li>{@link #transferFile(File, File, String)} - Transfer a single file 
(e.g. TSFile) to the
- *       target directory with the given file name
- * </ul>
+ * <p>Implementations may write to a local directory, a remote host, or any 
other file-oriented
+ * destination, but they must preserve the exported TsFile name and keep 
companion files aligned
+ * with it.
  */
 public interface FileTransfer extends AutoCloseable {
 
   /**
-   * Handshake / test connection. Verifies that the transfer target is 
reachable (e.g. local dir
-   * exists, remote host is reachable).
+   * Verifies that the transfer target is reachable and ready to accept files.
    *
-   * @throws IOException when connection test fails
+   * @throws IOException when the target cannot be prepared
    */
   void handshake() throws IOException;
 
   /**
-   * Transfer a single file (e.g. TSFile) to the target.
+   * Transfers one exported TsFile together with its optional companion files.
    *
-   * @param tsfile local source file (e.g. TSFile)
-   * @param objectPath target parent directory as File; if null, use 
implementation's default base
-   *     path
-   * @param targetFileName target file name at destination
-   * @throws IOException when transfer fails
+   * @param tsfile local TsFile to transfer
+   * @param modFile local companion mod file; may be null when no mod file 
exists
+   * @param objectPath local directory containing exported Object files; may 
be null when the TsFile
+   *     has no Object data
+   * @param targetFileName target file name used for the transferred TsFile
+   * @throws IOException when any part of the transfer fails
    */
-  void transferFile(File tsfile, File objectPath, String targetFileName) 
throws IOException;
+  void transferFile(File tsfile, File modFile, File objectPath, String 
targetFileName)
+      throws IOException;
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/LocalFileTransfer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/LocalFileTransfer.java
index 46610a1d9e5..a703dd9c432 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/LocalFileTransfer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/LocalFileTransfer.java
@@ -1,8 +1,11 @@
 package org.apache.iotdb.db.pipe.sink.protocol.tsfile;
 
+import org.apache.iotdb.commons.utils.FileUtils;
+import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
+import org.apache.tsfile.common.constant.TsFileConstant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,7 +42,7 @@ public class LocalFileTransfer implements FileTransfer {
   }
 
   @Override
-  public void transferFile(File tsFile, File objectSourceDir, String 
targetName)
+  public void transferFile(File tsFile, File modFile, File objectSourceDir, 
String targetName)
       throws IOException {
     if (tsFile == null || !tsFile.exists()) {
       throw new PipeException("Source TsFile is missing");
@@ -49,7 +52,9 @@ public class LocalFileTransfer implements FileTransfer {
       copyObjectDirectory(objectSourceDir, targetName);
     }
 
-    copyTsFile(tsFile, targetName);
+    final String finalTsName = finalTsFileName(targetName);
+    exportModIfPresent(modFile, finalTsName);
+    exportTsFile(tsFile, finalTsName);
   }
 
   private void copyObjectDirectory(File sourceDir, String dirName) throws 
IOException {
@@ -82,16 +87,75 @@ public class LocalFileTransfer implements FileTransfer {
     }
   }
 
-  private void copyTsFile(File tsFile, String targetName) throws IOException {
-    String finalTsName = targetName.endsWith(".tsfile") ? targetName : 
targetName + ".tsfile";
-    File dest = new File(targetBaseDir, finalTsName);
+  private static String finalTsFileName(final String targetName) { // Name used at the target.
+    return targetName.endsWith(TsFileConstant.TSFILE_SUFFIX)
+        ? targetName // already ends with the TsFile suffix: returned unchanged
+        : targetName + TsFileConstant.TSFILE_SUFFIX; // otherwise the suffix is appended
+  }
+
+  private void exportTsFile(final File tsFile, final String finalTsName) 
throws IOException { // Places tsFile at targetBaseDir/finalTsName by hard link or copy.
+    final File dest = new File(targetBaseDir, finalTsName);
+    transferByHardLinkOrCopy(tsFile, dest, "TsFile"); // "TsFile" is only a label for log lines
+  }
+
+  private void exportModIfPresent(final File modFile, final String 
finalTsName) throws IOException { // Exports the companion mod file, if there is one.
+    if (modFile == null || !modFile.exists()) {
+      return; // nothing to export: no mod file supplied, or it is not on disk
+    }
+    final String modDestName = finalTsName + ModificationFile.FILE_SUFFIX; // named after the target TsFile, not after the source mod file
+    final File modDest = new File(targetBaseDir, modDestName);
+    transferByHardLinkOrCopy(modFile, modDest, "TsFile mod"); // label is used in log lines only
+  }
+
+  void transferByHardLinkOrCopy(final File source, final File dest, final 
String fileType)
+      throws IOException { // Places source at dest: hard link when possible, otherwise copy.
+    if (source.getCanonicalFile().equals(dest.getCanonicalFile())) { // same canonical path
+      LOGGER.info(
+          "Skip exporting {} because source and target are the same file: {}",
+          fileType,
+          dest.getAbsolutePath());
+      return; // nothing to do when exporting a file onto itself
+    }
+
+    final Path targetDirectory = dest.getParentFile().toPath(); // NOTE(review): assumes dest has a parent directory; confirm for all callers
+    if (!Files.exists(targetDirectory)) {
+      Files.createDirectories(targetDirectory);
+    }
+
+    if (isSameFileStore(source.toPath(), targetDirectory)) { // link only within one file store
+      try {
+        createHardLink(source, dest);
+        LOGGER.info("Successfully hard-linked {} to {}", fileType, 
dest.getAbsolutePath());
+        return; // linked: no copy needed
+      } catch (final IOException e) { // best effort: log the failure and fall through to copy
+        LOGGER.warn(
+            "Failed to hard-link {} to {}, fallback to copy.", fileType, 
dest.getAbsolutePath(), e);
+      }
+    } else {
+      LOGGER.info(
+          "Detected cross-filesystem export for {}, fallback to copy: {}",
+          fileType,
+          dest.getAbsolutePath());
+    }
+
+    copyFile(source, dest); // reached after a failed link or for a cross-file-store export
+    LOGGER.info("Successfully copied {} to {}", fileType, 
dest.getAbsolutePath());
+  }
+
+  boolean isSameFileStore(final Path sourcePath, final Path targetDirectory) 
throws IOException { // True when both paths report equal FileStore objects.
+    return 
Files.getFileStore(sourcePath).equals(Files.getFileStore(targetDirectory)); // NOTE(review): not private, presumably so tests can override it; confirm
+  }
+
+  void createHardLink(final File source, final File dest) throws IOException {
+    FileUtils.createHardLink(source, dest); // delegates to FileUtils; presumably links dest to source (helper not visible here)
+  }
 
-    Files.copy(tsFile.toPath(), dest.toPath(), 
StandardCopyOption.REPLACE_EXISTING);
-    LOGGER.info("Successfully transferred TsFile to {}", 
dest.getAbsolutePath());
+  private void copyFile(final File source, final File dest) throws IOException 
{ // Copies source to dest.
+    Files.copy(source.toPath(), dest.toPath(), 
StandardCopyOption.REPLACE_EXISTING); // an existing dest is overwritten
   }
 
   @Override
   public void close() {
-    // No external resource is maintained by local transfer.
+    // No-op because local transfer does not keep any open connection or file 
handle.
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/PipeTsFileLocalSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/PipeTsFileLocalSink.java
index 943d0efd147..bfb543cd978 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/PipeTsFileLocalSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/PipeTsFileLocalSink.java
@@ -173,8 +173,13 @@ public class PipeTsFileLocalSink implements PipeSink, 
PipeBatchMetricsSettable {
 
       final File tsFile = event.getTsFile();
       if (tsFile != null && tsFile.exists()) {
+        final File modFile =
+            event.isWithMod() && event.getModFile() != null && 
event.getModFile().exists()
+                ? event.getModFile()
+                : null;
         fileTransfer.transferFile(
             tsFile,
+            modFile,
             PipeObjectPathUtil.resolveLinkedObjectDirectory(
                 event.getTsFileResource(), event.getPipeName()),
             TsFileNameGenerator.targetNameForEvent(event));
@@ -183,7 +188,7 @@ public class PipeTsFileLocalSink implements PipeSink, 
PipeBatchMetricsSettable {
       final File tsFile = tsFileInsertionEvent.getTsFile();
       if (tsFile != null && tsFile.exists()) {
         fileTransfer.transferFile(
-            tsFile, null, 
TsFileNameGenerator.targetNameForEvent(tsFileInsertionEvent));
+            tsFile, null, null, 
TsFileNameGenerator.targetNameForEvent(tsFileInsertionEvent));
       }
     }
   }
@@ -263,7 +268,7 @@ public class PipeTsFileLocalSink implements PipeSink, 
PipeBatchMetricsSettable {
       final File objectDir = tsFileAndObjectDir.getRight();
       if (tsFile != null && tsFile.exists()) {
         fileTransfer.transferFile(
-            tsFile, objectDir, 
TsFileNameGenerator.targetNameForGeneratedFile(tsFile));
+            tsFile, null, objectDir, 
TsFileNameGenerator.targetNameForGeneratedFile(tsFile));
       }
     }
     
eventTsFileBatch.decreaseEventsReferenceCount(PipeTsFileLocalSink.class.getName(),
 true);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/util/LoadObjectFileUtil.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/util/LoadObjectFileUtil.java
index a70416d5400..f8f268813a5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/util/LoadObjectFileUtil.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/util/LoadObjectFileUtil.java
@@ -19,11 +19,17 @@
 
 package org.apache.iotdb.db.storageengine.load.util;
 
+import org.apache.iotdb.commons.path.PatternTreeMap;
 import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.load.LoadFileException;
 import org.apache.iotdb.db.exception.load.ObjectFileCorruptedException;
+import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.util.ModsOperationUtil;
+import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.util.ModsOperationUtil.ModsInfo;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadSingleTsFileNode;
+import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
+import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
+import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
 import org.apache.iotdb.db.utils.ObjectTypeUtils;
 
 import org.apache.tsfile.common.conf.TSFileDescriptor;
@@ -54,6 +60,7 @@ import java.nio.ByteBuffer;
 import java.nio.file.FileStore;
 import java.nio.file.Files;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -89,6 +96,8 @@ public class LoadObjectFileUtil {
     final File targetDir = new File(objectTempBaseDir, targetDirName);
     final File searchRoot =
         node.getObjectFileSearchRoot() != null ? 
node.getObjectFileSearchRoot() : targetDir;
+    final PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> 
modifications =
+        loadModificationsFromTsFile(tsFile);
 
     if (!targetDir.exists() && !targetDir.mkdirs() && !targetDir.exists()) {
       throw new LoadFileException(
@@ -102,7 +111,10 @@ public class LoadObjectFileUtil {
 
       while (iterator.hasNext()) {
         final Map<IDeviceID, List<TimeseriesMetadata>> deviceMetadataMap = 
iterator.next();
-        for (final List<TimeseriesMetadata> metadataList : 
deviceMetadataMap.values()) {
+        for (final Map.Entry<IDeviceID, List<TimeseriesMetadata>> entry :
+            deviceMetadataMap.entrySet()) {
+          final IDeviceID deviceID = entry.getKey();
+          final List<TimeseriesMetadata> metadataList = entry.getValue();
           final List<Map<Integer, long[]>> alignedTimeBatchesByChunkIndex =
               collectAlignedTimeBatchesByChunkIndex(reader, metadataList);
           for (final TimeseriesMetadata tsMetadata : metadataList) {
@@ -110,6 +122,8 @@ public class LoadObjectFileUtil {
               continue;
             }
 
+            final ModsInfo modsInfo =
+                buildMeasurementModsInfo(deviceID, 
tsMetadata.getMeasurementId(), modifications);
             int chunkIndex = 0;
             for (final IChunkMetadata chunkMetadata : 
tsMetadata.getChunkMetadataList()) {
               scanObjectChunkAndProcessFiles(
@@ -119,7 +133,8 @@ public class LoadObjectFileUtil {
                   searchRoot,
                   targetDir,
                   alignedTimeBatchesByChunkIndex,
-                  chunkIndex++);
+                  chunkIndex++,
+                  modsInfo);
             }
           }
         }
@@ -140,7 +155,8 @@ public class LoadObjectFileUtil {
       final File searchRoot,
       final File targetDir,
       final List<Map<Integer, long[]>> alignedTimeBatchesByChunkIndex,
-      final int chunkIndex)
+      final int chunkIndex,
+      final ModsInfo modsInfo)
       throws IOException, LoadFileException {
     reader.position(chunkMetadata.getOffsetOfChunkHeader());
     final byte marker = reader.readMarker();
@@ -170,7 +186,7 @@ public class LoadObjectFileUtil {
                     .nextValueBatch(times);
             final int len = Math.min(values.length, times.length);
             for (int i = 0; i < len; i++) {
-              if (values[i] == null) {
+              if (values[i] == null || ModsOperationUtil.isDelete(times[i], 
modsInfo)) {
                 continue;
               }
               final Binary binary = values[i].getBinary();
@@ -194,7 +210,7 @@ public class LoadObjectFileUtil {
                 .getAllSatisfiedPageData();
         while (batchData.hasCurrent()) {
           final Binary binary = batchData.getBinary();
-          if (binary != null) {
+          if (binary != null && 
!ModsOperationUtil.isDelete(batchData.currentTime(), modsInfo)) {
             final Pair<Long, String> lengthAndPath =
                 ObjectTypeUtils.parseObjectBinaryToSizeStringPathPair(binary);
             final long expectedLength = lengthAndPath.getLeft();
@@ -227,6 +243,30 @@ public class LoadObjectFileUtil {
     return result;
   }
 
+  private static PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>
+      loadModificationsFromTsFile(final File tsFile) throws IOException { // Collects the mod entries read for tsFile into a pattern tree map.
+    final PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> 
modifications =
+        PatternTreeMapFactory.getModsPatternTreeMap(); // starts from a fresh map
+    for (final ModEntry modification : 
ModificationFile.readAllModifications(tsFile, true)) { // NOTE(review): meaning of the 'true' flag is not visible here; confirm
+      modifications.append(modification.keyOfPatternTree(), modification); // keyed by the entry's own pattern-tree key
+    }
+    return modifications; // empty when no entries were read
+  }
+
+  private static ModsInfo buildMeasurementModsInfo(
+      final IDeviceID deviceID,
+      final String measurement,
+      final PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> 
modifications) { // Builds the ModsInfo for one measurement of one device; may return null.
+    if (modifications == null || modifications.isEmpty()) {
+      return null; // no mods at all: the caller hands null on to ModsOperationUtil.isDelete
+    }
+
+    final List<ModsInfo> modsInfos =
+        ModsOperationUtil.initializeMeasurementMods(
+            deviceID, Collections.singletonList(measurement), modifications); // exactly one measurement is requested
+    return modsInfos.isEmpty() ? null : modsInfos.get(0); // take the entry of that single measurement, if any
+  }
+
   private static Map<Integer, long[]> collectAlignedTimeBatchForChunk(
       final TsFileSequenceReader reader, final IChunkMetadata chunkMetadata) 
throws IOException {
     final Map<Integer, long[]> timeBatchByPage = new HashMap<>();
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/util/ModsOperationUtilTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/util/ModsOperationUtilTest.java
index 7eb09bbab41..762453ea51d 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/util/ModsOperationUtilTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/util/ModsOperationUtilTest.java
@@ -19,14 +19,20 @@
 
 package org.apache.iotdb.db.pipe.event.common.tsfile.parser.util;
 
+import 
org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate;
+import 
org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate.NOP;
 import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
+import 
org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry;
 import 
org.apache.iotdb.db.storageengine.dataregion.modification.TreeDeletionEntry;
 
 import org.apache.tsfile.read.common.TimeRange;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -383,6 +389,47 @@ public class ModsOperationUtilTest {
     assertEquals(2, binarySearchMods(mods, 25, 2));
   }
 
+  @Test
+  public void testBuildObjectColumnDeletionEntriesForFullTableDeletion() { // source predicate is built without a measurement list
+    final TableDeletionEntry tableDeletionEntry =
+        new TableDeletionEntry(new DeletionPredicate("table1", new NOP()), new 
TimeRange(0, 100));
+    final Set<String> objectMeasurements =
+        new LinkedHashSet<>(Arrays.asList("obj_col_1", "obj_col_2")); // LinkedHashSet keeps the order asserted below
+
+    final TableDeletionEntry generatedEntry =
+        (TableDeletionEntry)
+            ModsOperationUtil.buildObjectColumnDeletionEntries(
+                tableDeletionEntry, objectMeasurements);
+
+    assertEquals(
+        Arrays.asList("obj_col_1", "obj_col_2"),
+        generatedEntry.getPredicate().getMeasurementNames()); // expects every object column, in set order
+    assertEquals(0, generatedEntry.getTimeRange().getMin()); // time range is expected to be carried over
+    assertEquals(100, generatedEntry.getTimeRange().getMax());
+  }
+
+  @Test
+  public void testBuildObjectColumnDeletionEntriesForSpecifiedMeasurements() { // source predicate names explicit measurements
+    final TableDeletionEntry tableDeletionEntry =
+        new TableDeletionEntry(
+            new DeletionPredicate(
+                "table1", new NOP(), Arrays.asList("s1", "obj_col_2", 
"obj_col_1")), // s1 is not in the object-column set below
+            new TimeRange(0, 100));
+    final Set<String> objectMeasurements =
+        new LinkedHashSet<>(Arrays.asList("obj_col_1", "obj_col_2", 
"obj_col_3")); // obj_col_3 is not named by the predicate
+
+    final TableDeletionEntry generatedEntry =
+        (TableDeletionEntry)
+            ModsOperationUtil.buildObjectColumnDeletionEntries(
+                tableDeletionEntry, objectMeasurements);
+
+    assertEquals(
+        Arrays.asList("obj_col_2", "obj_col_1"), // predicate order kept; s1 and obj_col_3 are excluded
+        generatedEntry.getPredicate().getMeasurementNames());
+    assertEquals(0, generatedEntry.getTimeRange().getMin()); // time range is expected to be carried over
+    assertEquals(100, generatedEntry.getTimeRange().getMax());
+  }
+
   // Helper method to access the private binarySearchMods method for testing
   private int binarySearchMods(List<ModEntry> mods, long time, int startIndex) 
{
     // Use reflection to access the private method
diff --git 
a/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/PipeTsFileRemoteSink.java
 
b/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/PipeTsFileRemoteSink.java
index de4f7fb89a2..4ee5131b3e2 100644
--- 
a/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/PipeTsFileRemoteSink.java
+++ 
b/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/PipeTsFileRemoteSink.java
@@ -154,8 +154,13 @@ public class PipeTsFileRemoteSink implements PipeSink, 
PipeBatchMetricsSettable
       final TsFileResource tsFileResource = event.getTsFileResource();
       final File tsFile = event.getTsFile();
       if (tsFile != null && tsFile.exists()) {
+        final File modFile =
+            event.isWithMod() && event.getModFile() != null && 
event.getModFile().exists()
+                ? event.getModFile()
+                : null;
         remoteFileTransfer.transferFile(
             tsFile,
+            modFile,
             tsFileResource == null
                 ? null
                 : PipeObjectPathUtil.resolveLinkedObjectDirectory(
@@ -166,7 +171,7 @@ public class PipeTsFileRemoteSink implements PipeSink, 
PipeBatchMetricsSettable
       final File tsFile = tsFileInsertionEvent.getTsFile();
       if (tsFile != null && tsFile.exists()) {
         remoteFileTransfer.transferFile(
-            tsFile, null, 
TsFileNameGenerator.targetNameForEvent(tsFileInsertionEvent));
+            tsFile, null, null, 
TsFileNameGenerator.targetNameForEvent(tsFileInsertionEvent));
       }
     }
   }
@@ -274,7 +279,7 @@ public class PipeTsFileRemoteSink implements PipeSink, 
PipeBatchMetricsSettable
       final File objectDir = tsFileAndObjectDir.getRight();
       if (tsFile != null && tsFile.exists()) {
         remoteFileTransfer.transferFile(
-            tsFile, objectDir, 
TsFileNameGenerator.targetNameForGeneratedFile(tsFile));
+            tsFile, null, objectDir, 
TsFileNameGenerator.targetNameForGeneratedFile(tsFile));
       }
     }
     
eventTsFileBatch.decreaseEventsReferenceCount(PipeTsFileRemoteSink.class.getName(),
 true);
diff --git 
a/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/RemoteFileTransfer.java
 
b/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/RemoteFileTransfer.java
index 9c440f971ac..604a3946503 100644
--- 
a/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/RemoteFileTransfer.java
+++ 
b/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/RemoteFileTransfer.java
@@ -25,7 +25,8 @@ import java.io.IOException;
 interface RemoteFileTransfer {
   void handshake() throws IOException;
 
-  void transferFile(File tsFile, File objectSourceDir, String targetName) 
throws IOException;
+  void transferFile(File tsFile, File modFile, File objectSourceDir, String 
targetName)
+      throws IOException;
 
   void close() throws IOException;
 }
diff --git 
a/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/ScpRemoteFileTransfer.java
 
b/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/ScpRemoteFileTransfer.java
index c47545902cf..113cdaae122 100644
--- 
a/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/ScpRemoteFileTransfer.java
+++ 
b/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/ScpRemoteFileTransfer.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.pipe.plugin.sink.tsfile;
 
+import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 
 import com.google.common.util.concurrent.RateLimiter;
@@ -104,11 +105,13 @@ class ScpRemoteFileTransfer implements RemoteFileTransfer 
{
   }
 
   @Override
-  public void transferFile(File tsFile, File objectSourceDir, String 
targetName)
+  public void transferFile(File tsFile, File modFile, File objectSourceDir, 
String targetName)
       throws IOException {
     try {
       syncObjectDirectory(objectSourceDir, targetName);
-      syncTsFile(tsFile, targetName);
+      final String finalTsName = computeFinalTsName(targetName);
+      syncModFile(modFile, finalTsName);
+      syncTsFile(tsFile, finalTsName);
     } catch (final Exception e) {
       invalidateSession();
       throw new IOException("Scp transfer failed: " + targetName, e);
@@ -155,15 +158,30 @@ class ScpRemoteFileTransfer implements RemoteFileTransfer 
{
     }
   }
 
-  private void syncTsFile(File tsFile, String targetName) throws IOException {
-    final String finalTsName =
-        targetName.endsWith(TSFILE_EXTENSION) ? targetName : targetName + 
TSFILE_EXTENSION;
+  private static String computeFinalTsName(final String targetName) { // Appends TSFILE_EXTENSION unless targetName already ends with it.
+    return targetName.endsWith(TSFILE_EXTENSION) ? targetName : targetName + 
TSFILE_EXTENSION;
+  }
+
+  private void syncTsFile(final File tsFile, final String finalTsName) throws 
IOException { // Uploads tsFile to remoteBaseDir under the already-finalized name.
     acquireTransferBytes(tsFile.length()); // presumably throttles by file size (helper not visible here); confirm
     ScpClientCreator.instance()
         .createScpClient(getSession())
         .upload(tsFile.toPath(), remoteBaseDir + UNIX_SEPARATOR + finalTsName); // finalTsName is used as-is; no suffix handling here
   }
 
+  private void syncModFile(final File modFile, final String finalTsName) 
throws IOException { // Uploads the companion mod file next to the remote TsFile, if there is one.
+    if (modFile == null || !modFile.exists()) {
+      return; // nothing to upload: no mod file supplied, or it is not on disk
+    }
+    final String remoteModPath =
+        remoteBaseDir + UNIX_SEPARATOR + finalTsName + 
ModificationFile.FILE_SUFFIX; // named after the remote TsFile, not after the local mod file
+    acquireTransferBytes(modFile.length()); // presumably throttles by file size (helper not visible here); confirm
+    ScpClientCreator.instance()
+        .createScpClient(getSession())
+        .upload(modFile.toPath(), remoteModPath);
+    LOGGER.info("Successfully transferred TsFile mod to {}", remoteModPath);
+  }
+
   private void ensureRemoteDirExists(ClientSession s, String dir) throws 
IOException {
     executeRemoteCommand(s, "mkdir -p " + shellQuote(dir));
   }


Reply via email to