This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new c3e7e6f91f1 [To dev/1.3] Pipe: Add TsFile parsing with Mods function
(#16540) (#16651)
c3e7e6f91f1 is described below
commit c3e7e6f91f1ae6703af7f806bb35226ef0bd9af6
Author: Zhenyu Luo <[email protected]>
AuthorDate: Tue Oct 28 14:09:07 2025 +0800
[To dev/1.3] Pipe: Add TsFile parsing with Mods function (#16540) (#16651)
---
.../it/env/cluster/config/MppConfigNodeConfig.java | 6 +
.../env/remote/config/RemoteConfigNodeConfig.java | 5 +
.../apache/iotdb/itbase/env/ConfigNodeConfig.java | 2 +
.../IoTDBPipeTsFileDecompositionWithModsIT.java | 660 +++++++++++++++++++++
.../common/tsfile/PipeTsFileInsertionEvent.java | 2 +-
.../container/TsFileInsertionDataContainer.java | 16 +
.../TsFileInsertionDataContainerProvider.java | 45 +-
.../query/TsFileInsertionQueryDataContainer.java | 79 ++-
.../TsFileInsertionQueryDataTabletIterator.java | 33 +-
.../scan/TsFileInsertionScanDataContainer.java | 377 ++++++++----
.../tsfile/parser/util/ModsOperationUtil.java | 315 ++++++++++
...peStatementDataTypeConvertExecutionVisitor.java | 2 +-
...eeStatementDataTypeConvertExecutionVisitor.java | 8 +-
.../event/TsFileInsertionDataContainerTest.java | 2 +-
.../tsfile/parser/util/ModsOperationUtilTest.java | 406 +++++++++++++
15 files changed, 1817 insertions(+), 141 deletions(-)
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppConfigNodeConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppConfigNodeConfig.java
index 8e4a6def365..4385097ce04 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppConfigNodeConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppConfigNodeConfig.java
@@ -67,4 +67,10 @@ public class MppConfigNodeConfig extends MppBaseConfig
implements ConfigNodeConf
properties.setProperty("metric_prometheus_reporter_password", password);
return this;
}
+
+ @Override
+ public ConfigNodeConfig setLeaderDistributionPolicy(String policy) {
+ properties.setProperty("leader_distribution_policy", policy);
+ return this;
+ }
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteConfigNodeConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteConfigNodeConfig.java
index ae8645eff52..c16722f4bd9 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteConfigNodeConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteConfigNodeConfig.java
@@ -38,4 +38,9 @@ public class RemoteConfigNodeConfig implements
ConfigNodeConfig {
public ConfigNodeConfig setMetricPrometheusReporterPassword(String password)
{
return this;
}
+
+ @Override
+ public ConfigNodeConfig setLeaderDistributionPolicy(String policy) {
+ return this;
+ }
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/ConfigNodeConfig.java
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/ConfigNodeConfig.java
index 65a5a3271fc..4af35f6f56a 100644
---
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/ConfigNodeConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/ConfigNodeConfig.java
@@ -29,4 +29,6 @@ public interface ConfigNodeConfig {
ConfigNodeConfig setMetricPrometheusReporterUsername(String username);
ConfigNodeConfig setMetricPrometheusReporterPassword(String password);
+
+ ConfigNodeConfig setLeaderDistributionPolicy(String policy);
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeTsFileDecompositionWithModsIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeTsFileDecompositionWithModsIT.java
new file mode 100644
index 00000000000..48c7e83240c
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeTsFileDecompositionWithModsIT.java
@@ -0,0 +1,660 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.manual;
+
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2ManualCreateSchema;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.Collections;
+import java.util.HashSet;
+
+import static org.apache.iotdb.db.it.utils.TestUtils.executeNonQueryWithRetry;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2ManualCreateSchema.class})
+public class IoTDBPipeTsFileDecompositionWithModsIT extends
AbstractPipeDualManualIT {
+
+ @Override
+ protected void setupConfig() {
+ super.setupConfig();
+
senderEnv.getConfig().getConfigNodeConfig().setLeaderDistributionPolicy("HASH");
+ senderEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
+ receiverEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
+ }
+
+ /**
+ * Test IoTDB pipe handling TsFile decomposition with Mods (modification
operations) in tree model
+ *
+ * <p>Test scenario: 1. Create database root.sg1 with 4 devices: d1 (aligned
timeseries), d2
+ * (non-aligned timeseries), d3 (aligned timeseries), d4 (aligned
timeseries) 2. Insert initial
+ * data into d1, d2, d3 3. Execute FLUSH operation to persist data to TsFile
4. Execute DELETE
+ * operation on d1.s1, deleting data in time range 2-4 5. Insert large
amount of data into d4
+ * (11000 records, inserted in batches) 6. Execute multiple DELETE
operations on d4: - Delete s1
+ * field data where time<=10000 - Delete s2 field data where time>1000 -
Delete s3 field data
+ * where time<=8000 7. Delete all data from d2 and d3 8. Execute FLUSH
operation again 9. Create
+ * pipe with mods enabled, synchronize data to receiver 10. Verify
correctness of receiver data: -
+ * d1 s1 field is null in time range 2-4, other data is normal - d2 and d3
data is completely
+ * deleted - d4 DELETE operation results meet expectations for each field
+ *
+ * <p>Test purpose: Verify that IoTDB pipe can correctly handle Mods
(modification operations) in
+ * TsFile under tree model, ensuring various DELETE operations can be
correctly synchronized to
+ * the receiver and data consistency is guaranteed.
+ */
+ @Test
+ public void testTsFileDecompositionWithMods() throws Exception {
+ TestUtils.executeNonQueryWithRetry(senderEnv, "CREATE DATABASE root.sg1");
+ TestUtils.executeNonQueryWithRetry(receiverEnv, "CREATE DATABASE
root.sg1");
+
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "CREATE ALIGNED TIMESERIES root.sg1.d1(s1 FLOAT, s2 FLOAT,
s3 FLOAT)");
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "CREATE TIMESERIES root.sg1.d2.s1 WITH DATATYPE=FLOAT");
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "CREATE TIMESERIES root.sg1.d2.s2 WITH DATATYPE=FLOAT");
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "CREATE ALIGNED TIMESERIES root.sg1.d3(s1 FLOAT, s2 FLOAT,
s3 FLOAT)");
+
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv,
+ "INSERT INTO root.sg1.d1(time, s1, s2, s3) ALIGNED VALUES (1, 1.0,
2.0, 3.0), (2, 1.1, 2.1, 3.1), (3, 1.2, 2.2, 3.2), (4, 1.3, 2.3, 3.3), (5, 1.4,
2.4, 3.4)");
+
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv,
+ "INSERT INTO root.sg1.d2(time, s1, s2) VALUES (1, 10.0, 20.0), (2,
10.1, 20.1), (3, 10.2, 20.2), (4, 10.3, 20.3), (5, 10.4, 20.4)");
+
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv,
+ "INSERT INTO root.sg1.d3(time, s1, s2, s3) ALIGNED VALUES (1, 100.0,
200.0, 300.0), (2, 100.1, 200.1, 300.1), (3, 100.2, 200.2, 300.2)");
+
+ TestUtils.executeNonQueryWithRetry(senderEnv, "FLUSH");
+
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "DELETE FROM root.sg1.d1.s1 WHERE time >= 2 AND time <= 4");
+
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv,
+ "INSERT INTO root.sg1.d3(time, s1, s2, s3) ALIGNED VALUES (1, 100.0,
200.0, 300.0), (2, 100.1, 200.1, 300.1), (3, 100.2, 200.2, 300.2)");
+
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "CREATE ALIGNED TIMESERIES root.sg1.d4(s1 FLOAT, s2 FLOAT,
s3 FLOAT)");
+ String s = "INSERT INTO root.sg1.d4(time, s1, s2, s3) ALIGNED VALUES ";
+ StringBuilder insertBuilder = new StringBuilder(s);
+ for (int i = 1; i <= 11000; i++) {
+ insertBuilder
+ .append("(")
+ .append(i)
+ .append(",")
+ .append(1.0f)
+ .append(",")
+ .append(2.0f)
+ .append(",")
+ .append(3.0f)
+ .append(")");
+ if (i % 100 != 0) {
+ insertBuilder.append(",");
+ } else {
+ TestUtils.executeNonQueryWithRetry(senderEnv,
insertBuilder.toString());
+ insertBuilder = new StringBuilder(s);
+ }
+ }
+
+ TestUtils.executeNonQueryWithRetry(senderEnv, "FLUSH");
+
+ TestUtils.executeNonQueryWithRetry(senderEnv, "DELETE FROM root.sg1.d4.s1
WHERE time <= 10000");
+ TestUtils.executeNonQueryWithRetry(senderEnv, "DELETE FROM root.sg1.d4.s2
WHERE time > 1000");
+ TestUtils.executeNonQueryWithRetry(senderEnv, "DELETE FROM root.sg1.d4.s3
WHERE time <= 8000");
+
+ TestUtils.executeNonQueryWithRetry(senderEnv, "DELETE FROM root.sg1.d2.*");
+
+ TestUtils.executeNonQueryWithRetry(senderEnv, "DELETE FROM root.sg1.d3.*");
+
+ TestUtils.executeNonQueryWithRetry(senderEnv, "FLUSH");
+
+ // Verify sender data integrity before creating pipe to avoid leader
election issues
+ // This ensures all data is properly persisted and consistent on sender
side
+ // before starting the pipe synchronization process
+ HashSet<String> results = new HashSet<>();
+ results.add("1,3.0,1.0,2.0,");
+ results.add("2,3.1,null,2.1,");
+ results.add("3,3.2,null,2.2,");
+ results.add("4,3.3,null,2.3,");
+ results.add("5,3.4,1.4,2.4,");
+
+ TestUtils.assertDataEventuallyOnEnv(
+ senderEnv,
+ "SELECT * FROM root.sg1.d1 ORDER BY time",
+ "Time,root.sg1.d1.s3,root.sg1.d1.s1,root.sg1.d1.s2,",
+ results);
+
+ executeNonQueryWithRetry(
+ senderEnv,
+ String.format(
+ "CREATE PIPE test_pipe WITH SOURCE ('mods.enable'='true') WITH
CONNECTOR('ip'='%s', 'port'='%s', 'format'='tablet')",
+ receiverEnv.getDataNodeWrapperList().get(0).getIp(),
+ receiverEnv.getDataNodeWrapperList().get(0).getPort()));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT * FROM root.sg1.d1 ORDER BY time",
+ "Time,root.sg1.d1.s3,root.sg1.d1.s1,root.sg1.d1.s2,",
+ results);
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv, "SELECT * FROM root.sg1.d2 ORDER BY time", "Time,",
Collections.emptySet());
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv, "SELECT * FROM root.sg1.d3 ORDER BY time", "Time,",
Collections.emptySet());
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT s1 FROM root.sg1.d1 WHERE time >= 2 AND time <= 4",
+ "Time,root.sg1.d1.s1,",
+ Collections.emptySet());
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(**) FROM root.sg1.d4",
+ "COUNT(root.sg1.d4.s3),COUNT(root.sg1.d4.s1),COUNT(root.sg1.d4.s2),",
+ Collections.singleton("3000,1000,1000,"));
+ }
+
+ /**
+ * Test IoTDB pipe handling TsFile decomposition with Mods (modification
operations) in tree model
+ * - Multi-pipe scenario
+ *
+ * <p>Test scenario: 1. Create database root.sg1 with 4 devices: d1 (aligned
timeseries), d2
+ * (non-aligned timeseries), d3 (aligned timeseries), d4 (aligned
timeseries) 2. Insert initial
+ * data into d1, d2, d3 3. Execute FLUSH operation to persist data to TsFile
4. Execute DELETE
+ * operation on d1.s1, deleting data in time range 2-4 5. Insert large
amount of data into d4
+ * (11000 records, inserted in batches) 6. Execute multiple DELETE
operations on d4: - Delete s1
+ * field data where time<=10000 - Delete s2 field data where time>1000 -
Delete s3 field data
+ * where time<=8000 7. Delete all data from d2 and d3 8. Execute FLUSH
operation again 9. Create 4
+ * independent pipes, each targeting different device paths: - test_pipe1:
handles data for
+ * root.sg1.d1.** path - test_pipe2: handles data for root.sg1.d2.** path -
test_pipe3: handles
+ * data for root.sg1.d3.** path - test_pipe4: handles data for
root.sg1.d4.** path 10. Verify
+ * correctness of receiver data: - d1 s1 field is null in time range 2-4,
other data is normal -
+ * d2 and d3 data is completely deleted - d4 DELETE operation results meet
expectations for each
+ * field
+ *
+ * <p>Test purpose: Verify that IoTDB pipe can correctly handle Mods
(modification operations) in
+ * TsFile under tree model through multiple independent pipes, ensuring
DELETE operations for
+ * different paths can be correctly synchronized to the receiver and data
consistency is
+ * guaranteed. The main difference from the first test method is using
multiple pipes to handle
+ * data for different devices separately.
+ */
+ @Test
+ public void testTsFileDecompositionWithMods2() throws Exception {
+ TestUtils.executeNonQueryWithRetry(senderEnv, "CREATE DATABASE root.sg1");
+ TestUtils.executeNonQueryWithRetry(receiverEnv, "CREATE DATABASE
root.sg1");
+
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "CREATE ALIGNED TIMESERIES root.sg1.d1(s1 FLOAT, s2 FLOAT,
s3 FLOAT)");
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "CREATE TIMESERIES root.sg1.d2.s1 WITH DATATYPE=FLOAT");
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "CREATE TIMESERIES root.sg1.d2.s2 WITH DATATYPE=FLOAT");
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "CREATE ALIGNED TIMESERIES root.sg1.d3(s1 FLOAT, s2 FLOAT,
s3 FLOAT)");
+
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv,
+ "INSERT INTO root.sg1.d1(time, s1, s2, s3) ALIGNED VALUES (1, 1.0,
2.0, 3.0), (2, 1.1, 2.1, 3.1), (3, 1.2, 2.2, 3.2), (4, 1.3, 2.3, 3.3), (5, 1.4,
2.4, 3.4)");
+
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv,
+ "INSERT INTO root.sg1.d2(time, s1, s2) VALUES (1, 10.0, 20.0), (2,
10.1, 20.1), (3, 10.2, 20.2), (4, 10.3, 20.3), (5, 10.4, 20.4)");
+
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv,
+ "INSERT INTO root.sg1.d3(time, s1, s2, s3) ALIGNED VALUES (1, 100.0,
200.0, 300.0), (2, 100.1, 200.1, 300.1), (3, 100.2, 200.2, 300.2)");
+
+ TestUtils.executeNonQueryWithRetry(senderEnv, "FLUSH");
+
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "DELETE FROM root.sg1.d1.s1 WHERE time >= 2 AND time <= 4");
+
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv,
+ "INSERT INTO root.sg1.d3(time, s1, s2, s3) ALIGNED VALUES (1, 100.0,
200.0, 300.0), (2, 100.1, 200.1, 300.1), (3, 100.2, 200.2, 300.2)");
+
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "CREATE ALIGNED TIMESERIES root.sg1.d4(s1 FLOAT, s2 FLOAT,
s3 FLOAT)");
+ String s = "INSERT INTO root.sg1.d4(time, s1, s2, s3) ALIGNED VALUES ";
+ StringBuilder insertBuilder = new StringBuilder(s);
+ for (int i = 1; i <= 11000; i++) {
+ insertBuilder
+ .append("(")
+ .append(i)
+ .append(",")
+ .append(1.0f)
+ .append(",")
+ .append(2.0f)
+ .append(",")
+ .append(3.0f)
+ .append(")");
+ if (i % 100 != 0) {
+ insertBuilder.append(",");
+ } else {
+ TestUtils.executeNonQueryWithRetry(senderEnv,
insertBuilder.toString());
+ insertBuilder = new StringBuilder(s);
+ }
+ }
+
+ TestUtils.executeNonQueryWithRetry(senderEnv, "FLUSH");
+
+ TestUtils.executeNonQueryWithRetry(senderEnv, "DELETE FROM root.sg1.d4.s1
WHERE time <= 10000");
+ TestUtils.executeNonQueryWithRetry(senderEnv, "DELETE FROM root.sg1.d4.s2
WHERE time > 1000");
+ TestUtils.executeNonQueryWithRetry(senderEnv, "DELETE FROM root.sg1.d4.s3
WHERE time <= 8000");
+
+ TestUtils.executeNonQueryWithRetry(senderEnv, "DELETE FROM root.sg1.d2.*");
+
+ TestUtils.executeNonQueryWithRetry(senderEnv, "DELETE FROM root.sg1.d3.*");
+
+ TestUtils.executeNonQueryWithRetry(senderEnv, "FLUSH");
+
+ // Verify sender data integrity before creating pipes to avoid leader
election issues
+ // This ensures all data is properly persisted and consistent on sender
side
+ // before starting the pipe synchronization process
+ HashSet<String> results = new HashSet<>();
+ results.add("1,3.0,1.0,2.0,");
+ results.add("2,3.1,null,2.1,");
+ results.add("3,3.2,null,2.2,");
+ results.add("4,3.3,null,2.3,");
+ results.add("5,3.4,1.4,2.4,");
+
+ TestUtils.assertDataEventuallyOnEnv(
+ senderEnv,
+ "SELECT * FROM root.sg1.d1 ORDER BY time",
+ "Time,root.sg1.d1.s3,root.sg1.d1.s1,root.sg1.d1.s2,",
+ results);
+
+ executeNonQueryWithRetry(
+ senderEnv,
+ String.format(
+ "CREATE PIPE test_pipe1 WITH SOURCE
('mods.enable'='true','path'='root.sg1.d1.**') WITH CONNECTOR('ip'='%s',
'port'='%s', 'format'='tablet')",
+ receiverEnv.getDataNodeWrapperList().get(0).getIp(),
+ receiverEnv.getDataNodeWrapperList().get(0).getPort()));
+
+ executeNonQueryWithRetry(
+ senderEnv,
+ String.format(
+ "CREATE PIPE test_pipe2 WITH SOURCE
('mods.enable'='true','path'='root.sg1.d2.**') WITH CONNECTOR('ip'='%s',
'port'='%s', 'format'='tablet')",
+ receiverEnv.getDataNodeWrapperList().get(0).getIp(),
+ receiverEnv.getDataNodeWrapperList().get(0).getPort()));
+
+ executeNonQueryWithRetry(
+ senderEnv,
+ String.format(
+ "CREATE PIPE test_pipe3 WITH SOURCE
('mods.enable'='true','path'='root.sg1.d3.**') WITH CONNECTOR('ip'='%s',
'port'='%s', 'format'='tablet')",
+ receiverEnv.getDataNodeWrapperList().get(0).getIp(),
+ receiverEnv.getDataNodeWrapperList().get(0).getPort()));
+
+ executeNonQueryWithRetry(
+ senderEnv,
+ String.format(
+ "CREATE PIPE test_pipe4 WITH SOURCE
('mods.enable'='true','path'='root.sg1.d4.**') WITH CONNECTOR('ip'='%s',
'port'='%s', 'format'='tablet')",
+ receiverEnv.getDataNodeWrapperList().get(0).getIp(),
+ receiverEnv.getDataNodeWrapperList().get(0).getPort()));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT * FROM root.sg1.d1 ORDER BY time",
+ "Time,root.sg1.d1.s3,root.sg1.d1.s1,root.sg1.d1.s2,",
+ results);
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv, "SELECT * FROM root.sg1.d2 ORDER BY time", "Time,",
Collections.emptySet());
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv, "SELECT * FROM root.sg1.d3 ORDER BY time", "Time,",
Collections.emptySet());
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT s1 FROM root.sg1.d1 WHERE time >= 2 AND time <= 4",
+ "Time,root.sg1.d1.s1,",
+ Collections.emptySet());
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(**) FROM root.sg1.d4",
+ "COUNT(root.sg1.d4.s3),COUNT(root.sg1.d4.s1),COUNT(root.sg1.d4.s2),",
+ Collections.singleton("3000,1000,1000,"));
+ }
+
+ /**
+ * Test IoTDB pipe handling TsFile decomposition with Mods (modification
operations) in tree model
+ * - Large scale single point deletion scenario
+ *
+ * <p>Test scenario: 1. Create database root.sg1 with 1 device: d1 (aligned
timeseries with 5
+ * sensors) 2. Insert 20000 data points for each sensor with different time
ranges: - s1: time
+ * 1-20000 - s2: time 10001-30000 - s3: time 20001-40000 - s4: time
30001-50000 - s5: time
+ * 40001-60000 3. Execute FLUSH operation to persist data to TsFile 4.
Execute 2000 single point
+ * DELETE operations, each deleting one time point from different sensors 5.
Execute FLUSH
+ * operation again 6. Create pipe with mods enabled 7. Verify correctness of
receiver data: - Each
+ * sensor should have 19800 remaining data points - Deleted points should
not appear in receiver
+ *
+ * <p>Test purpose: Verify that IoTDB pipe can correctly handle large scale
single point deletion
+ * operations in TsFile under tree model, ensuring the binary search
optimization in
+ * ModsOperationUtil works correctly with many modification entries.
+ */
+ @Test
+ public void testTsFileDecompositionWithModsLargeScaleSinglePointDeletion()
throws Exception {
+ TestUtils.executeNonQueryWithRetry(senderEnv, "CREATE DATABASE root.sg1");
+ TestUtils.executeNonQueryWithRetry(receiverEnv, "CREATE DATABASE
root.sg1");
+
+ // Insert 20000 data points for s1 (time 1-20000)
+ String s1 = "INSERT INTO root.sg1.d1(time, s1) ALIGNED VALUES ";
+ StringBuilder insertBuilder1 = new StringBuilder(s1);
+ for (int i = 1; i <= 20000; i++) {
+
insertBuilder1.append("(").append(i).append(",").append(1.0f).append(")");
+ if (i % 1000 != 0) {
+ insertBuilder1.append(",");
+ } else {
+ TestUtils.executeNonQueryWithRetry(senderEnv,
insertBuilder1.toString());
+ insertBuilder1 = new StringBuilder(s1);
+ }
+ }
+ // Execute remaining data if any
+ if (insertBuilder1.length() > s1.length()) {
+ TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder1.toString());
+ }
+
+ // Insert 20000 data points for s2 (time 10001-30000)
+ String s2 = "INSERT INTO root.sg1.d1(time, s2) ALIGNED VALUES ";
+ StringBuilder insertBuilder2 = new StringBuilder(s2);
+ for (int i = 10001; i <= 30000; i++) {
+
insertBuilder2.append("(").append(i).append(",").append(2.0f).append(")");
+ if (i % 1000 != 0) {
+ insertBuilder2.append(",");
+ } else {
+ TestUtils.executeNonQueryWithRetry(senderEnv,
insertBuilder2.toString());
+ insertBuilder2 = new StringBuilder(s2);
+ }
+ }
+ // Execute remaining data if any
+ if (insertBuilder2.length() > s2.length()) {
+ TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder2.toString());
+ }
+
+ // Insert 20000 data points for s3 (time 20001-40000)
+ String s3 = "INSERT INTO root.sg1.d1(time, s3) ALIGNED VALUES ";
+ StringBuilder insertBuilder3 = new StringBuilder(s3);
+ for (int i = 20001; i <= 40000; i++) {
+
insertBuilder3.append("(").append(i).append(",").append(3.0f).append(")");
+ if (i % 1000 != 0) {
+ insertBuilder3.append(",");
+ } else {
+ TestUtils.executeNonQueryWithRetry(senderEnv,
insertBuilder3.toString());
+ insertBuilder3 = new StringBuilder(s3);
+ }
+ }
+ // Execute remaining data if any
+ if (insertBuilder3.length() > s3.length()) {
+ TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder3.toString());
+ }
+
+ // Insert 20000 data points for s4 (time 30001-50000)
+ String s4 = "INSERT INTO root.sg1.d1(time, s4) ALIGNED VALUES ";
+ StringBuilder insertBuilder4 = new StringBuilder(s4);
+ for (int i = 30001; i <= 50000; i++) {
+
insertBuilder4.append("(").append(i).append(",").append(4.0f).append(")");
+ if (i % 1000 != 0) {
+ insertBuilder4.append(",");
+ } else {
+ TestUtils.executeNonQueryWithRetry(senderEnv,
insertBuilder4.toString());
+ insertBuilder4 = new StringBuilder(s4);
+ }
+ }
+ // Execute remaining data if any
+ if (insertBuilder4.length() > s4.length()) {
+ TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder4.toString());
+ }
+
+ // Insert 20000 data points for s5 (time 40001-60000)
+ String s5 = "INSERT INTO root.sg1.d1(time, s5) ALIGNED VALUES ";
+ StringBuilder insertBuilder5 = new StringBuilder(s5);
+ for (int i = 40001; i <= 60000; i++) {
+
insertBuilder5.append("(").append(i).append(",").append(5.0f).append(")");
+ if (i % 1000 != 0) {
+ insertBuilder5.append(",");
+ } else {
+ TestUtils.executeNonQueryWithRetry(senderEnv,
insertBuilder5.toString());
+ insertBuilder5 = new StringBuilder(s5);
+ }
+ }
+ // Execute remaining data if any
+ if (insertBuilder5.length() > s5.length()) {
+ TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder5.toString());
+ }
+
+ TestUtils.executeNonQueryWithRetry(senderEnv, "FLUSH");
+
+ // Execute 2000 single point DELETE operations
+ // Delete 400 points from each sensor (distributed across different time
ranges)
+ for (int i = 0; i < 400; i++) {
+ // Delete from s1: every 10th point starting from 10
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "DELETE FROM root.sg1.d1.s1 WHERE time = " + (10 + i *
10));
+
+ // Delete from s2: every 10th point starting from 10010
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "DELETE FROM root.sg1.d1.s2 WHERE time = " + (10010 + i *
10));
+
+ // Delete from s3: every 10th point starting from 20010
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "DELETE FROM root.sg1.d1.s3 WHERE time = " + (20010 + i *
10));
+
+ // Delete from s4: every 10th point starting from 30010
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "DELETE FROM root.sg1.d1.s4 WHERE time = " + (30010 + i *
10));
+
+ // Delete from s5: every 10th point starting from 40010
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "DELETE FROM root.sg1.d1.s5 WHERE time = " + (40010 + i *
10));
+ }
+
+ TestUtils.executeNonQueryWithRetry(senderEnv, "FLUSH");
+
+ // Verify sender data integrity before creating pipe to avoid leader
election issues
+ // This ensures all data is properly persisted and consistent on sender
side
+ // before starting the pipe synchronization process
+ TestUtils.assertDataEventuallyOnEnv(
+ senderEnv,
+ "SELECT COUNT(**) FROM root.sg1.d1",
+
"COUNT(root.sg1.d1.s3),COUNT(root.sg1.d1.s4),COUNT(root.sg1.d1.s5),COUNT(root.sg1.d1.s1),COUNT(root.sg1.d1.s2),",
+ Collections.singleton("19600,19600,19600,19600,19600,"));
+
+ executeNonQueryWithRetry(
+ senderEnv,
+ String.format(
+ "CREATE PIPE test_pipe WITH SOURCE ('mods.enable'='true') WITH
CONNECTOR('ip'='%s', 'port'='%s', 'format'='tablet')",
+ receiverEnv.getDataNodeWrapperList().get(0).getIp(),
+ receiverEnv.getDataNodeWrapperList().get(0).getPort()));
+
+ // Verify total count of all sensors using COUNT(*)
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(**) FROM root.sg1.d1",
+
"COUNT(root.sg1.d1.s3),COUNT(root.sg1.d1.s4),COUNT(root.sg1.d1.s5),COUNT(root.sg1.d1.s1),COUNT(root.sg1.d1.s2),",
+ Collections.singleton("19600,19600,19600,19600,19600,"));
+
+ // Verify individual sensor counts
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s1) FROM root.sg1.d1",
+ "COUNT(root.sg1.d1.s1),",
+ Collections.singleton("19600,"));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s2) FROM root.sg1.d1",
+ "COUNT(root.sg1.d1.s2),",
+ Collections.singleton("19600,"));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s3) FROM root.sg1.d1",
+ "COUNT(root.sg1.d1.s3),",
+ Collections.singleton("19600,"));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s4) FROM root.sg1.d1",
+ "COUNT(root.sg1.d1.s4),",
+ Collections.singleton("19600,"));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s5) FROM root.sg1.d1",
+ "COUNT(root.sg1.d1.s5),",
+ Collections.singleton("19600,"));
+
+ // Verify count of deleted time ranges using COUNT with WHERE clause
+ // These should return 0 since all points in these ranges were deleted
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s1) FROM root.sg1.d1 WHERE time >= 10 AND time <= 4000
AND time % 10 = 0",
+ "COUNT(root.sg1.d1.s1),",
+ Collections.singleton("0,"));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s2) FROM root.sg1.d1 WHERE time >= 10010 AND time <=
14000 AND time % 10 = 0",
+ "COUNT(root.sg1.d1.s2),",
+ Collections.singleton("0,"));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s3) FROM root.sg1.d1 WHERE time >= 20010 AND time <=
24000 AND time % 10 = 0",
+ "COUNT(root.sg1.d1.s3),",
+ Collections.singleton("0,"));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s4) FROM root.sg1.d1 WHERE time >= 30010 AND time <=
34000 AND time % 10 = 0",
+ "COUNT(root.sg1.d1.s4),",
+ Collections.singleton("0,"));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s5) FROM root.sg1.d1 WHERE time >= 40010 AND time <=
44000 AND time % 10 = 0",
+ "COUNT(root.sg1.d1.s5),",
+ Collections.singleton("0,"));
+
+ // Verify count of non-deleted time ranges using multiple range queries
+ // Check ranges before deletion area
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s1) FROM root.sg1.d1 WHERE time >= 1 AND time < 10",
+ "COUNT(root.sg1.d1.s1),",
+ Collections.singleton("9,"));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s2) FROM root.sg1.d1 WHERE time >= 10001 AND time <
10010",
+ "COUNT(root.sg1.d1.s2),",
+ Collections.singleton("9,"));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s3) FROM root.sg1.d1 WHERE time >= 20001 AND time <
20010",
+ "COUNT(root.sg1.d1.s3),",
+ Collections.singleton("9,"));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s4) FROM root.sg1.d1 WHERE time >= 30001 AND time <
30010",
+ "COUNT(root.sg1.d1.s4),",
+ Collections.singleton("9,"));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s5) FROM root.sg1.d1 WHERE time >= 40001 AND time <
40010",
+ "COUNT(root.sg1.d1.s5),",
+ Collections.singleton("9,"));
+
+ // Check ranges after deletion area
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s1) FROM root.sg1.d1 WHERE time > 4000 AND time <=
20000",
+ "COUNT(root.sg1.d1.s1),",
+ Collections.singleton("16000,"));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s2) FROM root.sg1.d1 WHERE time > 14000 AND time <=
30000",
+ "COUNT(root.sg1.d1.s2),",
+ Collections.singleton("16000,"));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s3) FROM root.sg1.d1 WHERE time > 24000 AND time <=
40000",
+ "COUNT(root.sg1.d1.s3),",
+ Collections.singleton("16000,"));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s4) FROM root.sg1.d1 WHERE time > 34000 AND time <=
50000",
+ "COUNT(root.sg1.d1.s4),",
+ Collections.singleton("16000,"));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s5) FROM root.sg1.d1 WHERE time > 44000 AND time <=
60000",
+ "COUNT(root.sg1.d1.s5),",
+ Collections.singleton("16000,"));
+
+ // Check non-deleted points within deletion range (every 10th point except
the ones we deleted)
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s1) FROM root.sg1.d1 WHERE time >= 10 AND time <= 4000
AND time % 10 != 0",
+ "COUNT(root.sg1.d1.s1),",
+ Collections.singleton("3591,"));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s2) FROM root.sg1.d1 WHERE time >= 10010 AND time <=
14000 AND time % 10 != 0",
+ "COUNT(root.sg1.d1.s2),",
+ Collections.singleton("3591,"));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s3) FROM root.sg1.d1 WHERE time >= 20010 AND time <=
24000 AND time % 10 != 0",
+ "COUNT(root.sg1.d1.s3),",
+ Collections.singleton("3591,"));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s4) FROM root.sg1.d1 WHERE time >= 30010 AND time <=
34000 AND time % 10 != 0",
+ "COUNT(root.sg1.d1.s4),",
+ Collections.singleton("3591,"));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s5) FROM root.sg1.d1 WHERE time >= 40010 AND time <=
44000 AND time % 10 != 0",
+ "COUNT(root.sg1.d1.s5),",
+ Collections.singleton("3591,"));
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 1e63a2d4af3..858c425c3e6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -553,7 +553,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
endTime,
pipeTaskMeta,
this)
- .provide());
+ .provide(isWithMod));
return dataContainer.get();
} catch (final IOException e) {
close();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java
index 77ef2c871cb..9f779528f52 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.event.common.tsfile.container;
+import org.apache.iotdb.commons.path.PatternTreeMap;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
@@ -26,6 +27,8 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
+import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
+import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.tsfile.read.TsFileSequenceReader;
@@ -49,6 +52,10 @@ public abstract class TsFileInsertionDataContainer
implements AutoCloseable {
protected final PipeTaskMeta pipeTaskMeta; // used to report progress
protected final EnrichedEvent sourceEvent; // used to report progress
+ // mods entry
+ protected PipeMemoryBlock allocatedMemoryBlockForModifications;
+ protected PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer>
currentModifications;
+
protected final long initialTimeNano = System.nanoTime();
protected boolean timeUsageReported = false;
@@ -117,5 +124,14 @@ public abstract class TsFileInsertionDataContainer
implements AutoCloseable {
if (allocatedMemoryBlockForTablet != null) {
allocatedMemoryBlockForTablet.close();
}
+
+ if (currentModifications != null) {
+ // help GC
+ currentModifications = null;
+ }
+
+ if (allocatedMemoryBlockForModifications != null) {
+ allocatedMemoryBlockForModifications.close();
+ }
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainerProvider.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainerProvider.java
index e8b29c72c40..b6c24365f3e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainerProvider.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainerProvider.java
@@ -72,7 +72,7 @@ public class TsFileInsertionDataContainerProvider {
this.sourceEvent = sourceEvent;
}
- public TsFileInsertionDataContainer provide() throws IOException {
+ public TsFileInsertionDataContainer provide(final boolean isWithMod) throws
IOException {
if (pipeName != null) {
PipeTsFileToTabletsMetrics.getInstance()
.markTsFileToTabletInvocation(pipeName + "_" + creationTime);
@@ -83,7 +83,15 @@ public class TsFileInsertionDataContainerProvider {
/ PipeMemoryManager.getTotalNonFloatingMemorySizeInBytes()
> PipeTsFilePublicResource.MEMORY_SUFFICIENT_THRESHOLD) {
return new TsFileInsertionScanDataContainer(
- pipeName, creationTime, tsFile, pattern, startTime, endTime,
pipeTaskMeta, sourceEvent);
+ pipeName,
+ creationTime,
+ tsFile,
+ pattern,
+ startTime,
+ endTime,
+ pipeTaskMeta,
+ sourceEvent,
+ isWithMod);
}
if (pattern instanceof UnionIoTDBPipePattern
@@ -95,7 +103,15 @@ public class TsFileInsertionDataContainerProvider {
// hard to know whether it only matches one timeseries, while matching
multiple is often the
// case.
return new TsFileInsertionQueryDataContainer(
- pipeName, creationTime, tsFile, pattern, startTime, endTime,
pipeTaskMeta, sourceEvent);
+ pipeName,
+ creationTime,
+ tsFile,
+ pattern,
+ startTime,
+ endTime,
+ pipeTaskMeta,
+ sourceEvent,
+ isWithMod);
}
final Map<IDeviceID, Boolean> deviceIsAlignedMap =
@@ -104,7 +120,15 @@ public class TsFileInsertionDataContainerProvider {
// If we failed to get from cache, it indicates that the memory usage is
high.
// We use scan data container because it requires less memory.
return new TsFileInsertionScanDataContainer(
- pipeName, creationTime, tsFile, pattern, startTime, endTime,
pipeTaskMeta, sourceEvent);
+ pipeName,
+ creationTime,
+ tsFile,
+ pattern,
+ startTime,
+ endTime,
+ pipeTaskMeta,
+ sourceEvent,
+ isWithMod);
}
final int originalSize = deviceIsAlignedMap.size();
@@ -114,7 +138,15 @@ public class TsFileInsertionDataContainerProvider {
return (double) filteredDeviceIsAlignedMap.size() / originalSize
> PipeConfig.getInstance().getPipeTsFileScanParsingThreshold()
? new TsFileInsertionScanDataContainer(
- pipeName, creationTime, tsFile, pattern, startTime, endTime,
pipeTaskMeta, sourceEvent)
+ pipeName,
+ creationTime,
+ tsFile,
+ pattern,
+ startTime,
+ endTime,
+ pipeTaskMeta,
+ sourceEvent,
+ isWithMod)
: new TsFileInsertionQueryDataContainer(
pipeName,
creationTime,
@@ -124,7 +156,8 @@ public class TsFileInsertionDataContainerProvider {
endTime,
pipeTaskMeta,
sourceEvent,
- filteredDeviceIsAlignedMap);
+ filteredDeviceIsAlignedMap,
+ isWithMod);
}
private Map<IDeviceID, Boolean> filterDeviceIsAlignedMapByPattern(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java
index 897d820df90..4a0c2be0bf0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java
@@ -26,16 +26,19 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.utils.TestOnly;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
+import
org.apache.iotdb.db.pipe.event.common.tsfile.parser.util.ModsOperationUtil;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
+import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.PlainDeviceID;
+import org.apache.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.tsfile.read.TsFileDeviceIterator;
import org.apache.tsfile.read.TsFileReader;
import org.apache.tsfile.read.TsFileSequenceReader;
@@ -72,7 +75,7 @@ public class TsFileInsertionQueryDataContainer extends
TsFileInsertionDataContai
public TsFileInsertionQueryDataContainer(
final File tsFile, final PipePattern pattern, final long startTime,
final long endTime)
throws IOException {
- this(null, 0, tsFile, pattern, startTime, endTime, null, null);
+ this(null, 0, tsFile, pattern, startTime, endTime, null, null, false);
}
public TsFileInsertionQueryDataContainer(
@@ -83,7 +86,8 @@ public class TsFileInsertionQueryDataContainer extends
TsFileInsertionDataContai
final long startTime,
final long endTime,
final PipeTaskMeta pipeTaskMeta,
- final EnrichedEvent sourceEvent)
+ final EnrichedEvent sourceEvent,
+ final boolean isWithMod)
throws IOException {
this(
pipeName,
@@ -94,7 +98,8 @@ public class TsFileInsertionQueryDataContainer extends
TsFileInsertionDataContai
endTime,
pipeTaskMeta,
sourceEvent,
- null);
+ null,
+ isWithMod);
}
public TsFileInsertionQueryDataContainer(
@@ -106,11 +111,20 @@ public class TsFileInsertionQueryDataContainer extends
TsFileInsertionDataContai
final long endTime,
final PipeTaskMeta pipeTaskMeta,
final EnrichedEvent sourceEvent,
- final Map<IDeviceID, Boolean> deviceIsAlignedMap)
+ final Map<IDeviceID, Boolean> deviceIsAlignedMap,
+ final boolean isWithMod)
throws IOException {
super(pipeName, creationTime, pattern, startTime, endTime, pipeTaskMeta,
sourceEvent);
try {
+ currentModifications =
+ isWithMod
+ ? ModsOperationUtil.loadModificationsFromTsFile(tsFile)
+ : PatternTreeMapFactory.getModsPatternTreeMap();
+ allocatedMemoryBlockForModifications =
+ PipeDataNodeResourceManager.memory()
+
.forceAllocateForTabletWithRetry(currentModifications.ramBytesUsed());
+
final PipeTsFileResourceManager tsFileResourceManager =
PipeDataNodeResourceManager.tsfile();
final Map<IDeviceID, List<String>> deviceMeasurementsMap;
@@ -155,6 +169,60 @@ public class TsFileInsertionQueryDataContainer extends
TsFileInsertionDataContai
allocatedMemoryBlock =
PipeDataNodeResourceManager.memory().forceAllocate(memoryRequiredInBytes);
+ final Iterator<Map.Entry<IDeviceID, List<String>>> iterator =
+ deviceMeasurementsMap.entrySet().iterator();
+ while (isWithMod && iterator.hasNext()) {
+ final Map.Entry<IDeviceID, List<String>> entry = iterator.next();
+ final IDeviceID deviceId = entry.getKey();
+ final List<String> measurements = entry.getValue();
+
+ // Check if deviceId is deleted
+ if (deviceId == null) {
+ LOGGER.warn("Found null deviceId, removing entry");
+ iterator.remove();
+ continue;
+ }
+
+ // Check if measurements list is deleted or empty
+ if (measurements == null || measurements.isEmpty()) {
+ iterator.remove();
+ continue;
+ }
+
+ if (!currentModifications.isEmpty()) {
+ // Safely filter measurements, remove non-existent measurements
+ measurements.removeIf(
+ measurement -> {
+ if (measurement == null) {
+ return true;
+ }
+
+ try {
+ TimeseriesMetadata meta =
+ tsFileSequenceReader.readTimeseriesMetadata(deviceId,
measurement, true);
+ return ModsOperationUtil.isAllDeletedByMods(
+ deviceId.toString(),
+ measurement,
+ meta.getStatistics().getStartTime(),
+ meta.getStatistics().getEndTime(),
+ currentModifications);
+ } catch (IOException e) {
+ LOGGER.warn(
+ "Failed to read metadata for deviceId: {}, measurement:
{}, removing",
+ deviceId,
+ measurement,
+ e);
+ return true;
+ }
+ });
+ }
+
+ // If measurements list is empty after filtering, remove the entire
entry
+ if (measurements.isEmpty()) {
+ iterator.remove();
+ }
+ }
+
// Filter again to get the final deviceMeasurementsMap that exactly
matches the pattern.
deviceMeasurementsMapIterator =
filterDeviceMeasurementsMapByPattern(deviceMeasurementsMap).entrySet().iterator();
@@ -302,7 +370,8 @@ public class TsFileInsertionQueryDataContainer extends
TsFileInsertionDataContai
((PlainDeviceID) entry.getKey()).toStringID(),
entry.getValue(),
timeFilterExpression,
- allocatedMemoryBlockForTablet);
+ allocatedMemoryBlockForTablet,
+ currentModifications);
} catch (final Exception e) {
close();
throw new PipeException(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataTabletIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataTabletIterator.java
index 5fa252412d4..e16c7113da3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataTabletIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataTabletIterator.java
@@ -19,9 +19,13 @@
package org.apache.iotdb.db.pipe.event.common.tsfile.container.query;
+import org.apache.iotdb.commons.path.PatternTreeMap;
+import
org.apache.iotdb.db.pipe.event.common.tsfile.parser.util.ModsOperationUtil;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
+import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
+import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.tsfile.common.constant.TsFileConstant;
@@ -60,6 +64,9 @@ public class TsFileInsertionQueryDataTabletIterator
implements Iterator<Tablet>
private final PipeMemoryBlock allocatedBlockForTablet;
+ // Maintain sorted mods list and current index for each measurement
+ private final List<ModsOperationUtil.ModsInfo> measurementModsList;
+
private RowRecord rowRecord;
TsFileInsertionQueryDataTabletIterator(
@@ -68,7 +75,8 @@ public class TsFileInsertionQueryDataTabletIterator
implements Iterator<Tablet>
final String deviceId,
final List<String> measurements,
final IExpression timeFilterExpression,
- final PipeMemoryBlock allocatedBlockForTablet)
+ final PipeMemoryBlock allocatedBlockForTablet,
+ final PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer>
currentModifications)
throws IOException {
this.tsFileReader = tsFileReader;
this.measurementDataTypeMap = measurementDataTypeMap;
@@ -88,6 +96,10 @@ public class TsFileInsertionQueryDataTabletIterator
implements Iterator<Tablet>
this.queryDataSet = buildQueryDataSet();
this.allocatedBlockForTablet =
Objects.requireNonNull(allocatedBlockForTablet);
+
+ this.measurementModsList =
+ ModsOperationUtil.initializeMeasurementMods(
+ deviceId, this.measurements, currentModifications);
}
private QueryDataSet buildQueryDataSet() throws IOException {
@@ -155,16 +167,23 @@ public class TsFileInsertionQueryDataTabletIterator
implements Iterator<Tablet>
final int rowIndex = tablet.rowSize;
- tablet.addTimestamp(rowIndex, rowRecord.getTimestamp());
-
+ boolean isNeedFillTime = false;
final List<Field> fields = rowRecord.getFields();
final int fieldSize = fields.size();
for (int i = 0; i < fieldSize; i++) {
final Field field = fields.get(i);
- tablet.addValue(
- measurements.get(i),
- rowIndex,
- field == null ? null :
field.getObjectValue(schemas.get(i).getType()));
+ final String measurement = measurements.get(i);
+ // Check if this value is deleted by mods
+ if (field == null
+ || ModsOperationUtil.isDelete(rowRecord.getTimestamp(),
measurementModsList.get(i))) {
+ tablet.bitMaps[i].mark(rowIndex);
+ } else {
+ tablet.addValue(measurement, rowIndex,
field.getObjectValue(schemas.get(i).getType()));
+ isNeedFillTime = true;
+ }
+ }
+ if (isNeedFillTime) {
+ tablet.addTimestamp(rowIndex, rowRecord.getTimestamp());
}
tablet.rowSize++;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
index c5ef8423297..da2a94fab18 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
@@ -19,15 +19,19 @@
package org.apache.iotdb.db.pipe.event.common.tsfile.container.scan;
+import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
+import
org.apache.iotdb.db.pipe.event.common.tsfile.parser.util.ModsOperationUtil;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionPathUtils;
+import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -36,7 +40,10 @@ import org.apache.tsfile.common.constant.TsFileConstant;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.MetaMarker;
import org.apache.tsfile.file.header.ChunkHeader;
+import org.apache.tsfile.file.metadata.AlignedChunkMetadata;
+import org.apache.tsfile.file.metadata.IChunkMetadata;
import org.apache.tsfile.file.metadata.PlainDeviceID;
+import org.apache.tsfile.file.metadata.statistics.Statistics;
import org.apache.tsfile.read.TsFileSequenceReader;
import org.apache.tsfile.read.common.BatchData;
import org.apache.tsfile.read.common.Chunk;
@@ -51,11 +58,14 @@ import org.apache.tsfile.utils.TsPrimitiveType;
import org.apache.tsfile.write.UnSupportedDataTypeException;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.time.LocalDate;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -65,6 +75,8 @@ import java.util.Objects;
public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContainer {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ModsOperationUtil.class);
+
private static final LocalDate EMPTY_DATE = LocalDate.of(1000, 1, 1);
private final long startTime;
@@ -81,6 +93,7 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
private boolean currentIsAligned;
private final List<MeasurementSchema> currentMeasurements = new
ArrayList<>();
+ private final List<ModsOperationUtil.ModsInfo> modsInfos = new ArrayList<>();
// Cached time chunk
private final List<Chunk> timeChunkList = new ArrayList<>();
private final List<Boolean> isMultiPageList = new ArrayList<>();
@@ -99,7 +112,8 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
final long startTime,
final long endTime,
final PipeTaskMeta pipeTaskMeta,
- final EnrichedEvent sourceEvent)
+ final EnrichedEvent sourceEvent,
+ final boolean isWithMod)
throws IOException {
super(pipeName, creationTime, pattern, startTime, endTime, pipeTaskMeta,
sourceEvent);
@@ -116,7 +130,19 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
.forceAllocateForTabletWithRetry(PipeConfig.getInstance().getPipeMaxReaderChunkSize());
try {
- tsFileSequenceReader = new
TsFileSequenceReader(tsFile.getAbsolutePath(), false, false);
+ currentModifications =
+ isWithMod
+ ? ModsOperationUtil.loadModificationsFromTsFile(tsFile)
+ : PatternTreeMapFactory.getModsPatternTreeMap();
+ allocatedMemoryBlockForModifications =
+ PipeDataNodeResourceManager.memory()
+
.forceAllocateForTabletWithRetry(currentModifications.ramBytesUsed());
+
+ tsFileSequenceReader =
+ new TsFileSequenceReader(
+ tsFile.getAbsolutePath(),
+ !currentModifications.isEmpty(),
+ !currentModifications.isEmpty());
tsFileSequenceReader.position((long)
TSFileConfig.MAGIC_STRING.getBytes().length + 1);
prepareData();
@@ -132,9 +158,10 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
final long startTime,
final long endTime,
final PipeTaskMeta pipeTaskMeta,
- final EnrichedEvent sourceEvent)
+ final EnrichedEvent sourceEvent,
+ final boolean isWithMod)
throws IOException {
- this(null, 0, tsFile, pattern, startTime, endTime, pipeTaskMeta,
sourceEvent);
+ this(null, 0, tsFile, pattern, startTime, endTime, pipeTaskMeta,
sourceEvent, isWithMod);
}
@Override
@@ -251,8 +278,9 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
final int rowIndex = tablet.rowSize;
- tablet.addTimestamp(rowIndex, data.currentTime());
- putValueToColumns(data, tablet, rowIndex);
+ if (putValueToColumns(data, tablet, rowIndex)) {
+ tablet.addTimestamp(rowIndex, data.currentTime());
+ }
tablet.rowSize++;
}
@@ -304,13 +332,14 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
} while (!data.hasCurrent());
}
- private void putValueToColumns(final BatchData data, final Tablet tablet,
final int rowIndex) {
+ private boolean putValueToColumns(final BatchData data, final Tablet tablet,
final int rowIndex) {
final Object[] columns = tablet.values;
-
+ boolean isNeedFillTime = false;
if (data.getDataType() == TSDataType.VECTOR) {
for (int i = 0; i < columns.length; ++i) {
final TsPrimitiveType primitiveType = data.getVector()[i];
- if (Objects.isNull(primitiveType)) {
+ if (Objects.isNull(primitiveType)
+ || ModsOperationUtil.isDelete(data.currentTime(),
modsInfos.get(i))) {
tablet.bitMaps[i].mark(rowIndex);
final TSDataType type = tablet.getSchemas().get(i).getType();
if (type == TSDataType.TEXT || type == TSDataType.BLOB || type ==
TSDataType.STRING) {
@@ -321,6 +350,8 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
}
continue;
}
+
+ isNeedFillTime = true;
switch (tablet.getSchemas().get(i).getType()) {
case BOOLEAN:
((boolean[]) columns[i])[rowIndex] = primitiveType.getBoolean();
@@ -352,6 +383,7 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
}
}
} else {
+ isNeedFillTime = true;
switch (tablet.getSchemas().get(0).getType()) {
case BOOLEAN:
((boolean[]) columns[0])[rowIndex] = data.getBoolean();
@@ -381,6 +413,7 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
throw new UnSupportedDataTypeException("UnSupported" +
data.getDataType());
}
}
+ return isNeedFillTime;
}
private void moveToNextChunkReader() throws IOException,
IllegalStateException {
@@ -388,6 +421,7 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
long valueChunkSize = 0;
final List<Chunk> valueChunkList = new ArrayList<>();
currentMeasurements.clear();
+ modsInfos.clear();
if (lastMarker == MetaMarker.SEPARATOR) {
chunkReader = null;
@@ -403,134 +437,211 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
case MetaMarker.TIME_CHUNK_HEADER:
case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER:
case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER:
- // Notice that the data in one chunk group is either aligned or
non-aligned
- // There is no need to consider non-aligned chunks when there are
value chunks
- currentIsMultiPage = marker == MetaMarker.CHUNK_HEADER;
+ {
+ // Notice that the data in one chunk group is either aligned or
non-aligned
+ // There is no need to consider non-aligned chunks when there are
value chunks
+ currentIsMultiPage = marker == MetaMarker.CHUNK_HEADER;
+ long currentChunkHeaderOffset = tsFileSequenceReader.position() -
1;
+ chunkHeader = tsFileSequenceReader.readChunkHeader(marker);
+ final long nextMarkerOffset =
+ tsFileSequenceReader.position() + chunkHeader.getDataSize();
- chunkHeader = tsFileSequenceReader.readChunkHeader(marker);
+ if (Objects.isNull(currentDevice)) {
+ tsFileSequenceReader.position(nextMarkerOffset);
+ break;
+ }
- if (Objects.isNull(currentDevice)) {
- tsFileSequenceReader.position(
- tsFileSequenceReader.position() + chunkHeader.getDataSize());
- break;
- }
+ if ((chunkHeader.getChunkType() & TsFileConstant.TIME_COLUMN_MASK)
+ == TsFileConstant.TIME_COLUMN_MASK) {
+ timeChunkList.add(
+ new Chunk(
+ chunkHeader, tsFileSequenceReader.readChunk(-1,
chunkHeader.getDataSize())));
+ isMultiPageList.add(marker == MetaMarker.TIME_CHUNK_HEADER);
+ break;
+ }
- if ((chunkHeader.getChunkType() & TsFileConstant.TIME_COLUMN_MASK)
- == TsFileConstant.TIME_COLUMN_MASK) {
- timeChunkList.add(
- new Chunk(
- chunkHeader, tsFileSequenceReader.readChunk(-1,
chunkHeader.getDataSize())));
- isMultiPageList.add(marker == MetaMarker.TIME_CHUNK_HEADER);
- break;
- }
+ if (!pattern.matchesMeasurement(currentDevice,
chunkHeader.getMeasurementID())) {
+ tsFileSequenceReader.position(nextMarkerOffset);
+ break;
+ }
- if (!pattern.matchesMeasurement(currentDevice,
chunkHeader.getMeasurementID())) {
- tsFileSequenceReader.position(
- tsFileSequenceReader.position() + chunkHeader.getDataSize());
- break;
- }
+ // Skip the chunk if it is fully deleted by mods
+ if (!currentModifications.isEmpty()) {
+ Statistics statistics = null;
+ try {
+ statistics =
+ findNonAlignedChunkStatistics(
+ tsFileSequenceReader.getIChunkMetadataList(
+ CompactionPathUtils.getPath(
+ currentDevice,
chunkHeader.getMeasurementID())),
+ currentChunkHeaderOffset);
+ } catch (IllegalPathException ignore) {
+ LOGGER.warn(
+ "Failed to get chunk metadata for {}.{}",
+ currentDevice,
+ chunkHeader.getMeasurementID());
+ }
- if (chunkHeader.getDataSize() >
allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
- PipeDataNodeResourceManager.memory()
- .forceResize(allocatedMemoryBlockForChunk,
chunkHeader.getDataSize());
- }
+ if (statistics != null
+ && ModsOperationUtil.isAllDeletedByMods(
+ currentDevice,
+ chunkHeader.getMeasurementID(),
+ statistics.getStartTime(),
+ statistics.getEndTime(),
+ currentModifications)) {
+ tsFileSequenceReader.position(nextMarkerOffset);
+ break;
+ }
+ }
- chunkReader =
- currentIsMultiPage
- ? new ChunkReader(
- new Chunk(
- chunkHeader,
- tsFileSequenceReader.readChunk(-1,
chunkHeader.getDataSize())),
- filter)
- : new SinglePageWholeChunkReader(
- new Chunk(
- chunkHeader,
- tsFileSequenceReader.readChunk(-1,
chunkHeader.getDataSize())));
- currentIsAligned = false;
- currentMeasurements.add(
- new MeasurementSchema(chunkHeader.getMeasurementID(),
chunkHeader.getDataType()));
- return;
+ if (chunkHeader.getDataSize() >
allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
+ PipeDataNodeResourceManager.memory()
+ .forceResize(allocatedMemoryBlockForChunk,
chunkHeader.getDataSize());
+ }
+
+ Chunk chunk =
+ new Chunk(
+ chunkHeader, tsFileSequenceReader.readChunk(-1,
chunkHeader.getDataSize()));
+
+ chunkReader =
+ currentIsMultiPage
+ ? new ChunkReader(chunk, filter)
+ : new SinglePageWholeChunkReader(chunk);
+ currentIsAligned = false;
+ currentMeasurements.add(
+ new MeasurementSchema(chunkHeader.getMeasurementID(),
chunkHeader.getDataType()));
+ modsInfos.addAll(
+ ModsOperationUtil.initializeMeasurementMods(
+ currentDevice,
+ Collections.singletonList(chunkHeader.getMeasurementID()),
+ currentModifications));
+ return;
+ }
case MetaMarker.VALUE_CHUNK_HEADER:
case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
- if (Objects.isNull(firstChunkHeader4NextSequentialValueChunks)) {
- chunkHeader = tsFileSequenceReader.readChunkHeader(marker);
-
- if (Objects.isNull(currentDevice)
- || !pattern.matchesMeasurement(currentDevice,
chunkHeader.getMeasurementID())) {
- tsFileSequenceReader.position(
- tsFileSequenceReader.position() + chunkHeader.getDataSize());
- break;
- }
+ {
+ if (Objects.isNull(firstChunkHeader4NextSequentialValueChunks)) {
+ long currentChunkHeaderOffset = tsFileSequenceReader.position()
- 1;
+ chunkHeader = tsFileSequenceReader.readChunkHeader(marker);
+
+ final long nextMarkerOffset =
+ tsFileSequenceReader.position() + chunkHeader.getDataSize();
+ if (Objects.isNull(currentDevice)
+ || !pattern.matchesMeasurement(currentDevice,
chunkHeader.getMeasurementID())) {
+ tsFileSequenceReader.position(nextMarkerOffset);
+ break;
+ }
- // Increase value index
- final int valueIndex =
- measurementIndexMap.compute(
- chunkHeader.getMeasurementID(),
- (measurement, index) -> Objects.nonNull(index) ? index + 1
: 0);
+ if (!currentModifications.isEmpty()) {
+ // Skip the chunk if it is fully deleted by mods
+ Statistics statistics = null;
+ try {
+ statistics =
+ findAlignedChunkStatistics(
+ tsFileSequenceReader.getIChunkMetadataList(
+ CompactionPathUtils.getPath(
+ currentDevice,
chunkHeader.getMeasurementID())),
+ currentChunkHeaderOffset);
+ } catch (IllegalPathException ignore) {
+ LOGGER.warn(
+ "Failed to get chunk metadata for {}.{}",
+ currentDevice,
+ chunkHeader.getMeasurementID());
+ }
+ if (statistics != null
+ && ModsOperationUtil.isAllDeletedByMods(
+ currentDevice,
+ chunkHeader.getMeasurementID(),
+ statistics.getStartTime(),
+ statistics.getEndTime(),
+ currentModifications)) {
+ tsFileSequenceReader.position(nextMarkerOffset);
+ break;
+ }
+ }
- // Emit when encountered non-sequential value chunk, or the chunk
size exceeds
- // certain value to avoid OOM
- // Do not record or end current value chunks when there are empty
chunks
- if (chunkHeader.getDataSize() == 0) {
- break;
- }
- boolean needReturn = false;
- final long timeChunkSize =
- lastIndex >= 0
- ?
PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunkList.get(lastIndex))
- : 0;
- if (lastIndex >= 0) {
- if (valueIndex != lastIndex) {
- needReturn = recordAlignedChunk(valueChunkList, marker);
- } else {
- final long chunkSize = timeChunkSize + valueChunkSize;
- if (chunkSize + chunkHeader.getDataSize()
- > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
- if (valueChunkList.size() == 1
- && chunkSize >
allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
- PipeDataNodeResourceManager.memory()
- .forceResize(allocatedMemoryBlockForChunk, chunkSize);
- }
+ // Increase value index
+ final int valueIndex =
+ measurementIndexMap.compute(
+ chunkHeader.getMeasurementID(),
+ (measurement, index) -> Objects.nonNull(index) ? index +
1 : 0);
+
+ // Emit when encountered non-sequential value chunk, or the
chunk size exceeds
+ // certain value to avoid OOM
+ // Do not record or end current value chunks when there are
empty chunks
+ if (chunkHeader.getDataSize() == 0) {
+ break;
+ }
+ boolean needReturn = false;
+ final long timeChunkSize =
+ lastIndex >= 0
+ ? PipeMemoryWeightUtil.calculateChunkRamBytesUsed(
+ timeChunkList.get(lastIndex))
+ : 0;
+ if (lastIndex >= 0) {
+ if (valueIndex != lastIndex) {
needReturn = recordAlignedChunk(valueChunkList, marker);
+ } else {
+ final long chunkSize = timeChunkSize + valueChunkSize;
+ if (chunkSize + chunkHeader.getDataSize()
+ > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
+ if (valueChunkList.size() == 1
+ && chunkSize >
allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
+ PipeDataNodeResourceManager.memory()
+ .forceResize(allocatedMemoryBlockForChunk,
chunkSize);
+ }
+ needReturn = recordAlignedChunk(valueChunkList, marker);
+ }
}
}
+ lastIndex = valueIndex;
+ if (needReturn) {
+ firstChunkHeader4NextSequentialValueChunks = chunkHeader;
+ return;
+ }
+ } else {
+ chunkHeader = firstChunkHeader4NextSequentialValueChunks;
+ firstChunkHeader4NextSequentialValueChunks = null;
}
- lastIndex = valueIndex;
- if (needReturn) {
- firstChunkHeader4NextSequentialValueChunks = chunkHeader;
- return;
- }
- } else {
- chunkHeader = firstChunkHeader4NextSequentialValueChunks;
- firstChunkHeader4NextSequentialValueChunks = null;
- }
- valueChunkSize += chunkHeader.getDataSize();
- valueChunkList.add(
- new Chunk(
- chunkHeader, tsFileSequenceReader.readChunk(-1,
chunkHeader.getDataSize())));
- currentMeasurements.add(
- new MeasurementSchema(chunkHeader.getMeasurementID(),
chunkHeader.getDataType()));
- break;
+ Chunk chunk =
+ new Chunk(
+ chunkHeader, tsFileSequenceReader.readChunk(-1,
chunkHeader.getDataSize()));
+
+ valueChunkSize += chunkHeader.getDataSize();
+ valueChunkList.add(chunk);
+ currentMeasurements.add(
+ new MeasurementSchema(chunkHeader.getMeasurementID(),
chunkHeader.getDataType()));
+ modsInfos.addAll(
+ ModsOperationUtil.initializeMeasurementMods(
+ currentDevice,
+ Collections.singletonList(chunkHeader.getMeasurementID()),
+ currentModifications));
+ break;
+ }
case MetaMarker.CHUNK_GROUP_HEADER:
- // Return before "currentDevice" changes
- if (recordAlignedChunk(valueChunkList, marker)) {
- return;
+ {
+ // Return before "currentDevice" changes
+ if (recordAlignedChunk(valueChunkList, marker)) {
+ return;
+ }
+ final String deviceID =
+ ((PlainDeviceID)
tsFileSequenceReader.readChunkGroupHeader().getDeviceID())
+ .toStringID();
+ // Clear because the cached data will never be used in the next
chunk group
+ lastIndex = -1;
+ timeChunkList.clear();
+ isMultiPageList.clear();
+ measurementIndexMap.clear();
+
+ currentDevice = pattern.mayOverlapWithDevice(deviceID) ? deviceID
: null;
+ break;
}
- final String deviceID =
- ((PlainDeviceID)
tsFileSequenceReader.readChunkGroupHeader().getDeviceID())
- .toStringID();
- // Clear because the cached data will never be used in the next
chunk group
- lastIndex = -1;
- timeChunkList.clear();
- isMultiPageList.clear();
- measurementIndexMap.clear();
-
- currentDevice = pattern.mayOverlapWithDevice(deviceID) ? deviceID :
null;
- break;
case MetaMarker.OPERATION_INDEX_RANGE:
- tsFileSequenceReader.readPlanIndex();
- break;
+ {
+ tsFileSequenceReader.readPlanIndex();
+ break;
+ }
default:
MetaMarker.handleUnexpectedMarker(marker);
}
@@ -571,4 +682,32 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
allocatedMemoryBlockForChunk.close();
}
}
+
+ private Statistics findAlignedChunkStatistics(
+ List<IChunkMetadata> metadataList, long currentChunkHeaderOffset) {
+ for (IChunkMetadata metadata : metadataList) {
+ if (!(metadata instanceof AlignedChunkMetadata)) {
+ continue;
+ }
+ List<IChunkMetadata> list = ((AlignedChunkMetadata)
metadata).getValueChunkMetadataList();
+ for (IChunkMetadata m : list) {
+ if (m.getOffsetOfChunkHeader() == currentChunkHeaderOffset) {
+ return m.getStatistics();
+ }
+ }
+ break;
+ }
+ return null;
+ }
+
+ private Statistics findNonAlignedChunkStatistics(
+ List<IChunkMetadata> metadataList, long currentChunkHeaderOffset) {
+ for (IChunkMetadata metadata : metadataList) {
+ if (metadata.getOffsetOfChunkHeader() == currentChunkHeaderOffset) {
+ // found the corresponding chunk metadata
+ return metadata.getStatistics();
+ }
+ }
+ return null;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/util/ModsOperationUtil.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/util/ModsOperationUtil.java
new file mode 100644
index 00000000000..19554c32c06
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/util/ModsOperationUtil.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.event.common.tsfile.parser.util;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PatternTreeMap;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionPathUtils;
+import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
+import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
+import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
+import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Utility class for handling mods operations during TsFile parsing. Supports
mods processing logic
+ * for both tree model and table model.
+ */
+public class ModsOperationUtil {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ModsOperationUtil.class);
+
+ private ModsOperationUtil() {
+ // Utility class, no instantiation allowed
+ }
+
+ /**
+ * Load all modifications from TsFile and build PatternTreeMap
+ *
+ * @param tsFile TsFile file
+ * @return PatternTreeMap containing all modifications
+ */
+ public static PatternTreeMap<Modification,
PatternTreeMapFactory.ModsSerializer>
+ loadModificationsFromTsFile(File tsFile) {
+ PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer>
modifications =
+ PatternTreeMapFactory.getModsPatternTreeMap();
+
+ try (ModificationFile file =
+ new ModificationFile(tsFile.getPath() + ModificationFile.FILE_SUFFIX))
{
+ file.getModifications()
+ .forEach(modification ->
modifications.append(modification.getPath(), modification));
+ } catch (Exception e) {
+ throw new PipeException("Failed to load modifications from TsFile: " +
tsFile.getPath(), e);
+ }
+
+ return modifications;
+ }
+
+ /**
+ * Check if data in the specified time range is completely deleted by mods.
Different logic for
+ * tree model and table model
+ *
+ * @param deviceID device ID
+ * @param measurementID measurement ID
+ * @param startTime start time
+ * @param endTime end time
+ * @param modifications modification records
+ * @return true if data is completely deleted, false otherwise
+ */
+ public static boolean isAllDeletedByMods(
+ String deviceID,
+ String measurementID,
+ long startTime,
+ long endTime,
+ PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer>
modifications) {
+ if (modifications == null) {
+ return false;
+ }
+ List<Modification> mods = null;
+ try {
+ mods = modifications.getOverlapped(CompactionPathUtils.getPath(deviceID,
measurementID));
+ } catch (IllegalPathException ignore) {
+ LOGGER.warn("Failed to get mods for device {} and measurement {}",
deviceID, measurementID);
+ }
+
+ if (mods == null || mods.isEmpty()) {
+ return false;
+ }
+
+ // For tree model: check if any modification covers the time range
+ return mods.stream()
+ .anyMatch(
+ modification -> ((Deletion)
modification).getTimeRange().contains(startTime, endTime));
+ }
+
+ /**
+ * Initialize mods mapping for specified measurement list
+ *
+ * @param deviceID device ID
+ * @param measurements measurement list
+ * @param modifications modification records
+ * @return list of ModsInfo entries (sorted mods list plus start index), one per measurement, in input order
+ */
+ public static List<ModsInfo> initializeMeasurementMods(
+ String deviceID,
+ List<String> measurements,
+ PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer>
modifications) {
+
+ List<ModsInfo> modsInfos = new ArrayList<>(measurements.size());
+
+ for (final String measurement : measurements) {
+ List<Modification> mods;
+ try {
+ mods =
modifications.getOverlapped(CompactionPathUtils.getPath(deviceID, measurement));
+ } catch (IllegalPathException ignore) {
+ mods = null;
+ LOGGER.warn("Failed to get mods for device {} and measurement {}",
deviceID, measurement);
+ }
+
+ if (mods == null || mods.isEmpty()) {
+ // No mods, use empty list and index 0
+ modsInfos.add(new ModsInfo(Collections.emptyList(), 0));
+ continue;
+ }
+ // Store sorted mods and start index
+ modsInfos.add(
+ new
ModsInfo(modificationConvertionToDeletions(ModificationFile.sortAndMerge(mods)),
0));
+ }
+
+ return modsInfos;
+ }
+
+ public static List<Deletion>
modificationConvertionToDeletions(List<Modification> modifications) {
+ List<Deletion> deletions = new ArrayList<>();
+ for (Modification modification : modifications) {
+ if (modification instanceof Deletion) {
+ deletions.add((Deletion) modification);
+ }
+ }
+ return deletions;
+ }
+
+ /**
+ * Check if data at the specified time point is deleted
+ *
+ * @param time time point
+ * @param modsInfo mods information containing mods list and current index
+ * @return true if data is deleted, false otherwise
+ */
+ public static boolean isDelete(long time, ModsInfo modsInfo) {
+ if (modsInfo == null) {
+ return false;
+ }
+
+ final List<Deletion> mods = modsInfo.getMods();
+ if (mods == null || mods.isEmpty()) {
+ return false;
+ }
+
+ int currentIndex = modsInfo.getCurrentIndex();
+ if (currentIndex < 0) {
+ return false;
+ }
+
+ // First, try to use the current index if it's valid
+ if (currentIndex < mods.size()) {
+ final Deletion currentMod = mods.get(currentIndex);
+ final long currentModStartTime = currentMod.getTimeRange().getMin();
+ final long currentModEndTime = currentMod.getTimeRange().getMax();
+
+ if (time < currentModStartTime) {
+ // Time is before current mod, return false
+ return false;
+ } else if (time <= currentModEndTime) {
+ // Time is within current mod range, return true
+ return true;
+ } else {
+ // Time is after current mod, need to search forwards
+ return searchAndCheckMod(mods, time, currentIndex + 1, modsInfo);
+ }
+ } else {
+ // Current index is beyond array bounds, all mods have been processed
+ clearModsAndReset(modsInfo);
+ return false;
+ }
+ }
+
+ /**
+ * Search for a mod using binary search and check if the time point is
deleted
+ *
+ * @param mods sorted list of mods
+ * @param time time point to search for
+ * @param startIndex starting index for search
+ * @param modsInfo mods information to update
+ * @return true if data is deleted, false otherwise
+ */
+ private static boolean searchAndCheckMod(
+ List<Deletion> mods, long time, int startIndex, ModsInfo modsInfo) {
+ int searchIndex = binarySearchMods(mods, time, startIndex);
+ if (searchIndex >= mods.size()) {
+ // All mods checked, clear mods list and reset index to 0
+ clearModsAndReset(modsInfo);
+ return false;
+ }
+
+ final Deletion foundMod = mods.get(searchIndex);
+ final long foundModStartTime = foundMod.getTimeRange().getMin();
+
+ if (time < foundModStartTime) {
+ modsInfo.setCurrentIndex(searchIndex);
+ return false;
+ }
+
+ modsInfo.setCurrentIndex(searchIndex);
+ return true;
+ }
+
+ /**
+ * Clear mods list and reset index to 0
+ *
+ * @param modsInfo mods information to update
+ */
+ private static void clearModsAndReset(ModsInfo modsInfo) {
+ modsInfo.setMods(Collections.emptyList());
+ modsInfo.setCurrentIndex(0);
+ }
+
+ /**
+ * Binary search to find the first mod that might contain the given time
point. Returns the index
+ * of the first mod whose end time (max) is >= time, or mods.size() if no
such mod exists.
+ *
+ * @param mods sorted list of mods
+ * @param time time point to search for
+ * @param startIndex starting index for search (current index)
+ * @return index of the first potential mod, or mods.size() if none found
+ */
+ private static int binarySearchMods(List<Deletion> mods, long time, int
startIndex) {
+ int left = startIndex;
+ int right = mods.size();
+
+ while (left < right) {
+ int mid = left + (right - left) / 2;
+ final long max = mods.get(mid).getTimeRange().getMax();
+
+ if (max < time) {
+ left = mid + 1;
+ } else {
+ right = mid;
+ }
+ }
+
+ return left;
+ }
+
+ /** Mods information wrapper class, containing mods list and current index */
+ public static class ModsInfo {
+ private List<Deletion> mods;
+ private int currentIndex;
+
+ public ModsInfo(List<Deletion> mods, int currentIndex) {
+ this.mods = mods;
+ this.currentIndex = currentIndex;
+ }
+
+ public List<Deletion> getMods() {
+ return mods;
+ }
+
+ public void setMods(List<Deletion> newMods) {
+ this.mods = newMods;
+ }
+
+ public int getCurrentIndex() {
+ return currentIndex;
+ }
+
+ public void setCurrentIndex(int newIndex) {
+ this.currentIndex = newIndex;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ModsInfo modsInfo = (ModsInfo) o;
+ return Objects.equals(mods, modsInfo.mods)
+ && Objects.equals(currentIndex, modsInfo.currentIndex);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(mods, currentIndex);
+ }
+
+ @Override
+ public String toString() {
+ return "ModsInfo{" + "mods=" + mods + ", currentIndex=" + currentIndex +
'}';
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
index caa4a399f67..c7e796c463e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
@@ -101,7 +101,7 @@ public class PipeStatementDataTypeConvertExecutionVisitor
for (final File file : loadTsFileStatement.getTsFiles()) {
try (final TsFileInsertionScanDataContainer container =
new TsFileInsertionScanDataContainer(
- file, new IoTDBPipePattern(null), Long.MIN_VALUE,
Long.MAX_VALUE, null, null)) {
+ file, new IoTDBPipePattern(null), Long.MIN_VALUE,
Long.MAX_VALUE, null, null, true)) {
for (final Pair<Tablet, Boolean> tabletWithIsAligned :
container.toTabletWithIsAligneds()) {
final PipeConvertedInsertTabletStatement statement =
new PipeConvertedInsertTabletStatement(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
index 1ca4338607c..e6f8ffb73f6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
@@ -92,7 +92,13 @@ public class LoadTreeStatementDataTypeConvertExecutionVisitor
for (final File file : loadTsFileStatement.getTsFiles()) {
try (final TsFileInsertionScanDataContainer container =
new TsFileInsertionScanDataContainer(
- file, new IoTDBPipePattern(null), Long.MIN_VALUE,
Long.MAX_VALUE, null, null)) {
+ file,
+ new IoTDBPipePattern(null),
+ Long.MIN_VALUE,
+ Long.MAX_VALUE,
+ null,
+ null,
+ true)) {
for (final Pair<Tablet, Boolean> tabletWithIsAligned :
container.toTabletWithIsAligneds()) {
final PipeTransferTabletRawReq tabletRawReq =
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
index f59038982ea..a91a8b97685 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
@@ -560,7 +560,7 @@ public class TsFileInsertionDataContainerTest {
isQuery
? new TsFileInsertionQueryDataContainer(tsFile, pattern,
startTime, endTime)
: new TsFileInsertionScanDataContainer(
- tsFile, pattern, startTime, endTime, null, null)) {
+ tsFile, pattern, startTime, endTime, null, null, false)) {
final AtomicInteger count1 = new AtomicInteger(0);
final AtomicInteger count2 = new AtomicInteger(0);
final AtomicInteger count3 = new AtomicInteger(0);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/util/ModsOperationUtilTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/util/ModsOperationUtilTest.java
new file mode 100644
index 00000000000..9a9c7ce428a
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/util/ModsOperationUtilTest.java
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.event.common.tsfile.parser.util;
+
+import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ModsOperationUtilTest {
+
+ @Test
+ public void testIsDeleteWithBinarySearch() {
+ // Create test mods with time ranges: [10, 20], [30, 40], [50, 60], [70,
80]
+ List<Deletion> mods = createTestMods(new long[][] {{10, 20}, {30, 40},
{50, 60}, {70, 80}});
+
+ ModsOperationUtil.ModsInfo modsInfo = new ModsOperationUtil.ModsInfo(mods,
0);
+
+ // Test cases
+ // Time 5: before first mod, should return false
+ assertFalse(ModsOperationUtil.isDelete(5, modsInfo));
+ assertEquals(0, modsInfo.getCurrentIndex());
+
+ // Time 15: within first mod [10, 20], should return true
+ assertTrue(ModsOperationUtil.isDelete(15, modsInfo));
+ assertEquals(0, modsInfo.getCurrentIndex());
+
+ // Time 25: between first and second mod, should return false
+ assertFalse(ModsOperationUtil.isDelete(25, modsInfo));
+ assertEquals(1, modsInfo.getCurrentIndex());
+
+ // Time 35: within second mod [30, 40], should return true
+ assertTrue(ModsOperationUtil.isDelete(35, modsInfo));
+ assertEquals(1, modsInfo.getCurrentIndex());
+
+ // Time 45: between second and third mod, should return false
+ assertFalse(ModsOperationUtil.isDelete(45, modsInfo));
+ assertEquals(2, modsInfo.getCurrentIndex());
+
+ // Time 55: within third mod [50, 60], should return true
+ assertTrue(ModsOperationUtil.isDelete(55, modsInfo));
+ assertEquals(2, modsInfo.getCurrentIndex());
+
+ // Time 65: between third and fourth mod, should return false
+ assertFalse(ModsOperationUtil.isDelete(65, modsInfo));
+ assertEquals(3, modsInfo.getCurrentIndex());
+
+ // Time 75: within fourth mod [70, 80], should return true
+ assertTrue(ModsOperationUtil.isDelete(75, modsInfo));
+ assertEquals(3, modsInfo.getCurrentIndex());
+
+ // Time 85: after last mod, should return false and clear mods
+ assertFalse(ModsOperationUtil.isDelete(85, modsInfo));
+ assertTrue(modsInfo.getMods().isEmpty());
+ assertEquals(0, modsInfo.getCurrentIndex());
+ }
+
+ @Test
+ public void testIsDeleteWithEmptyMods() {
+ ModsOperationUtil.ModsInfo modsInfo = new ModsOperationUtil.ModsInfo(new
ArrayList<>(), 0);
+
+ // Should return false for any time when mods is empty
+ assertFalse(ModsOperationUtil.isDelete(100, modsInfo));
+ }
+
+ @Test
+ public void testIsDeleteWithNullModsInfo() {
+ // Should return false when modsInfo is null
+ assertFalse(ModsOperationUtil.isDelete(100, null));
+ }
+
+ @Test
+ public void testIsDeleteWithNegativeIndex() {
+ List<Deletion> mods = createTestMods(new long[][] {{10, 20}});
+ ModsOperationUtil.ModsInfo modsInfo = new ModsOperationUtil.ModsInfo(mods,
-1);
+
+ // Should return false when currentIndex is negative
+ assertFalse(ModsOperationUtil.isDelete(15, modsInfo));
+ }
+
+ @Test
+ public void testIsDeleteWithSingleMod() {
+ List<Deletion> mods = createTestMods(new long[][] {{10, 20}});
+ ModsOperationUtil.ModsInfo modsInfo = new ModsOperationUtil.ModsInfo(mods,
0);
+
+ // Time before mod
+ assertFalse(ModsOperationUtil.isDelete(5, modsInfo));
+ assertEquals(0, modsInfo.getCurrentIndex());
+
+ // Time within mod
+ assertTrue(ModsOperationUtil.isDelete(15, modsInfo));
+ assertEquals(0, modsInfo.getCurrentIndex());
+
+ // Time after mod
+ assertFalse(ModsOperationUtil.isDelete(25, modsInfo));
+ assertTrue(modsInfo.getMods().isEmpty());
+ assertEquals(0, modsInfo.getCurrentIndex());
+ }
+
+ @Test
+ public void testIsDeleteWithOverlappingMods() {
+ // Create overlapping mods: [10, 30], [20, 40], [30, 50]
+ List<Deletion> mods = createTestMods(new long[][] {{10, 30}, {20, 40},
{30, 50}});
+
+ ModsOperationUtil.ModsInfo modsInfo = new ModsOperationUtil.ModsInfo(mods,
0);
+
+ // Time 15: within first mod
+ assertTrue(ModsOperationUtil.isDelete(15, modsInfo));
+ assertEquals(0, modsInfo.getCurrentIndex());
+
+ // Time 25: within both first and second mod, should find first one
+ assertTrue(ModsOperationUtil.isDelete(25, modsInfo));
+ assertEquals(0, modsInfo.getCurrentIndex());
+
+ // Time 35: within second and third mod, should find second one
+ assertTrue(ModsOperationUtil.isDelete(35, modsInfo));
+ assertEquals(1, modsInfo.getCurrentIndex());
+
+ // Time 45: within third mod
+ assertTrue(ModsOperationUtil.isDelete(45, modsInfo));
+ assertEquals(2, modsInfo.getCurrentIndex());
+ }
+
+ @Test
+ public void testBinarySearchMods() {
+ // Create test mods with time ranges: [10, 20], [30, 40], [50, 60], [70,
80]
+ List<Deletion> mods = createTestMods(new long[][] {{10, 20}, {30, 40},
{50, 60}, {70, 80}});
+
+ // Test binary search from start index 0
+ // Time 5: before all mods, should return 0 (first mod)
+ assertEquals(0, binarySearchMods(mods, 5, 0));
+
+ // Time 15: within first mod [10, 20], should return 0
+ assertEquals(0, binarySearchMods(mods, 15, 0));
+
+ // Time 25: between first and second mod, should return 1
+ assertEquals(1, binarySearchMods(mods, 25, 0));
+
+ // Time 35: within second mod [30, 40], should return 1
+ assertEquals(1, binarySearchMods(mods, 35, 0));
+
+ // Time 45: between second and third mod, should return 2
+ assertEquals(2, binarySearchMods(mods, 45, 0));
+
+ // Time 55: within third mod [50, 60], should return 2
+ assertEquals(2, binarySearchMods(mods, 55, 0));
+
+ // Time 65: between third and fourth mod, should return 3
+ assertEquals(3, binarySearchMods(mods, 65, 0));
+
+ // Time 75: within fourth mod [70, 80], should return 3
+ assertEquals(3, binarySearchMods(mods, 75, 0));
+
+ // Time 85: after all mods, should return 4 (mods.size())
+ assertEquals(4, binarySearchMods(mods, 85, 0));
+ }
+
+ @Test
+ public void testBinarySearchModsWithStartIndex() {
+ // Create test mods with time ranges: [10, 20], [30, 40], [50, 60], [70,
80]
+ List<Deletion> mods = createTestMods(new long[][] {{10, 20}, {30, 40},
{50, 60}, {70, 80}});
+
+ // Test binary search starting from index 2
+ // Time 15: before start index, should return 2 (start index)
+ assertEquals(2, binarySearchMods(mods, 15, 2));
+
+ // Time 35: before start index, should return 2 (start index)
+ assertEquals(2, binarySearchMods(mods, 35, 2));
+
+ // Time 55: within mod at index 2, should return 2
+ assertEquals(2, binarySearchMods(mods, 55, 2));
+
+ // Time 65: between mods at index 2 and 3, should return 3
+ assertEquals(3, binarySearchMods(mods, 65, 2));
+
+ // Time 75: within mod at index 3, should return 3
+ assertEquals(3, binarySearchMods(mods, 75, 2));
+
+ // Time 85: after all mods, should return 4 (mods.size())
+ assertEquals(4, binarySearchMods(mods, 85, 2));
+ }
+
+ @Test
+ public void testBinarySearchModsWithOverlappingRanges() {
+ // Create overlapping mods: [10, 30], [20, 40], [30, 50]
+ List<Deletion> mods = createTestMods(new long[][] {{10, 30}, {20, 40},
{30, 50}});
+
+ // Time 15: within first mod, should return 0
+ assertEquals(0, binarySearchMods(mods, 15, 0));
+
+ // Time 25: within first and second mod, should return 0 (first match)
+ assertEquals(0, binarySearchMods(mods, 25, 0));
+
+ // Time 35: within second and third mod, should return 1 (first match from
start)
+ assertEquals(1, binarySearchMods(mods, 35, 0));
+
+ // Time 45: within third mod, should return 2
+ assertEquals(2, binarySearchMods(mods, 45, 0));
+ }
+
+ @Test
+ public void testBinarySearchModsWithEmptyList() {
+ List<Deletion> mods = new ArrayList<>();
+
+ // Should return 0 for any time when mods is empty
+ assertEquals(0, binarySearchMods(mods, 100, 0));
+ }
+
+ @Test
+ public void testBinarySearchModsWithSingleMod() {
+ List<Deletion> mods = createTestMods(new long[][] {{10, 20}});
+
+ // Time 5: before mod, should return 0
+ assertEquals(0, binarySearchMods(mods, 5, 0));
+
+ // Time 15: within mod, should return 0
+ assertEquals(0, binarySearchMods(mods, 15, 0));
+
+ // Time 25: after mod, should return 1
+ assertEquals(1, binarySearchMods(mods, 25, 0));
+ }
+
+ @Test
+ public void testBinarySearchModsWithExactBoundaries() {
+ // Create mods with exact boundaries: [10, 20], [20, 30], [30, 40]
+ List<Deletion> mods = createTestMods(new long[][] {{10, 20}, {20, 30},
{30, 40}});
+
+ // Time 20: at boundary, first mod [10, 20] has endTime=20 >= 20, should
return 0
+ assertEquals(0, binarySearchMods(mods, 20, 0));
+
+ // Time 30: at boundary, second mod [20, 30] has endTime=30 >= 30, should
return 1
+ assertEquals(1, binarySearchMods(mods, 30, 0));
+ }
+
+ @Test
+ public void testBinarySearchModsWithMinBoundaries() {
+ // Create mods: [10, 20], [30, 40], [50, 60]
+ List<Deletion> mods = createTestMods(new long[][] {{10, 20}, {30, 40},
{50, 60}});
+
+ // Time 10: exactly at first mod's min, should return 0
+ assertEquals(0, binarySearchMods(mods, 10, 0));
+
+ // Time 30: exactly at second mod's min, should return 1
+ assertEquals(1, binarySearchMods(mods, 30, 0));
+
+ // Time 50: exactly at third mod's min, should return 2
+ assertEquals(2, binarySearchMods(mods, 50, 0));
+ }
+
+ @Test
+ public void testBinarySearchModsWithMaxBoundaries() {
+ // Create mods: [10, 20], [30, 40], [50, 60]
+ List<Deletion> mods = createTestMods(new long[][] {{10, 20}, {30, 40},
{50, 60}});
+
+ // Time 20: exactly at first mod's max, should return 0
+ assertEquals(0, binarySearchMods(mods, 20, 0));
+
+ // Time 40: exactly at second mod's max, should return 1
+ assertEquals(1, binarySearchMods(mods, 40, 0));
+
+ // Time 60: exactly at third mod's max, should return 2
+ assertEquals(2, binarySearchMods(mods, 60, 0));
+ }
+
+ @Test
+ public void testBinarySearchModsWithJustBeforeMin() {
+ // Create mods: [10, 20], [30, 40], [50, 60]
+ List<Deletion> mods = createTestMods(new long[][] {{10, 20}, {30, 40},
{50, 60}});
+
+ // Time 9: just before first mod's min, should return 0 (first mod)
+ assertEquals(0, binarySearchMods(mods, 9, 0));
+
+ // Time 29: just before second mod's min, should return 1 (second mod)
+ assertEquals(1, binarySearchMods(mods, 29, 0));
+
+ // Time 49: just before third mod's min, should return 2 (third mod)
+ assertEquals(2, binarySearchMods(mods, 49, 0));
+ }
+
+ @Test
+ public void testBinarySearchModsWithJustAfterMax() {
+ // Create mods: [10, 20], [30, 40], [50, 60]
+ List<Deletion> mods = createTestMods(new long[][] {{10, 20}, {30, 40},
{50, 60}});
+
+ // Time 21: just after first mod's max, should return 1 (second mod)
+ assertEquals(1, binarySearchMods(mods, 21, 0));
+
+ // Time 41: just after second mod's max, should return 2 (third mod)
+ assertEquals(2, binarySearchMods(mods, 41, 0));
+
+ // Time 61: just after third mod's max, should return 3 (mods.size())
+ assertEquals(3, binarySearchMods(mods, 61, 0));
+ }
+
+ @Test
+ public void testBinarySearchModsWithLargeGaps() {
+ // Create mods with large gaps: [10, 20], [100, 200], [1000, 2000]
+ List<Deletion> mods = createTestMods(new long[][] {{10, 20}, {100, 200},
{1000, 2000}});
+
+ // Time 50: in large gap, should return 1 (second mod)
+ assertEquals(1, binarySearchMods(mods, 50, 0));
+
+ // Time 500: in large gap, should return 2 (third mod)
+ assertEquals(2, binarySearchMods(mods, 500, 0));
+
+ // Time 5000: after all mods, should return 3 (mods.size())
+ assertEquals(3, binarySearchMods(mods, 5000, 0));
+ }
+
+ @Test
+ public void testBinarySearchModsWithNegativeTime() {
+ // Create mods: [10, 20], [30, 40]
+ List<Deletion> mods = createTestMods(new long[][] {{10, 20}, {30, 40}});
+
+ // Time -10: negative time, should return 0 (first mod)
+ assertEquals(0, binarySearchMods(mods, -10, 0));
+
+ // Time 0: zero time, should return 0 (first mod)
+ assertEquals(0, binarySearchMods(mods, 0, 0));
+ }
+
+ @Test
+ public void testBinarySearchModsWithVeryLargeTime() {
+ // Create mods: [10, 20], [30, 40]
+ List<Deletion> mods = createTestMods(new long[][] {{10, 20}, {30, 40}});
+
+ // Time Long.MAX_VALUE: very large time, should return 2 (mods.size())
+ assertEquals(2, binarySearchMods(mods, Long.MAX_VALUE, 0));
+
+ // Time Long.MIN_VALUE: very small time, should return 0 (first mod)
+ assertEquals(0, binarySearchMods(mods, Long.MIN_VALUE, 0));
+ }
+
+ @Test
+ public void testBinarySearchModsWithDuplicateTimeRanges() {
+ // Create mods with duplicate time ranges: [10, 20], [10, 20], [30, 40]
+ List<Deletion> mods = createTestMods(new long[][] {{10, 20}, {10, 20},
{30, 40}});
+
+ // Time 15: within duplicate ranges, should return 0 (first match)
+ assertEquals(0, binarySearchMods(mods, 15, 0));
+
+ // Time 20: at max of duplicate ranges, should return 0 (first match)
+ assertEquals(0, binarySearchMods(mods, 20, 0));
+ }
+
+ @Test
+ public void testBinarySearchModsWithStartIndexAtEnd() {
+ // Create mods: [10, 20], [30, 40], [50, 60]
+ List<Deletion> mods = createTestMods(new long[][] {{10, 20}, {30, 40},
{50, 60}});
+
+ // Start from index 3 (beyond array), should return 3
+ assertEquals(3, binarySearchMods(mods, 100, 3));
+
+ // Start from index 2, search for time 55, should return 2
+ assertEquals(2, binarySearchMods(mods, 55, 2));
+
+ // Start from index 2, search for time 25, should return 2 (start index)
+ assertEquals(2, binarySearchMods(mods, 25, 2));
+ }
+
+ // Helper method to access the private binarySearchMods method for testing
+ private int binarySearchMods(List<Deletion> mods, long time, int startIndex)
{
+ // Use reflection to access the private method
+ try {
+ java.lang.reflect.Method method =
+ ModsOperationUtil.class.getDeclaredMethod(
+ "binarySearchMods", List.class, long.class, int.class);
+ method.setAccessible(true);
+ return (Integer) method.invoke(null, mods, time, startIndex);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to invoke binarySearchMods method",
e);
+ }
+ }
+
+ private List<Deletion> createTestMods(long[][] timeRanges) {
+ List<Deletion> mods = new ArrayList<>();
+ for (long[] range : timeRanges) {
+ Deletion mod = new Deletion(null, 0, range[0], range[1]);
+ mods.add(mod);
+ }
+ return mods;
+ }
+}