This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new c3e7e6f91f1 [To dev/1.3] Pipe: Add TsFile parsing with Mods function 
(#16540) (#16651)
c3e7e6f91f1 is described below

commit c3e7e6f91f1ae6703af7f806bb35226ef0bd9af6
Author: Zhenyu Luo <[email protected]>
AuthorDate: Tue Oct 28 14:09:07 2025 +0800

    [To dev/1.3] Pipe: Add TsFile parsing with Mods function (#16540) (#16651)
---
 .../it/env/cluster/config/MppConfigNodeConfig.java |   6 +
 .../env/remote/config/RemoteConfigNodeConfig.java  |   5 +
 .../apache/iotdb/itbase/env/ConfigNodeConfig.java  |   2 +
 .../IoTDBPipeTsFileDecompositionWithModsIT.java    | 660 +++++++++++++++++++++
 .../common/tsfile/PipeTsFileInsertionEvent.java    |   2 +-
 .../container/TsFileInsertionDataContainer.java    |  16 +
 .../TsFileInsertionDataContainerProvider.java      |  45 +-
 .../query/TsFileInsertionQueryDataContainer.java   |  79 ++-
 .../TsFileInsertionQueryDataTabletIterator.java    |  33 +-
 .../scan/TsFileInsertionScanDataContainer.java     | 377 ++++++++----
 .../tsfile/parser/util/ModsOperationUtil.java      | 315 ++++++++++
 ...peStatementDataTypeConvertExecutionVisitor.java |   2 +-
 ...eeStatementDataTypeConvertExecutionVisitor.java |   8 +-
 .../event/TsFileInsertionDataContainerTest.java    |   2 +-
 .../tsfile/parser/util/ModsOperationUtilTest.java  | 406 +++++++++++++
 15 files changed, 1817 insertions(+), 141 deletions(-)

diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppConfigNodeConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppConfigNodeConfig.java
index 8e4a6def365..4385097ce04 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppConfigNodeConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppConfigNodeConfig.java
@@ -67,4 +67,10 @@ public class MppConfigNodeConfig extends MppBaseConfig 
implements ConfigNodeConf
     properties.setProperty("metric_prometheus_reporter_password", password);
     return this;
   }
+
+  @Override
+  public ConfigNodeConfig setLeaderDistributionPolicy(String policy) {
+    properties.setProperty("leader_distribution_policy", policy);
+    return this;
+  }
 }
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteConfigNodeConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteConfigNodeConfig.java
index ae8645eff52..c16722f4bd9 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteConfigNodeConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteConfigNodeConfig.java
@@ -38,4 +38,9 @@ public class RemoteConfigNodeConfig implements 
ConfigNodeConfig {
   public ConfigNodeConfig setMetricPrometheusReporterPassword(String password) 
{
     return this;
   }
+
+  @Override
+  public ConfigNodeConfig setLeaderDistributionPolicy(String policy) {
+    return this;
+  }
 }
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/ConfigNodeConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/ConfigNodeConfig.java
index 65a5a3271fc..4af35f6f56a 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/ConfigNodeConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/ConfigNodeConfig.java
@@ -29,4 +29,6 @@ public interface ConfigNodeConfig {
   ConfigNodeConfig setMetricPrometheusReporterUsername(String username);
 
   ConfigNodeConfig setMetricPrometheusReporterPassword(String password);
+
+  ConfigNodeConfig setLeaderDistributionPolicy(String policy);
 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeTsFileDecompositionWithModsIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeTsFileDecompositionWithModsIT.java
new file mode 100644
index 00000000000..48c7e83240c
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeTsFileDecompositionWithModsIT.java
@@ -0,0 +1,660 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.manual;
+
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2ManualCreateSchema;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.Collections;
+import java.util.HashSet;
+
+import static org.apache.iotdb.db.it.utils.TestUtils.executeNonQueryWithRetry;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2ManualCreateSchema.class})
+public class IoTDBPipeTsFileDecompositionWithModsIT extends 
AbstractPipeDualManualIT {
+
+  @Override
+  protected void setupConfig() {
+    super.setupConfig();
+    
senderEnv.getConfig().getConfigNodeConfig().setLeaderDistributionPolicy("HASH");
+    senderEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
+    receiverEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
+  }
+
+  /**
+   * Test IoTDB pipe handling TsFile decomposition with Mods (modification 
operations) in tree model
+   *
+   * <p>Test scenario: 1. Create database root.sg1 with 4 devices: d1 (aligned 
timeseries), d2
+   * (non-aligned timeseries), d3 (aligned timeseries), d4 (aligned 
timeseries) 2. Insert initial
+   * data into d1, d2, d3 3. Execute FLUSH operation to persist data to TsFile 
4. Execute DELETE
+   * operation on d1.s1, deleting data in time range 2-4 5. Insert large 
amount of data into d4
+   * (11000 records, inserted in batches) 6. Execute multiple DELETE 
operations on d4: - Delete s1
+   * field data where time<=10000 - Delete s2 field data where time>1000 - 
Delete s3 field data
+   * where time<=8000 7. Delete all data from d2 and d3 8. Execute FLUSH 
operation again 9. Create
+   * pipe with mods enabled, synchronize data to receiver 10. Verify 
correctness of receiver data: -
+   * d1 s1 field is null in time range 2-4, other data is normal - d2 and d3 
data is completely
+   * deleted - d4 DELETE operation results meet expectations for each field
+   *
+   * <p>Test purpose: Verify that IoTDB pipe can correctly handle Mods 
(modification operations) in
+   * TsFile under tree model, ensuring various DELETE operations can be 
correctly synchronized to
+   * the receiver and data consistency is guaranteed.
+   */
+  @Test
+  public void testTsFileDecompositionWithMods() throws Exception {
+    TestUtils.executeNonQueryWithRetry(senderEnv, "CREATE DATABASE root.sg1");
+    TestUtils.executeNonQueryWithRetry(receiverEnv, "CREATE DATABASE 
root.sg1");
+
+    TestUtils.executeNonQueryWithRetry(
+        senderEnv, "CREATE ALIGNED TIMESERIES root.sg1.d1(s1 FLOAT, s2 FLOAT, 
s3 FLOAT)");
+    TestUtils.executeNonQueryWithRetry(
+        senderEnv, "CREATE TIMESERIES root.sg1.d2.s1 WITH DATATYPE=FLOAT");
+    TestUtils.executeNonQueryWithRetry(
+        senderEnv, "CREATE TIMESERIES root.sg1.d2.s2 WITH DATATYPE=FLOAT");
+    TestUtils.executeNonQueryWithRetry(
+        senderEnv, "CREATE ALIGNED TIMESERIES root.sg1.d3(s1 FLOAT, s2 FLOAT, 
s3 FLOAT)");
+
+    TestUtils.executeNonQueryWithRetry(
+        senderEnv,
+        "INSERT INTO root.sg1.d1(time, s1, s2, s3) ALIGNED VALUES (1, 1.0, 
2.0, 3.0), (2, 1.1, 2.1, 3.1), (3, 1.2, 2.2, 3.2), (4, 1.3, 2.3, 3.3), (5, 1.4, 
2.4, 3.4)");
+
+    TestUtils.executeNonQueryWithRetry(
+        senderEnv,
+        "INSERT INTO root.sg1.d2(time, s1, s2) VALUES (1, 10.0, 20.0), (2, 
10.1, 20.1), (3, 10.2, 20.2), (4, 10.3, 20.3), (5, 10.4, 20.4)");
+
+    TestUtils.executeNonQueryWithRetry(
+        senderEnv,
+        "INSERT INTO root.sg1.d3(time, s1, s2, s3) ALIGNED VALUES (1, 100.0, 
200.0, 300.0), (2, 100.1, 200.1, 300.1), (3, 100.2, 200.2, 300.2)");
+
+    TestUtils.executeNonQueryWithRetry(senderEnv, "FLUSH");
+
+    TestUtils.executeNonQueryWithRetry(
+        senderEnv, "DELETE FROM root.sg1.d1.s1 WHERE time >= 2 AND time <= 4");
+
+    TestUtils.executeNonQueryWithRetry(
+        senderEnv,
+        "INSERT INTO root.sg1.d3(time, s1, s2, s3) ALIGNED VALUES (1, 100.0, 
200.0, 300.0), (2, 100.1, 200.1, 300.1), (3, 100.2, 200.2, 300.2)");
+
+    TestUtils.executeNonQueryWithRetry(
+        senderEnv, "CREATE ALIGNED TIMESERIES root.sg1.d4(s1 FLOAT, s2 FLOAT, 
s3 FLOAT)");
+    String s = "INSERT INTO root.sg1.d4(time, s1, s2, s3) ALIGNED VALUES ";
+    StringBuilder insertBuilder = new StringBuilder(s);
+    for (int i = 1; i <= 11000; i++) {
+      insertBuilder
+          .append("(")
+          .append(i)
+          .append(",")
+          .append(1.0f)
+          .append(",")
+          .append(2.0f)
+          .append(",")
+          .append(3.0f)
+          .append(")");
+      if (i % 100 != 0) {
+        insertBuilder.append(",");
+      } else {
+        TestUtils.executeNonQueryWithRetry(senderEnv, 
insertBuilder.toString());
+        insertBuilder = new StringBuilder(s);
+      }
+    }
+
+    TestUtils.executeNonQueryWithRetry(senderEnv, "FLUSH");
+
+    TestUtils.executeNonQueryWithRetry(senderEnv, "DELETE FROM root.sg1.d4.s1 
WHERE time <= 10000");
+    TestUtils.executeNonQueryWithRetry(senderEnv, "DELETE FROM root.sg1.d4.s2 
WHERE time > 1000");
+    TestUtils.executeNonQueryWithRetry(senderEnv, "DELETE FROM root.sg1.d4.s3 
WHERE time <= 8000");
+
+    TestUtils.executeNonQueryWithRetry(senderEnv, "DELETE FROM root.sg1.d2.*");
+
+    TestUtils.executeNonQueryWithRetry(senderEnv, "DELETE FROM root.sg1.d3.*");
+
+    TestUtils.executeNonQueryWithRetry(senderEnv, "FLUSH");
+
+    // Verify sender data integrity before creating pipe to avoid leader 
election issues
+    // This ensures all data is properly persisted and consistent on sender 
side
+    // before starting the pipe synchronization process
+    HashSet<String> results = new HashSet<>();
+    results.add("1,3.0,1.0,2.0,");
+    results.add("2,3.1,null,2.1,");
+    results.add("3,3.2,null,2.2,");
+    results.add("4,3.3,null,2.3,");
+    results.add("5,3.4,1.4,2.4,");
+
+    TestUtils.assertDataEventuallyOnEnv(
+        senderEnv,
+        "SELECT * FROM root.sg1.d1 ORDER BY time",
+        "Time,root.sg1.d1.s3,root.sg1.d1.s1,root.sg1.d1.s2,",
+        results);
+
+    executeNonQueryWithRetry(
+        senderEnv,
+        String.format(
+            "CREATE PIPE test_pipe WITH SOURCE ('mods.enable'='true') WITH 
CONNECTOR('ip'='%s', 'port'='%s', 'format'='tablet')",
+            receiverEnv.getDataNodeWrapperList().get(0).getIp(),
+            receiverEnv.getDataNodeWrapperList().get(0).getPort()));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT * FROM root.sg1.d1 ORDER BY time",
+        "Time,root.sg1.d1.s3,root.sg1.d1.s1,root.sg1.d1.s2,",
+        results);
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv, "SELECT * FROM root.sg1.d2 ORDER BY time", "Time,", 
Collections.emptySet());
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv, "SELECT * FROM root.sg1.d3 ORDER BY time", "Time,", 
Collections.emptySet());
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT s1 FROM root.sg1.d1 WHERE time >= 2 AND time <= 4",
+        "Time,root.sg1.d1.s1,",
+        Collections.emptySet());
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(**) FROM root.sg1.d4",
+        "COUNT(root.sg1.d4.s3),COUNT(root.sg1.d4.s1),COUNT(root.sg1.d4.s2),",
+        Collections.singleton("3000,1000,1000,"));
+  }
+
+  /**
+   * Test IoTDB pipe handling TsFile decomposition with Mods (modification 
operations) in tree model
+   * - Multi-pipe scenario
+   *
+   * <p>Test scenario: 1. Create database root.sg1 with 4 devices: d1 (aligned 
timeseries), d2
+   * (non-aligned timeseries), d3 (aligned timeseries), d4 (aligned 
timeseries) 2. Insert initial
+   * data into d1, d2, d3 3. Execute FLUSH operation to persist data to TsFile 
4. Execute DELETE
+   * operation on d1.s1, deleting data in time range 2-4 5. Insert large 
amount of data into d4
+   * (11000 records, inserted in batches) 6. Execute multiple DELETE 
operations on d4: - Delete s1
+   * field data where time<=10000 - Delete s2 field data where time>1000 - 
Delete s3 field data
+   * where time<=8000 7. Delete all data from d2 and d3 8. Execute FLUSH 
operation again 9. Create 4
+   * independent pipes, each targeting different device paths: - test_pipe1: 
handles data for
+   * root.sg1.d1.** path - test_pipe2: handles data for root.sg1.d2.** path - 
test_pipe3: handles
+   * data for root.sg1.d3.** path - test_pipe4: handles data for 
root.sg1.d4.** path 10. Verify
+   * correctness of receiver data: - d1 s1 field is null in time range 2-4, 
other data is normal -
+   * d2 and d3 data is completely deleted - d4 DELETE operation results meet 
expectations for each
+   * field
+   *
+   * <p>Test purpose: Verify that IoTDB pipe can correctly handle Mods 
(modification operations) in
+   * TsFile under tree model through multiple independent pipes, ensuring 
DELETE operations for
+   * different paths can be correctly synchronized to the receiver and data 
consistency is
+   * guaranteed. The main difference from the first test method is using 
multiple pipes to handle
+   * data for different devices separately.
+   */
+  @Test
+  public void testTsFileDecompositionWithMods2() throws Exception {
+    TestUtils.executeNonQueryWithRetry(senderEnv, "CREATE DATABASE root.sg1");
+    TestUtils.executeNonQueryWithRetry(receiverEnv, "CREATE DATABASE 
root.sg1");
+
+    TestUtils.executeNonQueryWithRetry(
+        senderEnv, "CREATE ALIGNED TIMESERIES root.sg1.d1(s1 FLOAT, s2 FLOAT, 
s3 FLOAT)");
+    TestUtils.executeNonQueryWithRetry(
+        senderEnv, "CREATE TIMESERIES root.sg1.d2.s1 WITH DATATYPE=FLOAT");
+    TestUtils.executeNonQueryWithRetry(
+        senderEnv, "CREATE TIMESERIES root.sg1.d2.s2 WITH DATATYPE=FLOAT");
+    TestUtils.executeNonQueryWithRetry(
+        senderEnv, "CREATE ALIGNED TIMESERIES root.sg1.d3(s1 FLOAT, s2 FLOAT, 
s3 FLOAT)");
+
+    TestUtils.executeNonQueryWithRetry(
+        senderEnv,
+        "INSERT INTO root.sg1.d1(time, s1, s2, s3) ALIGNED VALUES (1, 1.0, 
2.0, 3.0), (2, 1.1, 2.1, 3.1), (3, 1.2, 2.2, 3.2), (4, 1.3, 2.3, 3.3), (5, 1.4, 
2.4, 3.4)");
+
+    TestUtils.executeNonQueryWithRetry(
+        senderEnv,
+        "INSERT INTO root.sg1.d2(time, s1, s2) VALUES (1, 10.0, 20.0), (2, 
10.1, 20.1), (3, 10.2, 20.2), (4, 10.3, 20.3), (5, 10.4, 20.4)");
+
+    TestUtils.executeNonQueryWithRetry(
+        senderEnv,
+        "INSERT INTO root.sg1.d3(time, s1, s2, s3) ALIGNED VALUES (1, 100.0, 
200.0, 300.0), (2, 100.1, 200.1, 300.1), (3, 100.2, 200.2, 300.2)");
+
+    TestUtils.executeNonQueryWithRetry(senderEnv, "FLUSH");
+
+    TestUtils.executeNonQueryWithRetry(
+        senderEnv, "DELETE FROM root.sg1.d1.s1 WHERE time >= 2 AND time <= 4");
+
+    TestUtils.executeNonQueryWithRetry(
+        senderEnv,
+        "INSERT INTO root.sg1.d3(time, s1, s2, s3) ALIGNED VALUES (1, 100.0, 
200.0, 300.0), (2, 100.1, 200.1, 300.1), (3, 100.2, 200.2, 300.2)");
+
+    TestUtils.executeNonQueryWithRetry(
+        senderEnv, "CREATE ALIGNED TIMESERIES root.sg1.d4(s1 FLOAT, s2 FLOAT, 
s3 FLOAT)");
+    String s = "INSERT INTO root.sg1.d4(time, s1, s2, s3) ALIGNED VALUES ";
+    StringBuilder insertBuilder = new StringBuilder(s);
+    for (int i = 1; i <= 11000; i++) {
+      insertBuilder
+          .append("(")
+          .append(i)
+          .append(",")
+          .append(1.0f)
+          .append(",")
+          .append(2.0f)
+          .append(",")
+          .append(3.0f)
+          .append(")");
+      if (i % 100 != 0) {
+        insertBuilder.append(",");
+      } else {
+        TestUtils.executeNonQueryWithRetry(senderEnv, 
insertBuilder.toString());
+        insertBuilder = new StringBuilder(s);
+      }
+    }
+
+    TestUtils.executeNonQueryWithRetry(senderEnv, "FLUSH");
+
+    TestUtils.executeNonQueryWithRetry(senderEnv, "DELETE FROM root.sg1.d4.s1 
WHERE time <= 10000");
+    TestUtils.executeNonQueryWithRetry(senderEnv, "DELETE FROM root.sg1.d4.s2 
WHERE time > 1000");
+    TestUtils.executeNonQueryWithRetry(senderEnv, "DELETE FROM root.sg1.d4.s3 
WHERE time <= 8000");
+
+    TestUtils.executeNonQueryWithRetry(senderEnv, "DELETE FROM root.sg1.d2.*");
+
+    TestUtils.executeNonQueryWithRetry(senderEnv, "DELETE FROM root.sg1.d3.*");
+
+    TestUtils.executeNonQueryWithRetry(senderEnv, "FLUSH");
+
+    // Verify sender data integrity before creating pipes to avoid leader 
election issues
+    // This ensures all data is properly persisted and consistent on sender 
side
+    // before starting the pipe synchronization process
+    HashSet<String> results = new HashSet<>();
+    results.add("1,3.0,1.0,2.0,");
+    results.add("2,3.1,null,2.1,");
+    results.add("3,3.2,null,2.2,");
+    results.add("4,3.3,null,2.3,");
+    results.add("5,3.4,1.4,2.4,");
+
+    TestUtils.assertDataEventuallyOnEnv(
+        senderEnv,
+        "SELECT * FROM root.sg1.d1 ORDER BY time",
+        "Time,root.sg1.d1.s3,root.sg1.d1.s1,root.sg1.d1.s2,",
+        results);
+
+    executeNonQueryWithRetry(
+        senderEnv,
+        String.format(
+            "CREATE PIPE test_pipe1 WITH SOURCE 
('mods.enable'='true','path'='root.sg1.d1.**') WITH CONNECTOR('ip'='%s', 
'port'='%s', 'format'='tablet')",
+            receiverEnv.getDataNodeWrapperList().get(0).getIp(),
+            receiverEnv.getDataNodeWrapperList().get(0).getPort()));
+
+    executeNonQueryWithRetry(
+        senderEnv,
+        String.format(
+            "CREATE PIPE test_pipe2 WITH SOURCE 
('mods.enable'='true','path'='root.sg1.d2.**') WITH CONNECTOR('ip'='%s', 
'port'='%s', 'format'='tablet')",
+            receiverEnv.getDataNodeWrapperList().get(0).getIp(),
+            receiverEnv.getDataNodeWrapperList().get(0).getPort()));
+
+    executeNonQueryWithRetry(
+        senderEnv,
+        String.format(
+            "CREATE PIPE test_pipe3 WITH SOURCE 
('mods.enable'='true','path'='root.sg1.d3.**') WITH CONNECTOR('ip'='%s', 
'port'='%s', 'format'='tablet')",
+            receiverEnv.getDataNodeWrapperList().get(0).getIp(),
+            receiverEnv.getDataNodeWrapperList().get(0).getPort()));
+
+    executeNonQueryWithRetry(
+        senderEnv,
+        String.format(
+            "CREATE PIPE test_pipe4 WITH SOURCE 
('mods.enable'='true','path'='root.sg1.d4.**') WITH CONNECTOR('ip'='%s', 
'port'='%s', 'format'='tablet')",
+            receiverEnv.getDataNodeWrapperList().get(0).getIp(),
+            receiverEnv.getDataNodeWrapperList().get(0).getPort()));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT * FROM root.sg1.d1 ORDER BY time",
+        "Time,root.sg1.d1.s3,root.sg1.d1.s1,root.sg1.d1.s2,",
+        results);
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv, "SELECT * FROM root.sg1.d2 ORDER BY time", "Time,", 
Collections.emptySet());
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv, "SELECT * FROM root.sg1.d3 ORDER BY time", "Time,", 
Collections.emptySet());
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT s1 FROM root.sg1.d1 WHERE time >= 2 AND time <= 4",
+        "Time,root.sg1.d1.s1,",
+        Collections.emptySet());
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(**) FROM root.sg1.d4",
+        "COUNT(root.sg1.d4.s3),COUNT(root.sg1.d4.s1),COUNT(root.sg1.d4.s2),",
+        Collections.singleton("3000,1000,1000,"));
+  }
+
+  /**
+   * Test IoTDB pipe handling TsFile decomposition with Mods (modification 
operations) in tree model
+   * - Large scale single point deletion scenario
+   *
+   * <p>Test scenario: 1. Create database root.sg1 with 1 device: d1 (aligned 
timeseries with 5
+   * sensors) 2. Insert 20000 data points for each sensor with different time 
ranges: - s1: time
+   * 1-20000 - s2: time 10001-30000 - s3: time 20001-40000 - s4: time 
30001-50000 - s5: time
+   * 40001-60000 3. Execute FLUSH operation to persist data to TsFile 4. 
Execute 2000 single point
+   * DELETE operations, each deleting one time point from different sensors 5. 
Execute FLUSH
+   * operation again 6. Create pipe with mods enabled 7. Verify correctness of 
receiver data: - Each
+   * sensor should have 19600 remaining data points - Deleted points should 
not appear in receiver
+   *
+   * <p>Test purpose: Verify that IoTDB pipe can correctly handle large scale 
single point deletion
+   * operations in TsFile under tree model, ensuring the binary search 
optimization in
+   * ModsOperationUtil works correctly with many modification entries.
+   */
+  @Test
+  public void testTsFileDecompositionWithModsLargeScaleSinglePointDeletion() 
throws Exception {
+    TestUtils.executeNonQueryWithRetry(senderEnv, "CREATE DATABASE root.sg1");
+    TestUtils.executeNonQueryWithRetry(receiverEnv, "CREATE DATABASE 
root.sg1");
+
+    // Insert 20000 data points for s1 (time 1-20000)
+    String s1 = "INSERT INTO root.sg1.d1(time, s1) ALIGNED VALUES ";
+    StringBuilder insertBuilder1 = new StringBuilder(s1);
+    for (int i = 1; i <= 20000; i++) {
+      
insertBuilder1.append("(").append(i).append(",").append(1.0f).append(")");
+      if (i % 1000 != 0) {
+        insertBuilder1.append(",");
+      } else {
+        TestUtils.executeNonQueryWithRetry(senderEnv, 
insertBuilder1.toString());
+        insertBuilder1 = new StringBuilder(s1);
+      }
+    }
+    // Execute remaining data if any
+    if (insertBuilder1.length() > s1.length()) {
+      TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder1.toString());
+    }
+
+    // Insert 20000 data points for s2 (time 10001-30000)
+    String s2 = "INSERT INTO root.sg1.d1(time, s2) ALIGNED VALUES ";
+    StringBuilder insertBuilder2 = new StringBuilder(s2);
+    for (int i = 10001; i <= 30000; i++) {
+      
insertBuilder2.append("(").append(i).append(",").append(2.0f).append(")");
+      if (i % 1000 != 0) {
+        insertBuilder2.append(",");
+      } else {
+        TestUtils.executeNonQueryWithRetry(senderEnv, 
insertBuilder2.toString());
+        insertBuilder2 = new StringBuilder(s2);
+      }
+    }
+    // Execute remaining data if any
+    if (insertBuilder2.length() > s2.length()) {
+      TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder2.toString());
+    }
+
+    // Insert 20000 data points for s3 (time 20001-40000)
+    String s3 = "INSERT INTO root.sg1.d1(time, s3) ALIGNED VALUES ";
+    StringBuilder insertBuilder3 = new StringBuilder(s3);
+    for (int i = 20001; i <= 40000; i++) {
+      
insertBuilder3.append("(").append(i).append(",").append(3.0f).append(")");
+      if (i % 1000 != 0) {
+        insertBuilder3.append(",");
+      } else {
+        TestUtils.executeNonQueryWithRetry(senderEnv, 
insertBuilder3.toString());
+        insertBuilder3 = new StringBuilder(s3);
+      }
+    }
+    // Execute remaining data if any
+    if (insertBuilder3.length() > s3.length()) {
+      TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder3.toString());
+    }
+
+    // Insert 20000 data points for s4 (time 30001-50000)
+    String s4 = "INSERT INTO root.sg1.d1(time, s4) ALIGNED VALUES ";
+    StringBuilder insertBuilder4 = new StringBuilder(s4);
+    for (int i = 30001; i <= 50000; i++) {
+      
insertBuilder4.append("(").append(i).append(",").append(4.0f).append(")");
+      if (i % 1000 != 0) {
+        insertBuilder4.append(",");
+      } else {
+        TestUtils.executeNonQueryWithRetry(senderEnv, 
insertBuilder4.toString());
+        insertBuilder4 = new StringBuilder(s4);
+      }
+    }
+    // Execute remaining data if any
+    if (insertBuilder4.length() > s4.length()) {
+      TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder4.toString());
+    }
+
+    // Insert 20000 data points for s5 (time 40001-60000)
+    String s5 = "INSERT INTO root.sg1.d1(time, s5) ALIGNED VALUES ";
+    StringBuilder insertBuilder5 = new StringBuilder(s5);
+    for (int i = 40001; i <= 60000; i++) {
+      
insertBuilder5.append("(").append(i).append(",").append(5.0f).append(")");
+      if (i % 1000 != 0) {
+        insertBuilder5.append(",");
+      } else {
+        TestUtils.executeNonQueryWithRetry(senderEnv, 
insertBuilder5.toString());
+        insertBuilder5 = new StringBuilder(s5);
+      }
+    }
+    // Execute remaining data if any
+    if (insertBuilder5.length() > s5.length()) {
+      TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder5.toString());
+    }
+
+    TestUtils.executeNonQueryWithRetry(senderEnv, "FLUSH");
+
+    // Execute 2000 single point DELETE operations
+    // Delete 400 points from each sensor (distributed across different time 
ranges)
+    for (int i = 0; i < 400; i++) {
+      // Delete from s1: every 10th point starting from 10
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "DELETE FROM root.sg1.d1.s1 WHERE time = " + (10 + i * 
10));
+
+      // Delete from s2: every 10th point starting from 10010
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "DELETE FROM root.sg1.d1.s2 WHERE time = " + (10010 + i * 
10));
+
+      // Delete from s3: every 10th point starting from 20010
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "DELETE FROM root.sg1.d1.s3 WHERE time = " + (20010 + i * 
10));
+
+      // Delete from s4: every 10th point starting from 30010
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "DELETE FROM root.sg1.d1.s4 WHERE time = " + (30010 + i * 
10));
+
+      // Delete from s5: every 10th point starting from 40010
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "DELETE FROM root.sg1.d1.s5 WHERE time = " + (40010 + i * 
10));
+    }
+
+    TestUtils.executeNonQueryWithRetry(senderEnv, "FLUSH");
+
+    // Verify sender data integrity before creating pipe to avoid leader 
election issues
+    // This ensures all data is properly persisted and consistent on sender 
side
+    // before starting the pipe synchronization process
+    TestUtils.assertDataEventuallyOnEnv(
+        senderEnv,
+        "SELECT COUNT(**) FROM root.sg1.d1",
+        
"COUNT(root.sg1.d1.s3),COUNT(root.sg1.d1.s4),COUNT(root.sg1.d1.s5),COUNT(root.sg1.d1.s1),COUNT(root.sg1.d1.s2),",
+        Collections.singleton("19600,19600,19600,19600,19600,"));
+
+    executeNonQueryWithRetry(
+        senderEnv,
+        String.format(
+            "CREATE PIPE test_pipe WITH SOURCE ('mods.enable'='true') WITH 
CONNECTOR('ip'='%s', 'port'='%s', 'format'='tablet')",
+            receiverEnv.getDataNodeWrapperList().get(0).getIp(),
+            receiverEnv.getDataNodeWrapperList().get(0).getPort()));
+
+    // Verify the remaining count of every sensor using COUNT(**)
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(**) FROM root.sg1.d1",
+        
"COUNT(root.sg1.d1.s3),COUNT(root.sg1.d1.s4),COUNT(root.sg1.d1.s5),COUNT(root.sg1.d1.s1),COUNT(root.sg1.d1.s2),",
+        Collections.singleton("19600,19600,19600,19600,19600,"));
+
+    // Verify individual sensor counts
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s1) FROM root.sg1.d1",
+        "COUNT(root.sg1.d1.s1),",
+        Collections.singleton("19600,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s2) FROM root.sg1.d1",
+        "COUNT(root.sg1.d1.s2),",
+        Collections.singleton("19600,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s3) FROM root.sg1.d1",
+        "COUNT(root.sg1.d1.s3),",
+        Collections.singleton("19600,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s4) FROM root.sg1.d1",
+        "COUNT(root.sg1.d1.s4),",
+        Collections.singleton("19600,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s5) FROM root.sg1.d1",
+        "COUNT(root.sg1.d1.s5),",
+        Collections.singleton("19600,"));
+
+    // Verify that the deleted points are gone using COUNT with a WHERE clause
+    // These should return 0 since every 10th point in these ranges was deleted
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s1) FROM root.sg1.d1 WHERE time >= 10 AND time <= 4000 
AND time % 10 = 0",
+        "COUNT(root.sg1.d1.s1),",
+        Collections.singleton("0,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s2) FROM root.sg1.d1 WHERE time >= 10010 AND time <= 
14000 AND time % 10 = 0",
+        "COUNT(root.sg1.d1.s2),",
+        Collections.singleton("0,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s3) FROM root.sg1.d1 WHERE time >= 20010 AND time <= 
24000 AND time % 10 = 0",
+        "COUNT(root.sg1.d1.s3),",
+        Collections.singleton("0,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s4) FROM root.sg1.d1 WHERE time >= 30010 AND time <= 
34000 AND time % 10 = 0",
+        "COUNT(root.sg1.d1.s4),",
+        Collections.singleton("0,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s5) FROM root.sg1.d1 WHERE time >= 40010 AND time <= 
44000 AND time % 10 = 0",
+        "COUNT(root.sg1.d1.s5),",
+        Collections.singleton("0,"));
+
+    // Verify count of non-deleted time ranges using multiple range queries
+    // Check ranges before deletion area
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s1) FROM root.sg1.d1 WHERE time >= 1 AND time < 10",
+        "COUNT(root.sg1.d1.s1),",
+        Collections.singleton("9,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s2) FROM root.sg1.d1 WHERE time >= 10001 AND time < 
10010",
+        "COUNT(root.sg1.d1.s2),",
+        Collections.singleton("9,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s3) FROM root.sg1.d1 WHERE time >= 20001 AND time < 
20010",
+        "COUNT(root.sg1.d1.s3),",
+        Collections.singleton("9,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s4) FROM root.sg1.d1 WHERE time >= 30001 AND time < 
30010",
+        "COUNT(root.sg1.d1.s4),",
+        Collections.singleton("9,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s5) FROM root.sg1.d1 WHERE time >= 40001 AND time < 
40010",
+        "COUNT(root.sg1.d1.s5),",
+        Collections.singleton("9,"));
+
+    // Check ranges after deletion area
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s1) FROM root.sg1.d1 WHERE time > 4000 AND time <= 
20000",
+        "COUNT(root.sg1.d1.s1),",
+        Collections.singleton("16000,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s2) FROM root.sg1.d1 WHERE time > 14000 AND time <= 
30000",
+        "COUNT(root.sg1.d1.s2),",
+        Collections.singleton("16000,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s3) FROM root.sg1.d1 WHERE time > 24000 AND time <= 
40000",
+        "COUNT(root.sg1.d1.s3),",
+        Collections.singleton("16000,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s4) FROM root.sg1.d1 WHERE time > 34000 AND time <= 
50000",
+        "COUNT(root.sg1.d1.s4),",
+        Collections.singleton("16000,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s5) FROM root.sg1.d1 WHERE time > 44000 AND time <= 
60000",
+        "COUNT(root.sg1.d1.s5),",
+        Collections.singleton("16000,"));
+
+    // Check non-deleted points within deletion range (points whose time is 
not a multiple of 10)
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s1) FROM root.sg1.d1 WHERE time >= 10 AND time <= 4000 
AND time % 10 != 0",
+        "COUNT(root.sg1.d1.s1),",
+        Collections.singleton("3591,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s2) FROM root.sg1.d1 WHERE time >= 10010 AND time <= 
14000 AND time % 10 != 0",
+        "COUNT(root.sg1.d1.s2),",
+        Collections.singleton("3591,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s3) FROM root.sg1.d1 WHERE time >= 20010 AND time <= 
24000 AND time % 10 != 0",
+        "COUNT(root.sg1.d1.s3),",
+        Collections.singleton("3591,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s4) FROM root.sg1.d1 WHERE time >= 30010 AND time <= 
34000 AND time % 10 != 0",
+        "COUNT(root.sg1.d1.s4),",
+        Collections.singleton("3591,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s5) FROM root.sg1.d1 WHERE time >= 40010 AND time <= 
44000 AND time % 10 != 0",
+        "COUNT(root.sg1.d1.s5),",
+        Collections.singleton("3591,"));
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 1e63a2d4af3..858c425c3e6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -553,7 +553,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
                   endTime,
                   pipeTaskMeta,
                   this)
-              .provide());
+              .provide(isWithMod));
       return dataContainer.get();
     } catch (final IOException e) {
       close();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java
index 77ef2c871cb..9f779528f52 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.event.common.tsfile.container;
 
+import org.apache.iotdb.commons.path.PatternTreeMap;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
@@ -26,6 +27,8 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
+import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
+import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 
 import org.apache.tsfile.read.TsFileSequenceReader;
@@ -49,6 +52,10 @@ public abstract class TsFileInsertionDataContainer 
implements AutoCloseable {
   protected final PipeTaskMeta pipeTaskMeta; // used to report progress
   protected final EnrichedEvent sourceEvent; // used to report progress
 
+  // mods entry
+  protected PipeMemoryBlock allocatedMemoryBlockForModifications;
+  protected PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer> 
currentModifications;
+
   protected final long initialTimeNano = System.nanoTime();
   protected boolean timeUsageReported = false;
 
@@ -117,5 +124,14 @@ public abstract class TsFileInsertionDataContainer 
implements AutoCloseable {
     if (allocatedMemoryBlockForTablet != null) {
       allocatedMemoryBlockForTablet.close();
     }
+
+    if (currentModifications != null) {
+      // help GC
+      currentModifications = null;
+    }
+
+    if (allocatedMemoryBlockForModifications != null) {
+      allocatedMemoryBlockForModifications.close();
+    }
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainerProvider.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainerProvider.java
index e8b29c72c40..b6c24365f3e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainerProvider.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainerProvider.java
@@ -72,7 +72,7 @@ public class TsFileInsertionDataContainerProvider {
     this.sourceEvent = sourceEvent;
   }
 
-  public TsFileInsertionDataContainer provide() throws IOException {
+  public TsFileInsertionDataContainer provide(final boolean isWithMod) throws 
IOException {
     if (pipeName != null) {
       PipeTsFileToTabletsMetrics.getInstance()
           .markTsFileToTabletInvocation(pipeName + "_" + creationTime);
@@ -83,7 +83,15 @@ public class TsFileInsertionDataContainerProvider {
             / PipeMemoryManager.getTotalNonFloatingMemorySizeInBytes()
         > PipeTsFilePublicResource.MEMORY_SUFFICIENT_THRESHOLD) {
       return new TsFileInsertionScanDataContainer(
-          pipeName, creationTime, tsFile, pattern, startTime, endTime, 
pipeTaskMeta, sourceEvent);
+          pipeName,
+          creationTime,
+          tsFile,
+          pattern,
+          startTime,
+          endTime,
+          pipeTaskMeta,
+          sourceEvent,
+          isWithMod);
     }
 
     if (pattern instanceof UnionIoTDBPipePattern
@@ -95,7 +103,15 @@ public class TsFileInsertionDataContainerProvider {
       // hard to know whether it only matches one timeseries, while matching 
multiple is often the
       // case.
       return new TsFileInsertionQueryDataContainer(
-          pipeName, creationTime, tsFile, pattern, startTime, endTime, 
pipeTaskMeta, sourceEvent);
+          pipeName,
+          creationTime,
+          tsFile,
+          pattern,
+          startTime,
+          endTime,
+          pipeTaskMeta,
+          sourceEvent,
+          isWithMod);
     }
 
     final Map<IDeviceID, Boolean> deviceIsAlignedMap =
@@ -104,7 +120,15 @@ public class TsFileInsertionDataContainerProvider {
       // If we failed to get from cache, it indicates that the memory usage is 
high.
       // We use scan data container because it requires less memory.
       return new TsFileInsertionScanDataContainer(
-          pipeName, creationTime, tsFile, pattern, startTime, endTime, 
pipeTaskMeta, sourceEvent);
+          pipeName,
+          creationTime,
+          tsFile,
+          pattern,
+          startTime,
+          endTime,
+          pipeTaskMeta,
+          sourceEvent,
+          isWithMod);
     }
 
     final int originalSize = deviceIsAlignedMap.size();
@@ -114,7 +138,15 @@ public class TsFileInsertionDataContainerProvider {
     return (double) filteredDeviceIsAlignedMap.size() / originalSize
             > PipeConfig.getInstance().getPipeTsFileScanParsingThreshold()
         ? new TsFileInsertionScanDataContainer(
-            pipeName, creationTime, tsFile, pattern, startTime, endTime, 
pipeTaskMeta, sourceEvent)
+            pipeName,
+            creationTime,
+            tsFile,
+            pattern,
+            startTime,
+            endTime,
+            pipeTaskMeta,
+            sourceEvent,
+            isWithMod)
         : new TsFileInsertionQueryDataContainer(
             pipeName,
             creationTime,
@@ -124,7 +156,8 @@ public class TsFileInsertionDataContainerProvider {
             endTime,
             pipeTaskMeta,
             sourceEvent,
-            filteredDeviceIsAlignedMap);
+            filteredDeviceIsAlignedMap,
+            isWithMod);
   }
 
   private Map<IDeviceID, Boolean> filterDeviceIsAlignedMapByPattern(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java
index 897d820df90..4a0c2be0bf0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java
@@ -26,16 +26,19 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.utils.TestOnly;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
+import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.util.ModsOperationUtil;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
 import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
+import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.file.metadata.PlainDeviceID;
+import org.apache.tsfile.file.metadata.TimeseriesMetadata;
 import org.apache.tsfile.read.TsFileDeviceIterator;
 import org.apache.tsfile.read.TsFileReader;
 import org.apache.tsfile.read.TsFileSequenceReader;
@@ -72,7 +75,7 @@ public class TsFileInsertionQueryDataContainer extends 
TsFileInsertionDataContai
   public TsFileInsertionQueryDataContainer(
       final File tsFile, final PipePattern pattern, final long startTime, 
final long endTime)
       throws IOException {
-    this(null, 0, tsFile, pattern, startTime, endTime, null, null);
+    this(null, 0, tsFile, pattern, startTime, endTime, null, null, false);
   }
 
   public TsFileInsertionQueryDataContainer(
@@ -83,7 +86,8 @@ public class TsFileInsertionQueryDataContainer extends 
TsFileInsertionDataContai
       final long startTime,
       final long endTime,
       final PipeTaskMeta pipeTaskMeta,
-      final EnrichedEvent sourceEvent)
+      final EnrichedEvent sourceEvent,
+      final boolean isWithMod)
       throws IOException {
     this(
         pipeName,
@@ -94,7 +98,8 @@ public class TsFileInsertionQueryDataContainer extends 
TsFileInsertionDataContai
         endTime,
         pipeTaskMeta,
         sourceEvent,
-        null);
+        null,
+        isWithMod);
   }
 
   public TsFileInsertionQueryDataContainer(
@@ -106,11 +111,20 @@ public class TsFileInsertionQueryDataContainer extends 
TsFileInsertionDataContai
       final long endTime,
       final PipeTaskMeta pipeTaskMeta,
       final EnrichedEvent sourceEvent,
-      final Map<IDeviceID, Boolean> deviceIsAlignedMap)
+      final Map<IDeviceID, Boolean> deviceIsAlignedMap,
+      final boolean isWithMod)
       throws IOException {
     super(pipeName, creationTime, pattern, startTime, endTime, pipeTaskMeta, 
sourceEvent);
 
     try {
+      currentModifications =
+          isWithMod
+              ? ModsOperationUtil.loadModificationsFromTsFile(tsFile)
+              : PatternTreeMapFactory.getModsPatternTreeMap();
+      allocatedMemoryBlockForModifications =
+          PipeDataNodeResourceManager.memory()
+              
.forceAllocateForTabletWithRetry(currentModifications.ramBytesUsed());
+
       final PipeTsFileResourceManager tsFileResourceManager = 
PipeDataNodeResourceManager.tsfile();
       final Map<IDeviceID, List<String>> deviceMeasurementsMap;
 
@@ -155,6 +169,60 @@ public class TsFileInsertionQueryDataContainer extends 
TsFileInsertionDataContai
       allocatedMemoryBlock =
           
PipeDataNodeResourceManager.memory().forceAllocate(memoryRequiredInBytes);
 
+      final Iterator<Map.Entry<IDeviceID, List<String>>> iterator =
+          deviceMeasurementsMap.entrySet().iterator();
+      while (isWithMod && iterator.hasNext()) {
+        final Map.Entry<IDeviceID, List<String>> entry = iterator.next();
+        final IDeviceID deviceId = entry.getKey();
+        final List<String> measurements = entry.getValue();
+
+        // Skip entries whose deviceId is null
+        if (deviceId == null) {
+          LOGGER.warn("Found null deviceId, removing entry");
+          iterator.remove();
+          continue;
+        }
+
+        // Skip entries whose measurements list is null or empty
+        if (measurements == null || measurements.isEmpty()) {
+          iterator.remove();
+          continue;
+        }
+
+        if (!currentModifications.isEmpty()) {
+          // Drop null, unreadable, or fully mod-deleted measurements
+          measurements.removeIf(
+              measurement -> {
+                if (measurement == null) {
+                  return true;
+                }
+
+                try {
+                  TimeseriesMetadata meta =
+                      tsFileSequenceReader.readTimeseriesMetadata(deviceId, 
measurement, true);
+                  return ModsOperationUtil.isAllDeletedByMods(
+                      deviceId.toString(),
+                      measurement,
+                      meta.getStatistics().getStartTime(),
+                      meta.getStatistics().getEndTime(),
+                      currentModifications);
+                } catch (IOException e) {
+                  LOGGER.warn(
+                      "Failed to read metadata for deviceId: {}, measurement: 
{}, removing",
+                      deviceId,
+                      measurement,
+                      e);
+                  return true;
+                }
+              });
+        }
+
+        // If measurements list is empty after filtering, remove the entire 
entry
+        if (measurements.isEmpty()) {
+          iterator.remove();
+        }
+      }
+
       // Filter again to get the final deviceMeasurementsMap that exactly 
matches the pattern.
       deviceMeasurementsMapIterator =
           
filterDeviceMeasurementsMapByPattern(deviceMeasurementsMap).entrySet().iterator();
@@ -302,7 +370,8 @@ public class TsFileInsertionQueryDataContainer extends 
TsFileInsertionDataContai
                               ((PlainDeviceID) entry.getKey()).toStringID(),
                               entry.getValue(),
                               timeFilterExpression,
-                              allocatedMemoryBlockForTablet);
+                              allocatedMemoryBlockForTablet,
+                              currentModifications);
                     } catch (final Exception e) {
                       close();
                       throw new PipeException(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataTabletIterator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataTabletIterator.java
index 5fa252412d4..e16c7113da3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataTabletIterator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataTabletIterator.java
@@ -19,9 +19,13 @@
 
 package org.apache.iotdb.db.pipe.event.common.tsfile.container.query;
 
+import org.apache.iotdb.commons.path.PatternTreeMap;
+import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.util.ModsOperationUtil;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
+import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
+import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
 import org.apache.tsfile.common.constant.TsFileConstant;
@@ -60,6 +64,9 @@ public class TsFileInsertionQueryDataTabletIterator 
implements Iterator<Tablet>
 
   private final PipeMemoryBlock allocatedBlockForTablet;
 
+  // Maintain sorted mods list and current index for each measurement
+  private final List<ModsOperationUtil.ModsInfo> measurementModsList;
+
   private RowRecord rowRecord;
 
   TsFileInsertionQueryDataTabletIterator(
@@ -68,7 +75,8 @@ public class TsFileInsertionQueryDataTabletIterator 
implements Iterator<Tablet>
       final String deviceId,
       final List<String> measurements,
       final IExpression timeFilterExpression,
-      final PipeMemoryBlock allocatedBlockForTablet)
+      final PipeMemoryBlock allocatedBlockForTablet,
+      final PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer> 
currentModifications)
       throws IOException {
     this.tsFileReader = tsFileReader;
     this.measurementDataTypeMap = measurementDataTypeMap;
@@ -88,6 +96,10 @@ public class TsFileInsertionQueryDataTabletIterator 
implements Iterator<Tablet>
     this.queryDataSet = buildQueryDataSet();
 
     this.allocatedBlockForTablet = 
Objects.requireNonNull(allocatedBlockForTablet);
+
+    this.measurementModsList =
+        ModsOperationUtil.initializeMeasurementMods(
+            deviceId, this.measurements, currentModifications);
   }
 
   private QueryDataSet buildQueryDataSet() throws IOException {
@@ -155,16 +167,23 @@ public class TsFileInsertionQueryDataTabletIterator 
implements Iterator<Tablet>
 
       final int rowIndex = tablet.rowSize;
 
-      tablet.addTimestamp(rowIndex, rowRecord.getTimestamp());
-
+      boolean isNeedFillTime = false;
       final List<Field> fields = rowRecord.getFields();
       final int fieldSize = fields.size();
       for (int i = 0; i < fieldSize; i++) {
         final Field field = fields.get(i);
-        tablet.addValue(
-            measurements.get(i),
-            rowIndex,
-            field == null ? null : 
field.getObjectValue(schemas.get(i).getType()));
+        final String measurement = measurements.get(i);
+        // Check if this value is deleted by mods
+        if (field == null
+            || ModsOperationUtil.isDelete(rowRecord.getTimestamp(), 
measurementModsList.get(i))) {
+          tablet.bitMaps[i].mark(rowIndex);
+        } else {
+          tablet.addValue(measurement, rowIndex, 
field.getObjectValue(schemas.get(i).getType()));
+          isNeedFillTime = true;
+        }
+      }
+      if (isNeedFillTime) {
+        tablet.addTimestamp(rowIndex, rowRecord.getTimestamp());
       }
 
       tablet.rowSize++;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
index c5ef8423297..da2a94fab18 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
@@ -19,15 +19,19 @@
 
 package org.apache.iotdb.db.pipe.event.common.tsfile.container.scan;
 
+import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
+import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.util.ModsOperationUtil;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionPathUtils;
+import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
@@ -36,7 +40,10 @@ import org.apache.tsfile.common.constant.TsFileConstant;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.file.MetaMarker;
 import org.apache.tsfile.file.header.ChunkHeader;
+import org.apache.tsfile.file.metadata.AlignedChunkMetadata;
+import org.apache.tsfile.file.metadata.IChunkMetadata;
 import org.apache.tsfile.file.metadata.PlainDeviceID;
+import org.apache.tsfile.file.metadata.statistics.Statistics;
 import org.apache.tsfile.read.TsFileSequenceReader;
 import org.apache.tsfile.read.common.BatchData;
 import org.apache.tsfile.read.common.Chunk;
@@ -51,11 +58,14 @@ import org.apache.tsfile.utils.TsPrimitiveType;
 import org.apache.tsfile.write.UnSupportedDataTypeException;
 import org.apache.tsfile.write.record.Tablet;
 import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
 import java.time.LocalDate;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -65,6 +75,8 @@ import java.util.Objects;
 
 public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContainer {
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TsFileInsertionScanDataContainer.class);
+
   private static final LocalDate EMPTY_DATE = LocalDate.of(1000, 1, 1);
 
   private final long startTime;
@@ -81,6 +93,7 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
   private boolean currentIsAligned;
   private final List<MeasurementSchema> currentMeasurements = new 
ArrayList<>();
 
+  private final List<ModsOperationUtil.ModsInfo> modsInfos = new ArrayList<>();
   // Cached time chunk
   private final List<Chunk> timeChunkList = new ArrayList<>();
   private final List<Boolean> isMultiPageList = new ArrayList<>();
@@ -99,7 +112,8 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
       final long startTime,
       final long endTime,
       final PipeTaskMeta pipeTaskMeta,
-      final EnrichedEvent sourceEvent)
+      final EnrichedEvent sourceEvent,
+      final boolean isWithMod)
       throws IOException {
     super(pipeName, creationTime, pattern, startTime, endTime, pipeTaskMeta, 
sourceEvent);
 
@@ -116,7 +130,19 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
             
.forceAllocateForTabletWithRetry(PipeConfig.getInstance().getPipeMaxReaderChunkSize());
 
     try {
-      tsFileSequenceReader = new 
TsFileSequenceReader(tsFile.getAbsolutePath(), false, false);
+      currentModifications =
+          isWithMod
+              ? ModsOperationUtil.loadModificationsFromTsFile(tsFile)
+              : PatternTreeMapFactory.getModsPatternTreeMap();
+      allocatedMemoryBlockForModifications =
+          PipeDataNodeResourceManager.memory()
+              
.forceAllocateForTabletWithRetry(currentModifications.ramBytesUsed());
+
+      tsFileSequenceReader =
+          new TsFileSequenceReader(
+              tsFile.getAbsolutePath(),
+              !currentModifications.isEmpty(),
+              !currentModifications.isEmpty());
       tsFileSequenceReader.position((long) 
TSFileConfig.MAGIC_STRING.getBytes().length + 1);
 
       prepareData();
@@ -132,9 +158,10 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
       final long startTime,
       final long endTime,
       final PipeTaskMeta pipeTaskMeta,
-      final EnrichedEvent sourceEvent)
+      final EnrichedEvent sourceEvent,
+      final boolean isWithMod)
       throws IOException {
-    this(null, 0, tsFile, pattern, startTime, endTime, pipeTaskMeta, 
sourceEvent);
+    this(null, 0, tsFile, pattern, startTime, endTime, pipeTaskMeta, 
sourceEvent, isWithMod);
   }
 
   @Override
@@ -251,8 +278,9 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
 
           final int rowIndex = tablet.rowSize;
 
-          tablet.addTimestamp(rowIndex, data.currentTime());
-          putValueToColumns(data, tablet, rowIndex);
+          if (putValueToColumns(data, tablet, rowIndex)) {
+            tablet.addTimestamp(rowIndex, data.currentTime());
+          }
 
           tablet.rowSize++;
         }
@@ -304,13 +332,14 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
     } while (!data.hasCurrent());
   }
 
-  private void putValueToColumns(final BatchData data, final Tablet tablet, 
final int rowIndex) {
+  private boolean putValueToColumns(final BatchData data, final Tablet tablet, 
final int rowIndex) {
     final Object[] columns = tablet.values;
-
+    boolean isNeedFillTime = false;
     if (data.getDataType() == TSDataType.VECTOR) {
       for (int i = 0; i < columns.length; ++i) {
         final TsPrimitiveType primitiveType = data.getVector()[i];
-        if (Objects.isNull(primitiveType)) {
+        if (Objects.isNull(primitiveType)
+            || ModsOperationUtil.isDelete(data.currentTime(), 
modsInfos.get(i))) {
           tablet.bitMaps[i].mark(rowIndex);
           final TSDataType type = tablet.getSchemas().get(i).getType();
           if (type == TSDataType.TEXT || type == TSDataType.BLOB || type == 
TSDataType.STRING) {
@@ -321,6 +350,8 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
           }
           continue;
         }
+
+        isNeedFillTime = true;
         switch (tablet.getSchemas().get(i).getType()) {
           case BOOLEAN:
             ((boolean[]) columns[i])[rowIndex] = primitiveType.getBoolean();
@@ -352,6 +383,7 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
         }
       }
     } else {
+      isNeedFillTime = true;
       switch (tablet.getSchemas().get(0).getType()) {
         case BOOLEAN:
           ((boolean[]) columns[0])[rowIndex] = data.getBoolean();
@@ -381,6 +413,7 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
           throw new UnSupportedDataTypeException("UnSupported" + 
data.getDataType());
       }
     }
+    return isNeedFillTime;
   }
 
   private void moveToNextChunkReader() throws IOException, 
IllegalStateException {
@@ -388,6 +421,7 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
     long valueChunkSize = 0;
     final List<Chunk> valueChunkList = new ArrayList<>();
     currentMeasurements.clear();
+    modsInfos.clear();
 
     if (lastMarker == MetaMarker.SEPARATOR) {
       chunkReader = null;
@@ -403,134 +437,211 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
         case MetaMarker.TIME_CHUNK_HEADER:
         case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER:
         case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER:
-          // Notice that the data in one chunk group is either aligned or 
non-aligned
-          // There is no need to consider non-aligned chunks when there are 
value chunks
-          currentIsMultiPage = marker == MetaMarker.CHUNK_HEADER;
+          {
+            // Notice that the data in one chunk group is either aligned or 
non-aligned
+            // There is no need to consider non-aligned chunks when there are 
value chunks
+            currentIsMultiPage = marker == MetaMarker.CHUNK_HEADER;
+            long currentChunkHeaderOffset = tsFileSequenceReader.position() - 
1;
+            chunkHeader = tsFileSequenceReader.readChunkHeader(marker);
+            final long nextMarkerOffset =
+                tsFileSequenceReader.position() + chunkHeader.getDataSize();
 
-          chunkHeader = tsFileSequenceReader.readChunkHeader(marker);
+            if (Objects.isNull(currentDevice)) {
+              tsFileSequenceReader.position(nextMarkerOffset);
+              break;
+            }
 
-          if (Objects.isNull(currentDevice)) {
-            tsFileSequenceReader.position(
-                tsFileSequenceReader.position() + chunkHeader.getDataSize());
-            break;
-          }
+            if ((chunkHeader.getChunkType() & TsFileConstant.TIME_COLUMN_MASK)
+                == TsFileConstant.TIME_COLUMN_MASK) {
+              timeChunkList.add(
+                  new Chunk(
+                      chunkHeader, tsFileSequenceReader.readChunk(-1, 
chunkHeader.getDataSize())));
+              isMultiPageList.add(marker == MetaMarker.TIME_CHUNK_HEADER);
+              break;
+            }
 
-          if ((chunkHeader.getChunkType() & TsFileConstant.TIME_COLUMN_MASK)
-              == TsFileConstant.TIME_COLUMN_MASK) {
-            timeChunkList.add(
-                new Chunk(
-                    chunkHeader, tsFileSequenceReader.readChunk(-1, 
chunkHeader.getDataSize())));
-            isMultiPageList.add(marker == MetaMarker.TIME_CHUNK_HEADER);
-            break;
-          }
+            if (!pattern.matchesMeasurement(currentDevice, 
chunkHeader.getMeasurementID())) {
+              tsFileSequenceReader.position(nextMarkerOffset);
+              break;
+            }
 
-          if (!pattern.matchesMeasurement(currentDevice, 
chunkHeader.getMeasurementID())) {
-            tsFileSequenceReader.position(
-                tsFileSequenceReader.position() + chunkHeader.getDataSize());
-            break;
-          }
+            // Skip the chunk if it is fully deleted by mods
+            if (!currentModifications.isEmpty()) {
+              Statistics statistics = null;
+              try {
+                statistics =
+                    findNonAlignedChunkStatistics(
+                        tsFileSequenceReader.getIChunkMetadataList(
+                            CompactionPathUtils.getPath(
+                                currentDevice, 
chunkHeader.getMeasurementID())),
+                        currentChunkHeaderOffset);
+              } catch (IllegalPathException ignore) {
+                LOGGER.warn(
+                    "Failed to get chunk metadata for {}.{}",
+                    currentDevice,
+                    chunkHeader.getMeasurementID());
+              }
 
-          if (chunkHeader.getDataSize() > 
allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
-            PipeDataNodeResourceManager.memory()
-                .forceResize(allocatedMemoryBlockForChunk, 
chunkHeader.getDataSize());
-          }
+              if (statistics != null
+                  && ModsOperationUtil.isAllDeletedByMods(
+                      currentDevice,
+                      chunkHeader.getMeasurementID(),
+                      statistics.getStartTime(),
+                      statistics.getEndTime(),
+                      currentModifications)) {
+                tsFileSequenceReader.position(nextMarkerOffset);
+                break;
+              }
+            }
 
-          chunkReader =
-              currentIsMultiPage
-                  ? new ChunkReader(
-                      new Chunk(
-                          chunkHeader,
-                          tsFileSequenceReader.readChunk(-1, 
chunkHeader.getDataSize())),
-                      filter)
-                  : new SinglePageWholeChunkReader(
-                      new Chunk(
-                          chunkHeader,
-                          tsFileSequenceReader.readChunk(-1, 
chunkHeader.getDataSize())));
-          currentIsAligned = false;
-          currentMeasurements.add(
-              new MeasurementSchema(chunkHeader.getMeasurementID(), 
chunkHeader.getDataType()));
-          return;
+            if (chunkHeader.getDataSize() > 
allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
+              PipeDataNodeResourceManager.memory()
+                  .forceResize(allocatedMemoryBlockForChunk, 
chunkHeader.getDataSize());
+            }
+
+            Chunk chunk =
+                new Chunk(
+                    chunkHeader, tsFileSequenceReader.readChunk(-1, 
chunkHeader.getDataSize()));
+
+            chunkReader =
+                currentIsMultiPage
+                    ? new ChunkReader(chunk, filter)
+                    : new SinglePageWholeChunkReader(chunk);
+            currentIsAligned = false;
+            currentMeasurements.add(
+                new MeasurementSchema(chunkHeader.getMeasurementID(), 
chunkHeader.getDataType()));
+            modsInfos.addAll(
+                ModsOperationUtil.initializeMeasurementMods(
+                    currentDevice,
+                    Collections.singletonList(chunkHeader.getMeasurementID()),
+                    currentModifications));
+            return;
+          }
         case MetaMarker.VALUE_CHUNK_HEADER:
         case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
-          if (Objects.isNull(firstChunkHeader4NextSequentialValueChunks)) {
-            chunkHeader = tsFileSequenceReader.readChunkHeader(marker);
-
-            if (Objects.isNull(currentDevice)
-                || !pattern.matchesMeasurement(currentDevice, 
chunkHeader.getMeasurementID())) {
-              tsFileSequenceReader.position(
-                  tsFileSequenceReader.position() + chunkHeader.getDataSize());
-              break;
-            }
+          {
+            if (Objects.isNull(firstChunkHeader4NextSequentialValueChunks)) {
+              long currentChunkHeaderOffset = tsFileSequenceReader.position() 
- 1;
+              chunkHeader = tsFileSequenceReader.readChunkHeader(marker);
+
+              final long nextMarkerOffset =
+                  tsFileSequenceReader.position() + chunkHeader.getDataSize();
+              if (Objects.isNull(currentDevice)
+                  || !pattern.matchesMeasurement(currentDevice, 
chunkHeader.getMeasurementID())) {
+                tsFileSequenceReader.position(nextMarkerOffset);
+                break;
+              }
 
-            // Increase value index
-            final int valueIndex =
-                measurementIndexMap.compute(
-                    chunkHeader.getMeasurementID(),
-                    (measurement, index) -> Objects.nonNull(index) ? index + 1 
: 0);
+              if (!currentModifications.isEmpty()) {
+                // Skip the chunk if it is fully deleted by mods
+                Statistics statistics = null;
+                try {
+                  statistics =
+                      findAlignedChunkStatistics(
+                          tsFileSequenceReader.getIChunkMetadataList(
+                              CompactionPathUtils.getPath(
+                                  currentDevice, 
chunkHeader.getMeasurementID())),
+                          currentChunkHeaderOffset);
+                } catch (IllegalPathException ignore) {
+                  LOGGER.warn(
+                      "Failed to get chunk metadata for {}.{}",
+                      currentDevice,
+                      chunkHeader.getMeasurementID());
+                }
+                if (statistics != null
+                    && ModsOperationUtil.isAllDeletedByMods(
+                        currentDevice,
+                        chunkHeader.getMeasurementID(),
+                        statistics.getStartTime(),
+                        statistics.getEndTime(),
+                        currentModifications)) {
+                  tsFileSequenceReader.position(nextMarkerOffset);
+                  break;
+                }
+              }
 
-            // Emit when encountered non-sequential value chunk, or the chunk 
size exceeds
-            // certain value to avoid OOM
-            // Do not record or end current value chunks when there are empty 
chunks
-            if (chunkHeader.getDataSize() == 0) {
-              break;
-            }
-            boolean needReturn = false;
-            final long timeChunkSize =
-                lastIndex >= 0
-                    ? 
PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunkList.get(lastIndex))
-                    : 0;
-            if (lastIndex >= 0) {
-              if (valueIndex != lastIndex) {
-                needReturn = recordAlignedChunk(valueChunkList, marker);
-              } else {
-                final long chunkSize = timeChunkSize + valueChunkSize;
-                if (chunkSize + chunkHeader.getDataSize()
-                    > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
-                  if (valueChunkList.size() == 1
-                      && chunkSize > 
allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
-                    PipeDataNodeResourceManager.memory()
-                        .forceResize(allocatedMemoryBlockForChunk, chunkSize);
-                  }
+              // Increase value index
+              final int valueIndex =
+                  measurementIndexMap.compute(
+                      chunkHeader.getMeasurementID(),
+                      (measurement, index) -> Objects.nonNull(index) ? index + 
1 : 0);
+
+              // Emit when encountered non-sequential value chunk, or the 
chunk size exceeds
+              // certain value to avoid OOM
+              // Do not record or end current value chunks when there are 
empty chunks
+              if (chunkHeader.getDataSize() == 0) {
+                break;
+              }
+              boolean needReturn = false;
+              final long timeChunkSize =
+                  lastIndex >= 0
+                      ? PipeMemoryWeightUtil.calculateChunkRamBytesUsed(
+                          timeChunkList.get(lastIndex))
+                      : 0;
+              if (lastIndex >= 0) {
+                if (valueIndex != lastIndex) {
                   needReturn = recordAlignedChunk(valueChunkList, marker);
+                } else {
+                  final long chunkSize = timeChunkSize + valueChunkSize;
+                  if (chunkSize + chunkHeader.getDataSize()
+                      > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
+                    if (valueChunkList.size() == 1
+                        && chunkSize > 
allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
+                      PipeDataNodeResourceManager.memory()
+                          .forceResize(allocatedMemoryBlockForChunk, 
chunkSize);
+                    }
+                    needReturn = recordAlignedChunk(valueChunkList, marker);
+                  }
                 }
               }
+              lastIndex = valueIndex;
+              if (needReturn) {
+                firstChunkHeader4NextSequentialValueChunks = chunkHeader;
+                return;
+              }
+            } else {
+              chunkHeader = firstChunkHeader4NextSequentialValueChunks;
+              firstChunkHeader4NextSequentialValueChunks = null;
             }
-            lastIndex = valueIndex;
-            if (needReturn) {
-              firstChunkHeader4NextSequentialValueChunks = chunkHeader;
-              return;
-            }
-          } else {
-            chunkHeader = firstChunkHeader4NextSequentialValueChunks;
-            firstChunkHeader4NextSequentialValueChunks = null;
-          }
 
-          valueChunkSize += chunkHeader.getDataSize();
-          valueChunkList.add(
-              new Chunk(
-                  chunkHeader, tsFileSequenceReader.readChunk(-1, 
chunkHeader.getDataSize())));
-          currentMeasurements.add(
-              new MeasurementSchema(chunkHeader.getMeasurementID(), 
chunkHeader.getDataType()));
-          break;
+            Chunk chunk =
+                new Chunk(
+                    chunkHeader, tsFileSequenceReader.readChunk(-1, 
chunkHeader.getDataSize()));
+
+            valueChunkSize += chunkHeader.getDataSize();
+            valueChunkList.add(chunk);
+            currentMeasurements.add(
+                new MeasurementSchema(chunkHeader.getMeasurementID(), 
chunkHeader.getDataType()));
+            modsInfos.addAll(
+                ModsOperationUtil.initializeMeasurementMods(
+                    currentDevice,
+                    Collections.singletonList(chunkHeader.getMeasurementID()),
+                    currentModifications));
+            break;
+          }
         case MetaMarker.CHUNK_GROUP_HEADER:
-          // Return before "currentDevice" changes
-          if (recordAlignedChunk(valueChunkList, marker)) {
-            return;
+          {
+            // Return before "currentDevice" changes
+            if (recordAlignedChunk(valueChunkList, marker)) {
+              return;
+            }
+            final String deviceID =
+                ((PlainDeviceID) 
tsFileSequenceReader.readChunkGroupHeader().getDeviceID())
+                    .toStringID();
+            // Clear because the cached data will never be used in the next 
chunk group
+            lastIndex = -1;
+            timeChunkList.clear();
+            isMultiPageList.clear();
+            measurementIndexMap.clear();
+
+            currentDevice = pattern.mayOverlapWithDevice(deviceID) ? deviceID 
: null;
+            break;
           }
-          final String deviceID =
-              ((PlainDeviceID) 
tsFileSequenceReader.readChunkGroupHeader().getDeviceID())
-                  .toStringID();
-          // Clear because the cached data will never be used in the next 
chunk group
-          lastIndex = -1;
-          timeChunkList.clear();
-          isMultiPageList.clear();
-          measurementIndexMap.clear();
-
-          currentDevice = pattern.mayOverlapWithDevice(deviceID) ? deviceID : 
null;
-          break;
         case MetaMarker.OPERATION_INDEX_RANGE:
-          tsFileSequenceReader.readPlanIndex();
-          break;
+          {
+            tsFileSequenceReader.readPlanIndex();
+            break;
+          }
         default:
           MetaMarker.handleUnexpectedMarker(marker);
       }
@@ -571,4 +682,32 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
       allocatedMemoryBlockForChunk.close();
     }
   }
+
+  private Statistics findAlignedChunkStatistics(
+      List<IChunkMetadata> metadataList, long currentChunkHeaderOffset) {
+    for (IChunkMetadata metadata : metadataList) {
+      if (!(metadata instanceof AlignedChunkMetadata)) {
+        continue;
+      }
+      List<IChunkMetadata> list = ((AlignedChunkMetadata) 
metadata).getValueChunkMetadataList();
+      for (IChunkMetadata m : list) {
+        if (m.getOffsetOfChunkHeader() == currentChunkHeaderOffset) {
+          return m.getStatistics();
+        }
+      }
+      break;
+    }
+    return null;
+  }
+
+  private Statistics findNonAlignedChunkStatistics(
+      List<IChunkMetadata> metadataList, long currentChunkHeaderOffset) {
+    for (IChunkMetadata metadata : metadataList) {
+      if (metadata.getOffsetOfChunkHeader() == currentChunkHeaderOffset) {
+        // found the corresponding chunk metadata
+        return metadata.getStatistics();
+      }
+    }
+    return null;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/util/ModsOperationUtil.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/util/ModsOperationUtil.java
new file mode 100644
index 00000000000..19554c32c06
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/util/ModsOperationUtil.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.event.common.tsfile.parser.util;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PatternTreeMap;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionPathUtils;
+import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
+import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
+import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
+import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Utility class for handling mods operations during TsFile parsing. Supports 
mods processing logic
+ * for both tree model and table model.
+ */
+public class ModsOperationUtil {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ModsOperationUtil.class);
+
+  private ModsOperationUtil() {
+    // Utility class, no instantiation allowed
+  }
+
+  /**
+   * Load all modifications from TsFile and build PatternTreeMap
+   *
+   * @param tsFile TsFile file
+   * @return PatternTreeMap containing all modifications
+   */
+  public static PatternTreeMap<Modification, 
PatternTreeMapFactory.ModsSerializer>
+      loadModificationsFromTsFile(File tsFile) {
+    PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer> 
modifications =
+        PatternTreeMapFactory.getModsPatternTreeMap();
+
+    try (ModificationFile file =
+        new ModificationFile(tsFile.getPath() + ModificationFile.FILE_SUFFIX)) 
{
+      file.getModifications()
+          .forEach(modification -> 
modifications.append(modification.getPath(), modification));
+    } catch (Exception e) {
+      throw new PipeException("Failed to load modifications from TsFile: " + 
tsFile.getPath(), e);
+    }
+
+    return modifications;
+  }
+
+  /**
+   * Check if data in the specified time range is completely deleted by mods, 
i.e. one deletion covers
+   * [startTime, endTime]. Different logic for tree model and table model.
+   *
+   * @param deviceID device ID
+   * @param measurementID measurement ID
+   * @param startTime start time
+   * @param endTime end time
+   * @param modifications modification records
+   * @return true if data is completely deleted, false otherwise
+   */
+  public static boolean isAllDeletedByMods(
+      String deviceID,
+      String measurementID,
+      long startTime,
+      long endTime,
+      PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer> 
modifications) {
+    if (modifications == null) {
+      return false;
+    }
+    List<Modification> mods = null;
+    try {
+      mods = modifications.getOverlapped(CompactionPathUtils.getPath(deviceID, 
measurementID));
+    } catch (IllegalPathException ignore) {
+      LOGGER.warn("Failed to get mods for device {} and measurement {}", 
deviceID, measurementID);
+    }
+
+    if (mods == null || mods.isEmpty()) {
+      return false;
+    }
+
+    // For tree model: check if any modification covers the time range
+    return mods.stream()
+        .anyMatch(
+            modification -> ((Deletion) 
modification).getTimeRange().contains(startTime, endTime));
+  }
+
+  /**
+   * Initialize mods mapping for specified measurement list
+   *
+   * @param deviceID device ID
+   * @param measurements measurement list
+   * @param modifications modification records
+   * @return one ModsInfo (sorted mods, index 0) per measurement, in order
+   */
+  public static List<ModsInfo> initializeMeasurementMods(
+      String deviceID,
+      List<String> measurements,
+      PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer> 
modifications) {
+
+    List<ModsInfo> modsInfos = new ArrayList<>(measurements.size());
+
+    for (final String measurement : measurements) {
+      List<Modification> mods;
+      try {
+        mods = 
modifications.getOverlapped(CompactionPathUtils.getPath(deviceID, measurement));
+      } catch (IllegalPathException ignore) {
+        mods = null;
+        LOGGER.warn("Failed to get mods for device {} and measurement {}", 
deviceID, measurement);
+      }
+
+      if (mods == null || mods.isEmpty()) {
+        // No mods, use empty list and index 0
+        modsInfos.add(new ModsInfo(Collections.emptyList(), 0));
+        continue;
+      }
+      // Store sorted mods and start index
+      modsInfos.add(
+          new 
ModsInfo(modificationConvertionToDeletions(ModificationFile.sortAndMerge(mods)),
 0));
+    }
+
+    return modsInfos;
+  }
+
+  public static List<Deletion> 
modificationConvertionToDeletions(List<Modification> modifications) {
+    List<Deletion> deletions = new ArrayList<>();
+    for (Modification modification : modifications) {
+      if (modification instanceof Deletion) {
+        deletions.add((Deletion) modification);
+      }
+    }
+    return deletions;
+  }
+
+  /**
+   * Check if data at the specified time point is deleted
+   *
+   * @param time time point
+   * @param modsInfo mods information containing mods list and current index
+   * @return true if data is deleted, false otherwise
+   */
+  public static boolean isDelete(long time, ModsInfo modsInfo) {
+    if (modsInfo == null) {
+      return false;
+    }
+
+    final List<Deletion> mods = modsInfo.getMods();
+    if (mods == null || mods.isEmpty()) {
+      return false;
+    }
+
+    int currentIndex = modsInfo.getCurrentIndex();
+    if (currentIndex < 0) {
+      return false;
+    }
+
+    // First, try to use the current index if it's valid
+    if (currentIndex < mods.size()) {
+      final Deletion currentMod = mods.get(currentIndex);
+      final long currentModStartTime = currentMod.getTimeRange().getMin();
+      final long currentModEndTime = currentMod.getTimeRange().getMax();
+
+      if (time < currentModStartTime) {
+        // Time is before current mod, return false
+        return false;
+      } else if (time <= currentModEndTime) {
+        // Time is within current mod range, return true
+        return true;
+      } else {
+        // Time is after current mod, need to search forwards
+        return searchAndCheckMod(mods, time, currentIndex + 1, modsInfo);
+      }
+    } else {
+      // Current index is beyond array bounds, all mods have been processed
+      clearModsAndReset(modsInfo);
+      return false;
+    }
+  }
+
+  /**
+   * Search for a mod using binary search and check if the time point is 
deleted
+   *
+   * @param mods sorted list of mods
+   * @param time time point to search for
+   * @param startIndex starting index for search
+   * @param modsInfo mods information to update
+   * @return true if data is deleted, false otherwise
+   */
+  private static boolean searchAndCheckMod(
+      List<Deletion> mods, long time, int startIndex, ModsInfo modsInfo) {
+    int searchIndex = binarySearchMods(mods, time, startIndex);
+    if (searchIndex >= mods.size()) {
+      // All mods checked, clear mods list and reset index to 0
+      clearModsAndReset(modsInfo);
+      return false;
+    }
+
+    final Deletion foundMod = mods.get(searchIndex);
+    final long foundModStartTime = foundMod.getTimeRange().getMin();
+
+    if (time < foundModStartTime) {
+      modsInfo.setCurrentIndex(searchIndex);
+      return false;
+    }
+
+    modsInfo.setCurrentIndex(searchIndex);
+    return true;
+  }
+
+  /**
+   * Clear mods list and reset index to 0
+   *
+   * @param modsInfo mods information to update
+   */
+  private static void clearModsAndReset(ModsInfo modsInfo) {
+    modsInfo.setMods(Collections.emptyList());
+    modsInfo.setCurrentIndex(0);
+  }
+
+  /**
+   * Binary search to find the first mod that might contain the given time 
point. Returns the index
+   * of the first mod at or after startIndex whose end time >= time, or 
mods.size() if no such mod exists.
+   *
+   * @param mods sorted list of mods
+   * @param time time point to search for
+   * @param startIndex starting index for search (current index)
+   * @return index of the first potential mod, or mods.size() if none found
+   */
+  private static int binarySearchMods(List<Deletion> mods, long time, int 
startIndex) {
+    int left = startIndex;
+    int right = mods.size();
+
+    while (left < right) {
+      int mid = left + (right - left) / 2;
+      final long max = mods.get(mid).getTimeRange().getMax();
+
+      if (max < time) {
+        left = mid + 1;
+      } else {
+        right = mid;
+      }
+    }
+
+    return left;
+  }
+
+  /** Mods information wrapper class, containing mods list and current index */
+  public static class ModsInfo {
+    private List<Deletion> mods;
+    private int currentIndex;
+
+    public ModsInfo(List<Deletion> mods, int currentIndex) {
+      this.mods = mods;
+      this.currentIndex = currentIndex;
+    }
+
+    public List<Deletion> getMods() {
+      return mods;
+    }
+
+    public void setMods(List<Deletion> newMods) {
+      this.mods = newMods;
+    }
+
+    public int getCurrentIndex() {
+      return currentIndex;
+    }
+
+    public void setCurrentIndex(int newIndex) {
+      this.currentIndex = newIndex;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+      ModsInfo modsInfo = (ModsInfo) o;
+      return Objects.equals(mods, modsInfo.mods)
+          && Objects.equals(currentIndex, modsInfo.currentIndex);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(mods, currentIndex);
+    }
+
+    @Override
+    public String toString() {
+      return "ModsInfo{" + "mods=" + mods + ", currentIndex=" + currentIndex + 
'}';
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
index caa4a399f67..c7e796c463e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
@@ -101,7 +101,7 @@ public class PipeStatementDataTypeConvertExecutionVisitor
     for (final File file : loadTsFileStatement.getTsFiles()) {
       try (final TsFileInsertionScanDataContainer container =
           new TsFileInsertionScanDataContainer(
-              file, new IoTDBPipePattern(null), Long.MIN_VALUE, 
Long.MAX_VALUE, null, null)) {
+              file, new IoTDBPipePattern(null), Long.MIN_VALUE, 
Long.MAX_VALUE, null, null, true)) {
         for (final Pair<Tablet, Boolean> tabletWithIsAligned : 
container.toTabletWithIsAligneds()) {
           final PipeConvertedInsertTabletStatement statement =
               new PipeConvertedInsertTabletStatement(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
index 1ca4338607c..e6f8ffb73f6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
@@ -92,7 +92,13 @@ public class LoadTreeStatementDataTypeConvertExecutionVisitor
       for (final File file : loadTsFileStatement.getTsFiles()) {
         try (final TsFileInsertionScanDataContainer container =
             new TsFileInsertionScanDataContainer(
-                file, new IoTDBPipePattern(null), Long.MIN_VALUE, 
Long.MAX_VALUE, null, null)) {
+                file,
+                new IoTDBPipePattern(null),
+                Long.MIN_VALUE,
+                Long.MAX_VALUE,
+                null,
+                null,
+                true)) {
           for (final Pair<Tablet, Boolean> tabletWithIsAligned :
               container.toTabletWithIsAligneds()) {
             final PipeTransferTabletRawReq tabletRawReq =
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
index f59038982ea..a91a8b97685 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
@@ -560,7 +560,7 @@ public class TsFileInsertionDataContainerTest {
         isQuery
             ? new TsFileInsertionQueryDataContainer(tsFile, pattern, 
startTime, endTime)
             : new TsFileInsertionScanDataContainer(
-                tsFile, pattern, startTime, endTime, null, null)) {
+                tsFile, pattern, startTime, endTime, null, null, false)) {
       final AtomicInteger count1 = new AtomicInteger(0);
       final AtomicInteger count2 = new AtomicInteger(0);
       final AtomicInteger count3 = new AtomicInteger(0);
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/util/ModsOperationUtilTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/util/ModsOperationUtilTest.java
new file mode 100644
index 00000000000..9a9c7ce428a
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/util/ModsOperationUtilTest.java
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.event.common.tsfile.parser.util;
+
+import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link ModsOperationUtil}: the {@code isDelete(time, modsInfo)} check and the
+ * private {@code binarySearchMods} lookup, which is reached through reflection.
+ *
+ * <p>Every fixture is built by {@code createTestMods} from {@code [start, end]} pairs that are
+ * listed in ascending order.
+ */
+public class ModsOperationUtilTest {
+
+  /**
+   * Walks increasing timestamps across four disjoint ranges and checks, after each call, both the
+   * result of {@code isDelete} and the cursor reported by {@code getCurrentIndex()}. The last call
+   * (past the final range) is expected to leave the mods list empty and the cursor at 0.
+   */
+  @Test
+  public void testIsDeleteWithBinarySearch() {
+    // Create test mods with time ranges: [10, 20], [30, 40], [50, 60], [70, 
80]
+    List<Deletion> mods = createTestMods(new long[][] {{10, 20}, {30, 40}, 
{50, 60}, {70, 80}});
+
+    ModsOperationUtil.ModsInfo modsInfo = new ModsOperationUtil.ModsInfo(mods, 
0);
+
+    // Test cases
+    // Time 5: before first mod, should return false
+    assertFalse(ModsOperationUtil.isDelete(5, modsInfo));
+    assertEquals(0, modsInfo.getCurrentIndex());
+
+    // Time 15: within first mod [10, 20], should return true
+    assertTrue(ModsOperationUtil.isDelete(15, modsInfo));
+    assertEquals(0, modsInfo.getCurrentIndex());
+
+    // Time 25: between first and second mod, should return false
+    assertFalse(ModsOperationUtil.isDelete(25, modsInfo));
+    assertEquals(1, modsInfo.getCurrentIndex());
+
+    // Time 35: within second mod [30, 40], should return true
+    assertTrue(ModsOperationUtil.isDelete(35, modsInfo));
+    assertEquals(1, modsInfo.getCurrentIndex());
+
+    // Time 45: between second and third mod, should return false
+    assertFalse(ModsOperationUtil.isDelete(45, modsInfo));
+    assertEquals(2, modsInfo.getCurrentIndex());
+
+    // Time 55: within third mod [50, 60], should return true
+    assertTrue(ModsOperationUtil.isDelete(55, modsInfo));
+    assertEquals(2, modsInfo.getCurrentIndex());
+
+    // Time 65: between third and fourth mod, should return false
+    assertFalse(ModsOperationUtil.isDelete(65, modsInfo));
+    assertEquals(3, modsInfo.getCurrentIndex());
+
+    // Time 75: within fourth mod [70, 80], should return true
+    assertTrue(ModsOperationUtil.isDelete(75, modsInfo));
+    assertEquals(3, modsInfo.getCurrentIndex());
+
+    // Time 85: after last mod, should return false and clear mods
+    assertFalse(ModsOperationUtil.isDelete(85, modsInfo));
+    assertTrue(modsInfo.getMods().isEmpty());
+    assertEquals(0, modsInfo.getCurrentIndex());
+  }
+
+  /** With an empty mods list, {@code isDelete} is expected to report false. */
+  @Test
+  public void testIsDeleteWithEmptyMods() {
+    ModsOperationUtil.ModsInfo modsInfo = new ModsOperationUtil.ModsInfo(new 
ArrayList<>(), 0);
+
+    // Should return false for any time when mods is empty
+    assertFalse(ModsOperationUtil.isDelete(100, modsInfo));
+  }
+
+  /** A {@code null} ModsInfo is expected to be accepted and reported as not deleted. */
+  @Test
+  public void testIsDeleteWithNullModsInfo() {
+    // Should return false when modsInfo is null
+    assertFalse(ModsOperationUtil.isDelete(100, null));
+  }
+
+  /** A cursor of -1 is expected to yield false even though time 15 lies inside the only range. */
+  @Test
+  public void testIsDeleteWithNegativeIndex() {
+    List<Deletion> mods = createTestMods(new long[][] {{10, 20}});
+    ModsOperationUtil.ModsInfo modsInfo = new ModsOperationUtil.ModsInfo(mods, 
-1);
+
+    // Should return false when currentIndex is negative
+    assertFalse(ModsOperationUtil.isDelete(15, modsInfo));
+  }
+
+  /**
+   * Single range [10, 20]: probes a time before, inside and after it. The "after" call is expected
+   * to empty the mods list and leave the cursor at 0.
+   */
+  @Test
+  public void testIsDeleteWithSingleMod() {
+    List<Deletion> mods = createTestMods(new long[][] {{10, 20}});
+    ModsOperationUtil.ModsInfo modsInfo = new ModsOperationUtil.ModsInfo(mods, 
0);
+
+    // Time before mod
+    assertFalse(ModsOperationUtil.isDelete(5, modsInfo));
+    assertEquals(0, modsInfo.getCurrentIndex());
+
+    // Time within mod
+    assertTrue(ModsOperationUtil.isDelete(15, modsInfo));
+    assertEquals(0, modsInfo.getCurrentIndex());
+
+    // Time after mod
+    assertFalse(ModsOperationUtil.isDelete(25, modsInfo));
+    assertTrue(modsInfo.getMods().isEmpty());
+    assertEquals(0, modsInfo.getCurrentIndex());
+  }
+
+  /**
+   * Overlapping ranges: every probed time is covered by at least one range, and the cursor is
+   * expected to rest on the earliest range that still covers it (0, 0, 1, 2).
+   */
+  @Test
+  public void testIsDeleteWithOverlappingMods() {
+    // Create overlapping mods: [10, 30], [20, 40], [30, 50]
+    List<Deletion> mods = createTestMods(new long[][] {{10, 30}, {20, 40}, 
{30, 50}});
+
+    ModsOperationUtil.ModsInfo modsInfo = new ModsOperationUtil.ModsInfo(mods, 
0);
+
+    // Time 15: within first mod
+    assertTrue(ModsOperationUtil.isDelete(15, modsInfo));
+    assertEquals(0, modsInfo.getCurrentIndex());
+
+    // Time 25: within both first and second mod, should find first one
+    assertTrue(ModsOperationUtil.isDelete(25, modsInfo));
+    assertEquals(0, modsInfo.getCurrentIndex());
+
+    // Time 35: within second and third mod, should find second one
+    assertTrue(ModsOperationUtil.isDelete(35, modsInfo));
+    assertEquals(1, modsInfo.getCurrentIndex());
+
+    // Time 45: within third mod
+    assertTrue(ModsOperationUtil.isDelete(45, modsInfo));
+    assertEquals(2, modsInfo.getCurrentIndex());
+  }
+
+  /** Probes times before, inside, between and after four disjoint ranges, from start index 0. */
+  @Test
+  public void testBinarySearchMods() {
+    // Create test mods with time ranges: [10, 20], [30, 40], [50, 60], [70, 
80]
+    List<Deletion> mods = createTestMods(new long[][] {{10, 20}, {30, 40}, 
{50, 60}, {70, 80}});
+
+    // Test binary search from start index 0
+    // Time 5: before all mods, should return 0 (first mod)
+    assertEquals(0, binarySearchMods(mods, 5, 0));
+
+    // Time 15: within first mod [10, 20], should return 0
+    assertEquals(0, binarySearchMods(mods, 15, 0));
+
+    // Time 25: between first and second mod, should return 1
+    assertEquals(1, binarySearchMods(mods, 25, 0));
+
+    // Time 35: within second mod [30, 40], should return 1
+    assertEquals(1, binarySearchMods(mods, 35, 0));
+
+    // Time 45: between second and third mod, should return 2
+    assertEquals(2, binarySearchMods(mods, 45, 0));
+
+    // Time 55: within third mod [50, 60], should return 2
+    assertEquals(2, binarySearchMods(mods, 55, 0));
+
+    // Time 65: between third and fourth mod, should return 3
+    assertEquals(3, binarySearchMods(mods, 65, 0));
+
+    // Time 75: within fourth mod [70, 80], should return 3
+    assertEquals(3, binarySearchMods(mods, 75, 0));
+
+    // Time 85: after all mods, should return 4 (mods.size())
+    assertEquals(4, binarySearchMods(mods, 85, 0));
+  }
+
+  /**
+   * Same four ranges searched from start index 2: times that fall in or before the skipped ranges
+   * are expected to resolve to 2, so no result is ever below the start index.
+   */
+  @Test
+  public void testBinarySearchModsWithStartIndex() {
+    // Create test mods with time ranges: [10, 20], [30, 40], [50, 60], [70, 
80]
+    List<Deletion> mods = createTestMods(new long[][] {{10, 20}, {30, 40}, 
{50, 60}, {70, 80}});
+
+    // Test binary search starting from index 2
+    // Time 15: before start index, should return 2 (start index)
+    assertEquals(2, binarySearchMods(mods, 15, 2));
+
+    // Time 35: before start index, should return 2 (start index)
+    assertEquals(2, binarySearchMods(mods, 35, 2));
+
+    // Time 55: within mod at index 2, should return 2
+    assertEquals(2, binarySearchMods(mods, 55, 2));
+
+    // Time 65: between mods at index 2 and 3, should return 3
+    assertEquals(3, binarySearchMods(mods, 65, 2));
+
+    // Time 75: within mod at index 3, should return 3
+    assertEquals(3, binarySearchMods(mods, 75, 2));
+
+    // Time 85: after all mods, should return 4 (mods.size())
+    assertEquals(4, binarySearchMods(mods, 85, 2));
+  }
+
+  /**
+   * Overlapping ranges: the expected index is that of the first range whose end is not before the
+   * probed time.
+   */
+  @Test
+  public void testBinarySearchModsWithOverlappingRanges() {
+    // Create overlapping mods: [10, 30], [20, 40], [30, 50]
+    List<Deletion> mods = createTestMods(new long[][] {{10, 30}, {20, 40}, 
{30, 50}});
+
+    // Time 15: within first mod, should return 0
+    assertEquals(0, binarySearchMods(mods, 15, 0));
+
+    // Time 25: within first and second mod, should return 0 (first match)
+    assertEquals(0, binarySearchMods(mods, 25, 0));
+
+    // Time 35: within second and third mod, should return 1 (first match from 
start)
+    assertEquals(1, binarySearchMods(mods, 35, 0));
+
+    // Time 45: within third mod, should return 2
+    assertEquals(2, binarySearchMods(mods, 45, 0));
+  }
+
+  /** An empty list searched from index 0 is expected to produce 0. */
+  @Test
+  public void testBinarySearchModsWithEmptyList() {
+    List<Deletion> mods = new ArrayList<>();
+
+    // Should return 0 for any time when mods is empty
+    assertEquals(0, binarySearchMods(mods, 100, 0));
+  }
+
+  /** Single range [10, 20]: a time before or inside it maps to 0, a time after it maps to 1. */
+  @Test
+  public void testBinarySearchModsWithSingleMod() {
+    List<Deletion> mods = createTestMods(new long[][] {{10, 20}});
+
+    // Time 5: before mod, should return 0
+    assertEquals(0, binarySearchMods(mods, 5, 0));
+
+    // Time 15: within mod, should return 0
+    assertEquals(0, binarySearchMods(mods, 15, 0));
+
+    // Time 25: after mod, should return 1
+    assertEquals(1, binarySearchMods(mods, 25, 0));
+  }
+
+  /**
+   * Adjacent ranges that share an endpoint: a time equal to the shared endpoint is expected to
+   * resolve to the earlier of the two ranges.
+   */
+  @Test
+  public void testBinarySearchModsWithExactBoundaries() {
+    // Create mods with exact boundaries: [10, 20], [20, 30], [30, 40]
+    List<Deletion> mods = createTestMods(new long[][] {{10, 20}, {20, 30}, 
{30, 40}});
+
+    // Time 20: at boundary, first mod [10, 20] has endTime=20 >= 20, should 
return 0
+    assertEquals(0, binarySearchMods(mods, 20, 0));
+
+    // Time 30: at boundary, second mod [20, 30] has endTime=30 >= 30, should 
return 1
+    assertEquals(1, binarySearchMods(mods, 30, 0));
+  }
+
+  /** A time equal to a range's start is expected to resolve to that range. */
+  @Test
+  public void testBinarySearchModsWithMinBoundaries() {
+    // Create mods: [10, 20], [30, 40], [50, 60]
+    List<Deletion> mods = createTestMods(new long[][] {{10, 20}, {30, 40}, 
{50, 60}});
+
+    // Time 10: exactly at first mod's min, should return 0
+    assertEquals(0, binarySearchMods(mods, 10, 0));
+
+    // Time 30: exactly at second mod's min, should return 1
+    assertEquals(1, binarySearchMods(mods, 30, 0));
+
+    // Time 50: exactly at third mod's min, should return 2
+    assertEquals(2, binarySearchMods(mods, 50, 0));
+  }
+
+  /** A time equal to a range's end is expected to resolve to that range. */
+  @Test
+  public void testBinarySearchModsWithMaxBoundaries() {
+    // Create mods: [10, 20], [30, 40], [50, 60]
+    List<Deletion> mods = createTestMods(new long[][] {{10, 20}, {30, 40}, 
{50, 60}});
+
+    // Time 20: exactly at first mod's max, should return 0
+    assertEquals(0, binarySearchMods(mods, 20, 0));
+
+    // Time 40: exactly at second mod's max, should return 1
+    assertEquals(1, binarySearchMods(mods, 40, 0));
+
+    // Time 60: exactly at third mod's max, should return 2
+    assertEquals(2, binarySearchMods(mods, 60, 0));
+  }
+
+  /** A time one unit before a range's start is expected to resolve to that range. */
+  @Test
+  public void testBinarySearchModsWithJustBeforeMin() {
+    // Create mods: [10, 20], [30, 40], [50, 60]
+    List<Deletion> mods = createTestMods(new long[][] {{10, 20}, {30, 40}, 
{50, 60}});
+
+    // Time 9: just before first mod's min, should return 0 (first mod)
+    assertEquals(0, binarySearchMods(mods, 9, 0));
+
+    // Time 29: just before second mod's min, should return 1 (second mod)
+    assertEquals(1, binarySearchMods(mods, 29, 0));
+
+    // Time 49: just before third mod's min, should return 2 (third mod)
+    assertEquals(2, binarySearchMods(mods, 49, 0));
+  }
+
+  /**
+   * A time one unit after a range's end is expected to resolve to the next index, which is {@code
+   * mods.size()} after the last range.
+   */
+  @Test
+  public void testBinarySearchModsWithJustAfterMax() {
+    // Create mods: [10, 20], [30, 40], [50, 60]
+    List<Deletion> mods = createTestMods(new long[][] {{10, 20}, {30, 40}, 
{50, 60}});
+
+    // Time 21: just after first mod's max, should return 1 (second mod)
+    assertEquals(1, binarySearchMods(mods, 21, 0));
+
+    // Time 41: just after second mod's max, should return 2 (third mod)
+    assertEquals(2, binarySearchMods(mods, 41, 0));
+
+    // Time 61: just after third mod's max, should return 3 (mods.size())
+    assertEquals(3, binarySearchMods(mods, 61, 0));
+  }
+
+  /** Widely spaced ranges: a time inside a gap is expected to resolve to the range after the gap. */
+  @Test
+  public void testBinarySearchModsWithLargeGaps() {
+    // Create mods with large gaps: [10, 20], [100, 200], [1000, 2000]
+    List<Deletion> mods = createTestMods(new long[][] {{10, 20}, {100, 200}, 
{1000, 2000}});
+
+    // Time 50: in large gap, should return 1 (second mod)
+    assertEquals(1, binarySearchMods(mods, 50, 0));
+
+    // Time 500: in large gap, should return 2 (third mod)
+    assertEquals(2, binarySearchMods(mods, 500, 0));
+
+    // Time 5000: after all mods, should return 3 (mods.size())
+    assertEquals(3, binarySearchMods(mods, 5000, 0));
+  }
+
+  /** Negative and zero timestamps, both below the first range, are expected to resolve to 0. */
+  @Test
+  public void testBinarySearchModsWithNegativeTime() {
+    // Create mods: [10, 20], [30, 40]
+    List<Deletion> mods = createTestMods(new long[][] {{10, 20}, {30, 40}});
+
+    // Time -10: negative time, should return 0 (first mod)
+    assertEquals(0, binarySearchMods(mods, -10, 0));
+
+    // Time 0: zero time, should return 0 (first mod)
+    assertEquals(0, binarySearchMods(mods, 0, 0));
+  }
+
+  /**
+   * Extreme timestamps: {@code Long.MAX_VALUE} is expected to map to {@code mods.size()} and {@code
+   * Long.MIN_VALUE} to 0.
+   */
+  @Test
+  public void testBinarySearchModsWithVeryLargeTime() {
+    // Create mods: [10, 20], [30, 40]
+    List<Deletion> mods = createTestMods(new long[][] {{10, 20}, {30, 40}});
+
+    // Time Long.MAX_VALUE: very large time, should return 2 (mods.size())
+    assertEquals(2, binarySearchMods(mods, Long.MAX_VALUE, 0));
+
+    // Time Long.MIN_VALUE: very small time, should return 0 (first mod)
+    assertEquals(0, binarySearchMods(mods, Long.MIN_VALUE, 0));
+  }
+
+  /** Two identical ranges followed by a third: the first of the duplicates is the expected hit. */
+  @Test
+  public void testBinarySearchModsWithDuplicateTimeRanges() {
+    // Create mods with duplicate time ranges: [10, 20], [10, 20], [30, 40]
+    List<Deletion> mods = createTestMods(new long[][] {{10, 20}, {10, 20}, 
{30, 40}});
+
+    // Time 15: within duplicate ranges, should return 0 (first match)
+    assertEquals(0, binarySearchMods(mods, 15, 0));
+
+    // Time 20: at max of duplicate ranges, should return 0 (first match)
+    assertEquals(0, binarySearchMods(mods, 20, 0));
+  }
+
+  /**
+   * Start index at or near the end of the list: a start index equal to {@code mods.size()} is
+   * expected to come back unchanged, and a start index on the last element is never undercut.
+   */
+  @Test
+  public void testBinarySearchModsWithStartIndexAtEnd() {
+    // Create mods: [10, 20], [30, 40], [50, 60]
+    List<Deletion> mods = createTestMods(new long[][] {{10, 20}, {30, 40}, 
{50, 60}});
+
+    // Start from index 3 (beyond array), should return 3
+    assertEquals(3, binarySearchMods(mods, 100, 3));
+
+    // Start from index 2, search for time 55, should return 2
+    assertEquals(2, binarySearchMods(mods, 55, 2));
+
+    // Start from index 2, search for time 25, should return 2 (start index)
+    assertEquals(2, binarySearchMods(mods, 25, 2));
+  }
+
+  /**
+   * Test-only bridge to the non-public {@code ModsOperationUtil.binarySearchMods(List, long, int)},
+   * looked up with {@code getDeclaredMethod} and invoked with a {@code null} receiver.
+   *
+   * <p>Judging only from the expectations in this class, the result looks like the first index at
+   * or after {@code startIndex} whose range end is not before {@code time}, or {@code mods.size()}
+   * when there is none - TODO confirm against the implementation.
+   *
+   * @param mods deletions to search
+   * @param time timestamp to look up
+   * @param startIndex forwarded as the third argument; the tests treat it as a lower bound
+   * @return the value returned by the reflected method, unboxed to {@code int}
+   * @throws RuntimeException wrapping any exception raised while resolving or invoking the method
+   */
+  private int binarySearchMods(List<Deletion> mods, long time, int startIndex) 
{
+    // Use reflection to access the private method
+    try {
+      java.lang.reflect.Method method =
+          ModsOperationUtil.class.getDeclaredMethod(
+              "binarySearchMods", List.class, long.class, int.class);
+      method.setAccessible(true);
+      return (Integer) method.invoke(null, mods, time, startIndex);
+      // NOTE(review): the broad catch below also wraps InvocationTargetException, so an exception
+      // thrown inside binarySearchMods ends up as the cause of the cause; consider unwrapping it.
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to invoke binarySearchMods method", 
e);
+    }
+  }
+
+  /**
+   * Builds one {@link Deletion} per row of {@code timeRanges}, preserving the input order.
+   *
+   * @param timeRanges rows of two longs; element 0 and element 1 are passed as the third and fourth
+   *     constructor arguments (the tests treat them as the start and end of a time range)
+   * @return a new mutable list holding one deletion per row
+   */
+  private List<Deletion> createTestMods(long[][] timeRanges) {
+    List<Deletion> mods = new ArrayList<>();
+    for (long[] range : timeRanges) {
+      // null and 0 fill the first two constructor arguments - presumably the path and the file
+      // offset; confirm against Deletion's constructor.
+      Deletion mod = new Deletion(null, 0, range[0], range[1]);
+      mods.add(mod);
+    }
+    return mods;
+  }
+}

Reply via email to