This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new ca85e803ecd Pipe: Add TsFile parsing with Mods function (#16540)
ca85e803ecd is described below
commit ca85e803ecd93fb056b383ea85c90ac6537e87f4
Author: Zhenyu Luo <[email protected]>
AuthorDate: Thu Oct 23 17:29:45 2025 +0800
Pipe: Add TsFile parsing with Mods function (#16540)
---
.../pipe/it/dual/tablemodel/TableModelUtils.java | 33 ++
.../IoTDBPipeTsFileDecompositionWithModsIT.java | 192 ++++++
.../IoTDBPipeTsFileDecompositionWithModsIT.java | 660 +++++++++++++++++++++
.../common/tsfile/PipeTsFileInsertionEvent.java | 2 +-
.../tsfile/parser/TsFileInsertionEventParser.java | 16 +
.../parser/TsFileInsertionEventParserProvider.java | 20 +-
.../query/TsFileInsertionEventQueryParser.java | 79 ++-
...ileInsertionEventQueryParserTabletIterator.java | 33 +-
.../scan/TsFileInsertionEventScanParser.java | 356 +++++++----
.../table/TsFileInsertionEventTableParser.java | 30 +-
...ileInsertionEventTableParserTabletIterator.java | 53 +-
.../tsfile/parser/util/ModsOperationUtil.java | 314 ++++++++++
...leStatementDataTypeConvertExecutionVisitor.java | 3 +-
...eeStatementDataTypeConvertExecutionVisitor.java | 2 +-
...leStatementDataTypeConvertExecutionVisitor.java | 3 +-
...eeStatementDataTypeConvertExecutionVisitor.java | 8 +-
.../pipe/event/TsFileInsertionEventParserTest.java | 2 +-
.../tsfile/parser/util/ModsOperationUtilTest.java | 408 +++++++++++++
18 files changed, 2065 insertions(+), 149 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/TableModelUtils.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/TableModelUtils.java
index 265548d80cc..e03188d7376 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/TableModelUtils.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/TableModelUtils.java
@@ -115,6 +115,39 @@ public class TableModelUtils {
TestUtils.executeNonQueries(dataBaseName, BaseEnv.TABLE_SQL_DIALECT,
baseEnv, list, null);
}
+ public static void insertData(
+ final String dataBaseName,
+ final String tableName,
+ final int deviceStartIndex,
+ final int deviceEndIndex,
+ final int startInclusive,
+ final int endExclusive,
+ final BaseEnv baseEnv) {
+ List<String> list = new ArrayList<>(endExclusive - startInclusive + 1);
+ for (int deviceIndex = deviceStartIndex; deviceIndex < deviceEndIndex;
++deviceIndex) {
+ for (int i = startInclusive; i < endExclusive; ++i) {
+ list.add(
+ String.format(
+ "insert into %s (s0, s3, s2, s1, s4, s5, s6, s7, s8, s9, s10,
s11, time) values ('t%s','t%s','t%s','t%s','%s', %s.0, %s, %s, %d, %d.0, '%s',
'%s', %s)",
+ tableName,
+ deviceIndex,
+ deviceIndex,
+ deviceIndex,
+ deviceIndex,
+ i,
+ i,
+ i,
+ i,
+ i,
+ i,
+ getDateStr(i),
+ i,
+ i));
+ }
+ }
+ TestUtils.executeNonQueries(dataBaseName, BaseEnv.TABLE_SQL_DIALECT,
baseEnv, list, null);
+ }
+
public static void insertData(
final String dataBaseName,
final String tableName,
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeTsFileDecompositionWithModsIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeTsFileDecompositionWithModsIT.java
new file mode 100644
index 00000000000..22e042e2853
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeTsFileDecompositionWithModsIT.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.dual.tablemodel.manual.basic;
+
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.isession.SessionConfig;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2DualTableManualBasic;
+import org.apache.iotdb.pipe.it.dual.tablemodel.TableModelUtils;
+import
org.apache.iotdb.pipe.it.dual.tablemodel.manual.AbstractPipeTableModelDualManualIT;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.Collections;
+import java.util.HashSet;
+
+import static org.apache.iotdb.db.it.utils.TestUtils.executeNonQueryWithRetry;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2DualTableManualBasic.class})
+public class IoTDBPipeTsFileDecompositionWithModsIT extends
AbstractPipeTableModelDualManualIT {
+
+ /**
+ * Test IoTDB pipe handling TsFile decomposition with Mods (modification
operations) in table
+ * model
+ *
+ * <p>Test scenario: 1. Create two storage groups sg1 and sg2, each
containing table1 2. Insert
+ * small amount of data in sg1 (times 1-5), insert large amount of data in
sg2 (110 batches, 100
+ * rows per batch) 3. Execute FLUSH operation to persist data to TsFile 4.
Execute multiple DELETE
+ * operations on sg1, deleting data in time ranges 2-4 and 3-5 5. Execute
multiple DELETE
+ * operations on sg2, deleting data matching specific conditions (s0-s3
field values) 6. Execute
+ * FLUSH operation again 7. Create pipe with mods enabled, synchronize data
to receiver 8. Verify
+ * correctness of receiver data: - sg1 only retains time=1 data, time=2-4
data is correctly
+ * deleted - sg2 DELETE operation results meet expectations (t10 retains
1000 rows, t11 all
+ * deleted, t12 retains 5900 rows, etc.)
+ *
+ * <p>Test purpose: Verify that IoTDB pipe can correctly handle Mods
(modification operations) in
+ * TsFile, ensuring DELETE operations can be correctly synchronized to the
receiver and data
+ * consistency is guaranteed.
+ */
+ @Test
+ public void testTsFileDecompositionWithMods() {
+ TableModelUtils.createDataBaseAndTable(senderEnv, "table1", "sg1");
+ TableModelUtils.createDataBaseAndTable(receiverEnv, "table1", "sg1");
+
+ TableModelUtils.insertData("sg1", "table1", 1, 6, senderEnv);
+
+ TableModelUtils.createDataBaseAndTable(senderEnv, "table1", "sg2");
+ for (int i = 1; i <= 110; i++) {
+ TableModelUtils.insertData("sg2", "table1", 10, 15, (i - 1) * 100, i *
100, senderEnv);
+ }
+
+ executeNonQueryWithRetry(senderEnv, "FLUSH");
+
+ executeNonQueryWithRetry(
+ senderEnv,
+ "DELETE FROM table1 WHERE time >= 2 AND time <= 4",
+ SessionConfig.DEFAULT_USER,
+ SessionConfig.DEFAULT_PASSWORD,
+ "sg1",
+ "table");
+
+ executeNonQueryWithRetry(
+ senderEnv,
+ "DELETE FROM table1 WHERE time >= 3 AND time <= 5",
+ SessionConfig.DEFAULT_USER,
+ SessionConfig.DEFAULT_PASSWORD,
+ "sg1",
+ "table");
+
+ executeNonQueryWithRetry(
+ senderEnv,
+ "DELETE FROM table1 WHERE time >= 0 AND time < 10000 AND s0 ='t10' AND
s1='t10' AND s2='t10' AND s3='t10'",
+ SessionConfig.DEFAULT_USER,
+ SessionConfig.DEFAULT_PASSWORD,
+ "sg2",
+ "table");
+
+ executeNonQueryWithRetry(
+ senderEnv,
+ "DELETE FROM table1 WHERE time >= 0 AND time <= 11000 AND s0 ='t11'
AND s1='t11' AND s2='t11' AND s3='t11'",
+ SessionConfig.DEFAULT_USER,
+ SessionConfig.DEFAULT_PASSWORD,
+ "sg2",
+ "table");
+
+ executeNonQueryWithRetry(
+ senderEnv,
+ "DELETE FROM table1 WHERE time >= 5000 AND time < 10100 AND s0 ='t12'
AND s1='t12' AND s2='t12' AND s3='t12'",
+ SessionConfig.DEFAULT_USER,
+ SessionConfig.DEFAULT_PASSWORD,
+ "sg2",
+ "table");
+
+ executeNonQueryWithRetry(
+ senderEnv,
+ "DELETE FROM table1 WHERE time >= 0 AND time < 10000 AND s0 ='t13' AND
s1='t13' AND s2='t13' AND s3='t13'",
+ SessionConfig.DEFAULT_USER,
+ SessionConfig.DEFAULT_PASSWORD,
+ "sg2",
+ "table");
+
+ executeNonQueryWithRetry(
+ senderEnv,
+ "DELETE FROM table1 WHERE time >= 10000 AND time <= 11000 AND s0
='t14' AND s1='t14' AND s2='t14' AND s3='t14'",
+ SessionConfig.DEFAULT_USER,
+ SessionConfig.DEFAULT_PASSWORD,
+ "sg2",
+ "table");
+
+ executeNonQueryWithRetry(senderEnv, "FLUSH");
+
+ executeNonQueryWithRetry(
+ senderEnv,
+ String.format(
+ "CREATE PIPE test_pipe WITH SOURCE ('mods.enable'='true',
'capture.table'='true') WITH CONNECTOR('ip'='%s', 'port'='%s',
'username'='root', 'format'='tablet')",
+ receiverEnv.getDataNodeWrapperList().get(0).getIp(),
+ receiverEnv.getDataNodeWrapperList().get(0).getPort()));
+
+ HashSet<String> expectedResults = new HashSet<>();
+ expectedResults.add(
+
"t1,t1,t1,t1,1,1.0,1,1970-01-01T00:00:00.001Z,1,1.0,1970-01-01,1,1970-01-01T00:00:00.001Z,");
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ TableModelUtils.getQuerySql("table1"),
+ TableModelUtils.generateHeaderResults(),
+ expectedResults,
+ "sg1");
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT s4 FROM table1 WHERE time >= 2 AND time <= 4",
+ "s4,",
+ Collections.emptySet(),
+ "sg1");
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(*) as count FROM table1 WHERE s0 ='t10' AND s1='t10' AND
s2='t10' AND s3='t10'",
+ "count,",
+ Collections.singleton("1000,"),
+ "sg2");
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(*) as count FROM table1 WHERE s0 ='t11' AND s1='t11' AND
s2='t11' AND s3='t11'",
+ "count,",
+ Collections.singleton("0,"),
+ "sg2");
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(*) as count FROM table1 WHERE s0 ='t12' AND s1='t12' AND
s2='t12' AND s3='t12'",
+ "count,",
+ Collections.singleton("5900,"),
+ "sg2");
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(*) as count FROM table1 WHERE s0 ='t13' AND s1='t13' AND
s2='t13' AND s3='t13'",
+ "count,",
+ Collections.singleton("1000,"),
+ "sg2");
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(*) as count FROM table1 WHERE s0 ='t14' AND s1='t14' AND
s2='t14' AND s3='t14'",
+ "count,",
+ Collections.singleton("10000,"),
+ "sg2");
+ }
+}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeTsFileDecompositionWithModsIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeTsFileDecompositionWithModsIT.java
new file mode 100644
index 00000000000..b939ec79b8b
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeTsFileDecompositionWithModsIT.java
@@ -0,0 +1,660 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.dual.treemodel.manual;
+
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2DualTreeManual;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.Collections;
+import java.util.HashSet;
+
+import static org.apache.iotdb.db.it.utils.TestUtils.executeNonQueryWithRetry;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2DualTreeManual.class})
+public class IoTDBPipeTsFileDecompositionWithModsIT extends
AbstractPipeDualTreeModelManualIT {
+
+ @Override
+ protected void setupConfig() {
+ super.setupConfig();
+
senderEnv.getConfig().getConfigNodeConfig().setLeaderDistributionPolicy("HASH");
+ senderEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
+ receiverEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
+ }
+
+ /**
+ * Test IoTDB pipe handling TsFile decomposition with Mods (modification
operations) in tree model
+ *
+ * <p>Test scenario: 1. Create database root.sg1 with 4 devices: d1 (aligned
timeseries), d2
+ * (non-aligned timeseries), d3 (aligned timeseries), d4 (aligned
timeseries) 2. Insert initial
+ * data into d1, d2, d3 3. Execute FLUSH operation to persist data to TsFile
4. Execute DELETE
+ * operation on d1.s1, deleting data in time range 2-4 5. Insert large
amount of data into d4
+ * (11000 records, inserted in batches) 6. Execute multiple DELETE
operations on d4: - Delete s1
+ * field data where time<=10000 - Delete s2 field data where time>1000 -
Delete s3 field data
+ * where time<=8000 7. Delete all data from d2 and d3 8. Execute FLUSH
operation again 9. Create
+ * pipe with mods enabled, synchronize data to receiver 10. Verify
correctness of receiver data: -
+ * d1 s1 field is null in time range 2-4, other data is normal - d2 and d3
data is completely
+ * deleted - d4 DELETE operation results meet expectations for each field
+ *
+ * <p>Test purpose: Verify that IoTDB pipe can correctly handle Mods
(modification operations) in
+ * TsFile under tree model, ensuring various DELETE operations can be
correctly synchronized to
+ * the receiver and data consistency is guaranteed.
+ */
+ @Test
+ public void testTsFileDecompositionWithMods() throws Exception {
+ TestUtils.executeNonQueryWithRetry(senderEnv, "CREATE DATABASE root.sg1");
+ TestUtils.executeNonQueryWithRetry(receiverEnv, "CREATE DATABASE
root.sg1");
+
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "CREATE ALIGNED TIMESERIES root.sg1.d1(s1 FLOAT, s2 FLOAT,
s3 FLOAT)");
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "CREATE TIMESERIES root.sg1.d2.s1 WITH DATATYPE=FLOAT");
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "CREATE TIMESERIES root.sg1.d2.s2 WITH DATATYPE=FLOAT");
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "CREATE ALIGNED TIMESERIES root.sg1.d3(s1 FLOAT, s2 FLOAT,
s3 FLOAT)");
+
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv,
+ "INSERT INTO root.sg1.d1(time, s1, s2, s3) ALIGNED VALUES (1, 1.0,
2.0, 3.0), (2, 1.1, 2.1, 3.1), (3, 1.2, 2.2, 3.2), (4, 1.3, 2.3, 3.3), (5, 1.4,
2.4, 3.4)");
+
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv,
+ "INSERT INTO root.sg1.d2(time, s1, s2) VALUES (1, 10.0, 20.0), (2,
10.1, 20.1), (3, 10.2, 20.2), (4, 10.3, 20.3), (5, 10.4, 20.4)");
+
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv,
+ "INSERT INTO root.sg1.d3(time, s1, s2, s3) ALIGNED VALUES (1, 100.0,
200.0, 300.0), (2, 100.1, 200.1, 300.1), (3, 100.2, 200.2, 300.2)");
+
+ TestUtils.executeNonQueryWithRetry(senderEnv, "FLUSH");
+
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "DELETE FROM root.sg1.d1.s1 WHERE time >= 2 AND time <= 4");
+
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv,
+ "INSERT INTO root.sg1.d3(time, s1, s2, s3) ALIGNED VALUES (1, 100.0,
200.0, 300.0), (2, 100.1, 200.1, 300.1), (3, 100.2, 200.2, 300.2)");
+
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "CREATE ALIGNED TIMESERIES root.sg1.d4(s1 FLOAT, s2 FLOAT,
s3 FLOAT)");
+ String s = "INSERT INTO root.sg1.d4(time, s1, s2, s3) ALIGNED VALUES ";
+ StringBuilder insertBuilder = new StringBuilder(s);
+ for (int i = 1; i <= 11000; i++) {
+ insertBuilder
+ .append("(")
+ .append(i)
+ .append(",")
+ .append(1.0f)
+ .append(",")
+ .append(2.0f)
+ .append(",")
+ .append(3.0f)
+ .append(")");
+ if (i % 100 != 0) {
+ insertBuilder.append(",");
+ } else {
+ TestUtils.executeNonQueryWithRetry(senderEnv,
insertBuilder.toString());
+ insertBuilder = new StringBuilder(s);
+ }
+ }
+
+ TestUtils.executeNonQueryWithRetry(senderEnv, "FLUSH");
+
+ TestUtils.executeNonQueryWithRetry(senderEnv, "DELETE FROM root.sg1.d4.s1
WHERE time <= 10000");
+ TestUtils.executeNonQueryWithRetry(senderEnv, "DELETE FROM root.sg1.d4.s2
WHERE time > 1000");
+ TestUtils.executeNonQueryWithRetry(senderEnv, "DELETE FROM root.sg1.d4.s3
WHERE time <= 8000");
+
+ TestUtils.executeNonQueryWithRetry(senderEnv, "DELETE FROM root.sg1.d2.*");
+
+ TestUtils.executeNonQueryWithRetry(senderEnv, "DELETE FROM root.sg1.d3.*");
+
+ TestUtils.executeNonQueryWithRetry(senderEnv, "FLUSH");
+
+ // Verify sender data integrity before creating pipe to avoid leader
election issues
+ // This ensures all data is properly persisted and consistent on sender
side
+ // before starting the pipe synchronization process
+ HashSet<String> results = new HashSet<>();
+ results.add("1,3.0,1.0,2.0,");
+ results.add("2,3.1,null,2.1,");
+ results.add("3,3.2,null,2.2,");
+ results.add("4,3.3,null,2.3,");
+ results.add("5,3.4,1.4,2.4,");
+
+ TestUtils.assertDataEventuallyOnEnv(
+ senderEnv,
+ "SELECT * FROM root.sg1.d1 ORDER BY time",
+ "Time,root.sg1.d1.s3,root.sg1.d1.s1,root.sg1.d1.s2,",
+ results);
+
+ executeNonQueryWithRetry(
+ senderEnv,
+ String.format(
+ "CREATE PIPE test_pipe WITH SOURCE ('mods.enable'='true') WITH
CONNECTOR('ip'='%s', 'port'='%s', 'format'='tablet')",
+ receiverEnv.getDataNodeWrapperList().get(0).getIp(),
+ receiverEnv.getDataNodeWrapperList().get(0).getPort()));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT * FROM root.sg1.d1 ORDER BY time",
+ "Time,root.sg1.d1.s3,root.sg1.d1.s1,root.sg1.d1.s2,",
+ results);
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv, "SELECT * FROM root.sg1.d2 ORDER BY time", "Time,",
Collections.emptySet());
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv, "SELECT * FROM root.sg1.d3 ORDER BY time", "Time,",
Collections.emptySet());
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT s1 FROM root.sg1.d1 WHERE time >= 2 AND time <= 4",
+ "Time,root.sg1.d1.s1,",
+ Collections.emptySet());
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(**) FROM root.sg1.d4",
+ "COUNT(root.sg1.d4.s3),COUNT(root.sg1.d4.s1),COUNT(root.sg1.d4.s2),",
+ Collections.singleton("3000,1000,1000,"));
+ }
+
+ /**
+ * Test IoTDB pipe handling TsFile decomposition with Mods (modification
operations) in tree model
+ * - Multi-pipe scenario
+ *
+ * <p>Test scenario: 1. Create database root.sg1 with 4 devices: d1 (aligned
timeseries), d2
+ * (non-aligned timeseries), d3 (aligned timeseries), d4 (aligned
timeseries) 2. Insert initial
+ * data into d1, d2, d3 3. Execute FLUSH operation to persist data to TsFile
4. Execute DELETE
+ * operation on d1.s1, deleting data in time range 2-4 5. Insert large
amount of data into d4
+ * (11000 records, inserted in batches) 6. Execute multiple DELETE
operations on d4: - Delete s1
+ * field data where time<=10000 - Delete s2 field data where time>1000 -
Delete s3 field data
+ * where time<=8000 7. Delete all data from d2 and d3 8. Execute FLUSH
operation again 9. Create 4
+ * independent pipes, each targeting different device paths: - test_pipe1:
handles data for
+ * root.sg1.d1.** path - test_pipe2: handles data for root.sg1.d2.** path -
test_pipe3: handles
+ * data for root.sg1.d3.** path - test_pipe4: handles data for
root.sg1.d4.** path 10. Verify
+ * correctness of receiver data: - d1 s1 field is null in time range 2-4,
other data is normal -
+ * d2 and d3 data is completely deleted - d4 DELETE operation results meet
expectations for each
+ * field
+ *
+ * <p>Test purpose: Verify that IoTDB pipe can correctly handle Mods
(modification operations) in
+ * TsFile under tree model through multiple independent pipes, ensuring
DELETE operations for
+ * different paths can be correctly synchronized to the receiver and data
consistency is
+ * guaranteed. The main difference from the first test method is using
multiple pipes to handle
+ * data for different devices separately.
+ */
+ @Test
+ public void testTsFileDecompositionWithMods2() throws Exception {
+ TestUtils.executeNonQueryWithRetry(senderEnv, "CREATE DATABASE root.sg1");
+ TestUtils.executeNonQueryWithRetry(receiverEnv, "CREATE DATABASE
root.sg1");
+
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "CREATE ALIGNED TIMESERIES root.sg1.d1(s1 FLOAT, s2 FLOAT,
s3 FLOAT)");
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "CREATE TIMESERIES root.sg1.d2.s1 WITH DATATYPE=FLOAT");
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "CREATE TIMESERIES root.sg1.d2.s2 WITH DATATYPE=FLOAT");
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "CREATE ALIGNED TIMESERIES root.sg1.d3(s1 FLOAT, s2 FLOAT,
s3 FLOAT)");
+
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv,
+ "INSERT INTO root.sg1.d1(time, s1, s2, s3) ALIGNED VALUES (1, 1.0,
2.0, 3.0), (2, 1.1, 2.1, 3.1), (3, 1.2, 2.2, 3.2), (4, 1.3, 2.3, 3.3), (5, 1.4,
2.4, 3.4)");
+
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv,
+ "INSERT INTO root.sg1.d2(time, s1, s2) VALUES (1, 10.0, 20.0), (2,
10.1, 20.1), (3, 10.2, 20.2), (4, 10.3, 20.3), (5, 10.4, 20.4)");
+
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv,
+ "INSERT INTO root.sg1.d3(time, s1, s2, s3) ALIGNED VALUES (1, 100.0,
200.0, 300.0), (2, 100.1, 200.1, 300.1), (3, 100.2, 200.2, 300.2)");
+
+ TestUtils.executeNonQueryWithRetry(senderEnv, "FLUSH");
+
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "DELETE FROM root.sg1.d1.s1 WHERE time >= 2 AND time <= 4");
+
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv,
+ "INSERT INTO root.sg1.d3(time, s1, s2, s3) ALIGNED VALUES (1, 100.0,
200.0, 300.0), (2, 100.1, 200.1, 300.1), (3, 100.2, 200.2, 300.2)");
+
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "CREATE ALIGNED TIMESERIES root.sg1.d4(s1 FLOAT, s2 FLOAT,
s3 FLOAT)");
+ String s = "INSERT INTO root.sg1.d4(time, s1, s2, s3) ALIGNED VALUES ";
+ StringBuilder insertBuilder = new StringBuilder(s);
+ for (int i = 1; i <= 11000; i++) {
+ insertBuilder
+ .append("(")
+ .append(i)
+ .append(",")
+ .append(1.0f)
+ .append(",")
+ .append(2.0f)
+ .append(",")
+ .append(3.0f)
+ .append(")");
+ if (i % 100 != 0) {
+ insertBuilder.append(",");
+ } else {
+ TestUtils.executeNonQueryWithRetry(senderEnv,
insertBuilder.toString());
+ insertBuilder = new StringBuilder(s);
+ }
+ }
+
+ TestUtils.executeNonQueryWithRetry(senderEnv, "FLUSH");
+
+ TestUtils.executeNonQueryWithRetry(senderEnv, "DELETE FROM root.sg1.d4.s1
WHERE time <= 10000");
+ TestUtils.executeNonQueryWithRetry(senderEnv, "DELETE FROM root.sg1.d4.s2
WHERE time > 1000");
+ TestUtils.executeNonQueryWithRetry(senderEnv, "DELETE FROM root.sg1.d4.s3
WHERE time <= 8000");
+
+ TestUtils.executeNonQueryWithRetry(senderEnv, "DELETE FROM root.sg1.d2.*");
+
+ TestUtils.executeNonQueryWithRetry(senderEnv, "DELETE FROM root.sg1.d3.*");
+
+ TestUtils.executeNonQueryWithRetry(senderEnv, "FLUSH");
+
+ // Verify sender data integrity before creating pipes to avoid leader
election issues
+ // This ensures all data is properly persisted and consistent on sender
side
+ // before starting the pipe synchronization process
+ HashSet<String> results = new HashSet<>();
+ results.add("1,3.0,1.0,2.0,");
+ results.add("2,3.1,null,2.1,");
+ results.add("3,3.2,null,2.2,");
+ results.add("4,3.3,null,2.3,");
+ results.add("5,3.4,1.4,2.4,");
+
+ TestUtils.assertDataEventuallyOnEnv(
+ senderEnv,
+ "SELECT * FROM root.sg1.d1 ORDER BY time",
+ "Time,root.sg1.d1.s3,root.sg1.d1.s1,root.sg1.d1.s2,",
+ results);
+
+ executeNonQueryWithRetry(
+ senderEnv,
+ String.format(
+ "CREATE PIPE test_pipe1 WITH SOURCE
('mods.enable'='true','path'='root.sg1.d1.**') WITH CONNECTOR('ip'='%s',
'port'='%s', 'format'='tablet')",
+ receiverEnv.getDataNodeWrapperList().get(0).getIp(),
+ receiverEnv.getDataNodeWrapperList().get(0).getPort()));
+
+ executeNonQueryWithRetry(
+ senderEnv,
+ String.format(
+ "CREATE PIPE test_pipe2 WITH SOURCE
('mods.enable'='true','path'='root.sg1.d2.**') WITH CONNECTOR('ip'='%s',
'port'='%s', 'format'='tablet')",
+ receiverEnv.getDataNodeWrapperList().get(0).getIp(),
+ receiverEnv.getDataNodeWrapperList().get(0).getPort()));
+
+ executeNonQueryWithRetry(
+ senderEnv,
+ String.format(
+ "CREATE PIPE test_pipe3 WITH SOURCE
('mods.enable'='true','path'='root.sg1.d3.**') WITH CONNECTOR('ip'='%s',
'port'='%s', 'format'='tablet')",
+ receiverEnv.getDataNodeWrapperList().get(0).getIp(),
+ receiverEnv.getDataNodeWrapperList().get(0).getPort()));
+
+ executeNonQueryWithRetry(
+ senderEnv,
+ String.format(
+ "CREATE PIPE test_pipe4 WITH SOURCE
('mods.enable'='true','path'='root.sg1.d4.**') WITH CONNECTOR('ip'='%s',
'port'='%s', 'format'='tablet')",
+ receiverEnv.getDataNodeWrapperList().get(0).getIp(),
+ receiverEnv.getDataNodeWrapperList().get(0).getPort()));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT * FROM root.sg1.d1 ORDER BY time",
+ "Time,root.sg1.d1.s3,root.sg1.d1.s1,root.sg1.d1.s2,",
+ results);
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv, "SELECT * FROM root.sg1.d2 ORDER BY time", "Time,",
Collections.emptySet());
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv, "SELECT * FROM root.sg1.d3 ORDER BY time", "Time,",
Collections.emptySet());
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT s1 FROM root.sg1.d1 WHERE time >= 2 AND time <= 4",
+ "Time,root.sg1.d1.s1,",
+ Collections.emptySet());
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(**) FROM root.sg1.d4",
+ "COUNT(root.sg1.d4.s3),COUNT(root.sg1.d4.s1),COUNT(root.sg1.d4.s2),",
+ Collections.singleton("3000,1000,1000,"));
+ }
+
+ /**
+ * Test IoTDB pipe handling TsFile decomposition with Mods (modification
operations) in tree model
+ * - Large scale single point deletion scenario
+ *
+ * <p>Test scenario: 1. Create database root.sg1 with 1 device: d1 (aligned
timeseries with 5
+ * sensors) 2. Insert 20000 data points for each sensor with different time
ranges: - s1: time
+ * 1-20000 - s2: time 10001-30000 - s3: time 20001-40000 - s4: time
30001-50000 - s5: time
+ * 40001-60000 3. Execute FLUSH operation to persist data to TsFile 4.
Execute 2000 single point
+ * DELETE operations, each deleting one time point from different sensors 5.
Execute FLUSH
+ * operation again 6. Create pipe with mods enabled 7. Verify correctness of
receiver data: - Each
+ * sensor should have 19800 remaining data points - Deleted points should
not appear in receiver
+ *
+ * <p>Test purpose: Verify that IoTDB pipe can correctly handle large scale
single point deletion
+ * operations in TsFile under tree model, ensuring the binary search
optimization in
+ * ModsOperationUtil works correctly with many modification entries.
+ */
+ @Test
+ public void testTsFileDecompositionWithModsLargeScaleSinglePointDeletion()
throws Exception {
+ TestUtils.executeNonQueryWithRetry(senderEnv, "CREATE DATABASE root.sg1");
+ TestUtils.executeNonQueryWithRetry(receiverEnv, "CREATE DATABASE
root.sg1");
+
+ // Insert 20000 data points for s1 (time 1-20000)
+ String s1 = "INSERT INTO root.sg1.d1(time, s1) ALIGNED VALUES ";
+ StringBuilder insertBuilder1 = new StringBuilder(s1);
+ for (int i = 1; i <= 20000; i++) {
+
insertBuilder1.append("(").append(i).append(",").append(1.0f).append(")");
+ if (i % 1000 != 0) {
+ insertBuilder1.append(",");
+ } else {
+ TestUtils.executeNonQueryWithRetry(senderEnv,
insertBuilder1.toString());
+ insertBuilder1 = new StringBuilder(s1);
+ }
+ }
+ // Execute remaining data if any
+ if (insertBuilder1.length() > s1.length()) {
+ TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder1.toString());
+ }
+
+ // Insert 20000 data points for s2 (time 10001-30000)
+ String s2 = "INSERT INTO root.sg1.d1(time, s2) ALIGNED VALUES ";
+ StringBuilder insertBuilder2 = new StringBuilder(s2);
+ for (int i = 10001; i <= 30000; i++) {
+
insertBuilder2.append("(").append(i).append(",").append(2.0f).append(")");
+ if (i % 1000 != 0) {
+ insertBuilder2.append(",");
+ } else {
+ TestUtils.executeNonQueryWithRetry(senderEnv,
insertBuilder2.toString());
+ insertBuilder2 = new StringBuilder(s2);
+ }
+ }
+ // Execute remaining data if any
+ if (insertBuilder2.length() > s2.length()) {
+ TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder2.toString());
+ }
+
+ // Insert 20000 data points for s3 (time 20001-40000)
+ String s3 = "INSERT INTO root.sg1.d1(time, s3) ALIGNED VALUES ";
+ StringBuilder insertBuilder3 = new StringBuilder(s3);
+ for (int i = 20001; i <= 40000; i++) {
+
insertBuilder3.append("(").append(i).append(",").append(3.0f).append(")");
+ if (i % 1000 != 0) {
+ insertBuilder3.append(",");
+ } else {
+ TestUtils.executeNonQueryWithRetry(senderEnv,
insertBuilder3.toString());
+ insertBuilder3 = new StringBuilder(s3);
+ }
+ }
+ // Execute remaining data if any
+ if (insertBuilder3.length() > s3.length()) {
+ TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder3.toString());
+ }
+
+ // Insert 20000 data points for s4 (time 30001-50000)
+ String s4 = "INSERT INTO root.sg1.d1(time, s4) ALIGNED VALUES ";
+ StringBuilder insertBuilder4 = new StringBuilder(s4);
+ for (int i = 30001; i <= 50000; i++) {
+
insertBuilder4.append("(").append(i).append(",").append(4.0f).append(")");
+ if (i % 1000 != 0) {
+ insertBuilder4.append(",");
+ } else {
+ TestUtils.executeNonQueryWithRetry(senderEnv,
insertBuilder4.toString());
+ insertBuilder4 = new StringBuilder(s4);
+ }
+ }
+ // Execute remaining data if any
+ if (insertBuilder4.length() > s4.length()) {
+ TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder4.toString());
+ }
+
+ // Insert 20000 data points for s5 (time 40001-60000)
+ String s5 = "INSERT INTO root.sg1.d1(time, s5) ALIGNED VALUES ";
+ StringBuilder insertBuilder5 = new StringBuilder(s5);
+ for (int i = 40001; i <= 60000; i++) {
+
insertBuilder5.append("(").append(i).append(",").append(5.0f).append(")");
+ if (i % 1000 != 0) {
+ insertBuilder5.append(",");
+ } else {
+ TestUtils.executeNonQueryWithRetry(senderEnv,
insertBuilder5.toString());
+ insertBuilder5 = new StringBuilder(s5);
+ }
+ }
+ // Execute remaining data if any
+ if (insertBuilder5.length() > s5.length()) {
+ TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder5.toString());
+ }
+
+ TestUtils.executeNonQueryWithRetry(senderEnv, "FLUSH");
+
+ // Execute 2000 single point DELETE operations
+ // Delete 400 points from each sensor (distributed across different time
ranges)
+ for (int i = 0; i < 400; i++) {
+ // Delete from s1: every 10th point starting from 10
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "DELETE FROM root.sg1.d1.s1 WHERE time = " + (10 + i *
10));
+
+ // Delete from s2: every 10th point starting from 10010
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "DELETE FROM root.sg1.d1.s2 WHERE time = " + (10010 + i *
10));
+
+ // Delete from s3: every 10th point starting from 20010
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "DELETE FROM root.sg1.d1.s3 WHERE time = " + (20010 + i *
10));
+
+ // Delete from s4: every 10th point starting from 30010
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "DELETE FROM root.sg1.d1.s4 WHERE time = " + (30010 + i *
10));
+
+ // Delete from s5: every 10th point starting from 40010
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "DELETE FROM root.sg1.d1.s5 WHERE time = " + (40010 + i *
10));
+ }
+
+ TestUtils.executeNonQueryWithRetry(senderEnv, "FLUSH");
+
+ // Verify sender data integrity before creating pipe to avoid leader
election issues
+ // This ensures all data is properly persisted and consistent on sender
side
+ // before starting the pipe synchronization process
+ TestUtils.assertDataEventuallyOnEnv(
+ senderEnv,
+ "SELECT COUNT(**) FROM root.sg1.d1",
+
"COUNT(root.sg1.d1.s3),COUNT(root.sg1.d1.s4),COUNT(root.sg1.d1.s5),COUNT(root.sg1.d1.s1),COUNT(root.sg1.d1.s2),",
+ Collections.singleton("19600,19600,19600,19600,19600,"));
+
+ executeNonQueryWithRetry(
+ senderEnv,
+ String.format(
+ "CREATE PIPE test_pipe WITH SOURCE ('mods.enable'='true') WITH
CONNECTOR('ip'='%s', 'port'='%s', 'format'='tablet')",
+ receiverEnv.getDataNodeWrapperList().get(0).getIp(),
+ receiverEnv.getDataNodeWrapperList().get(0).getPort()));
+
+ // Verify total count of all sensors using COUNT(**)
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(**) FROM root.sg1.d1",
+
"COUNT(root.sg1.d1.s3),COUNT(root.sg1.d1.s4),COUNT(root.sg1.d1.s5),COUNT(root.sg1.d1.s1),COUNT(root.sg1.d1.s2),",
+ Collections.singleton("19600,19600,19600,19600,19600,"));
+
+ // Verify individual sensor counts
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s1) FROM root.sg1.d1",
+ "COUNT(root.sg1.d1.s1),",
+ Collections.singleton("19600,"));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s2) FROM root.sg1.d1",
+ "COUNT(root.sg1.d1.s2),",
+ Collections.singleton("19600,"));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s3) FROM root.sg1.d1",
+ "COUNT(root.sg1.d1.s3),",
+ Collections.singleton("19600,"));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s4) FROM root.sg1.d1",
+ "COUNT(root.sg1.d1.s4),",
+ Collections.singleton("19600,"));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s5) FROM root.sg1.d1",
+ "COUNT(root.sg1.d1.s5),",
+ Collections.singleton("19600,"));
+
+ // Verify counts over the deleted timestamps using COUNT with a WHERE clause
+ // These should return 0 since every multiple-of-10 timestamp in these ranges was deleted
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s1) FROM root.sg1.d1 WHERE time >= 10 AND time <= 4000
AND time % 10 = 0",
+ "COUNT(root.sg1.d1.s1),",
+ Collections.singleton("0,"));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s2) FROM root.sg1.d1 WHERE time >= 10010 AND time <=
14000 AND time % 10 = 0",
+ "COUNT(root.sg1.d1.s2),",
+ Collections.singleton("0,"));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s3) FROM root.sg1.d1 WHERE time >= 20010 AND time <=
24000 AND time % 10 = 0",
+ "COUNT(root.sg1.d1.s3),",
+ Collections.singleton("0,"));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s4) FROM root.sg1.d1 WHERE time >= 30010 AND time <=
34000 AND time % 10 = 0",
+ "COUNT(root.sg1.d1.s4),",
+ Collections.singleton("0,"));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s5) FROM root.sg1.d1 WHERE time >= 40010 AND time <=
44000 AND time % 10 = 0",
+ "COUNT(root.sg1.d1.s5),",
+ Collections.singleton("0,"));
+
+ // Verify count of non-deleted time ranges using multiple range queries
+ // Check ranges before deletion area
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s1) FROM root.sg1.d1 WHERE time >= 1 AND time < 10",
+ "COUNT(root.sg1.d1.s1),",
+ Collections.singleton("9,"));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s2) FROM root.sg1.d1 WHERE time >= 10001 AND time <
10010",
+ "COUNT(root.sg1.d1.s2),",
+ Collections.singleton("9,"));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s3) FROM root.sg1.d1 WHERE time >= 20001 AND time <
20010",
+ "COUNT(root.sg1.d1.s3),",
+ Collections.singleton("9,"));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s4) FROM root.sg1.d1 WHERE time >= 30001 AND time <
30010",
+ "COUNT(root.sg1.d1.s4),",
+ Collections.singleton("9,"));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s5) FROM root.sg1.d1 WHERE time >= 40001 AND time <
40010",
+ "COUNT(root.sg1.d1.s5),",
+ Collections.singleton("9,"));
+
+ // Check ranges after deletion area
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s1) FROM root.sg1.d1 WHERE time > 4000 AND time <=
20000",
+ "COUNT(root.sg1.d1.s1),",
+ Collections.singleton("16000,"));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s2) FROM root.sg1.d1 WHERE time > 14000 AND time <=
30000",
+ "COUNT(root.sg1.d1.s2),",
+ Collections.singleton("16000,"));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s3) FROM root.sg1.d1 WHERE time > 24000 AND time <=
40000",
+ "COUNT(root.sg1.d1.s3),",
+ Collections.singleton("16000,"));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s4) FROM root.sg1.d1 WHERE time > 34000 AND time <=
50000",
+ "COUNT(root.sg1.d1.s4),",
+ Collections.singleton("16000,"));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s5) FROM root.sg1.d1 WHERE time > 44000 AND time <=
60000",
+ "COUNT(root.sg1.d1.s5),",
+ Collections.singleton("16000,"));
+
+ // Check surviving points within the deletion window: only multiples of 10 were
+ // deleted, so all timestamps with time % 10 != 0 must remain
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s1) FROM root.sg1.d1 WHERE time >= 10 AND time <= 4000
AND time % 10 != 0",
+ "COUNT(root.sg1.d1.s1),",
+ Collections.singleton("3591,"));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s2) FROM root.sg1.d1 WHERE time >= 10010 AND time <=
14000 AND time % 10 != 0",
+ "COUNT(root.sg1.d1.s2),",
+ Collections.singleton("3591,"));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s3) FROM root.sg1.d1 WHERE time >= 20010 AND time <=
24000 AND time % 10 != 0",
+ "COUNT(root.sg1.d1.s3),",
+ Collections.singleton("3591,"));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s4) FROM root.sg1.d1 WHERE time >= 30010 AND time <=
34000 AND time % 10 != 0",
+ "COUNT(root.sg1.d1.s4),",
+ Collections.singleton("3591,"));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(s5) FROM root.sg1.d1 WHERE time >= 40010 AND time <=
44000 AND time % 10 != 0",
+ "COUNT(root.sg1.d1.s5),",
+ Collections.singleton("3591,"));
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 3617b7347ad..2a1ab3f5a35 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -700,7 +700,7 @@ public class PipeTsFileInsertionEvent extends
PipeInsertionEvent
// To avoid renaming of the tsFile database
shouldParse4Privilege ? userName : null,
this)
- .provide());
+ .provide(isWithMod));
return eventParser.get();
} catch (final IOException e) {
close();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java
index 358103175fe..b68d9492426 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.event.common.tsfile.parser;
+import org.apache.iotdb.commons.path.PatternTreeMap;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
@@ -27,6 +28,8 @@ import
org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
+import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
+import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.tsfile.read.TsFileSequenceReader;
@@ -53,6 +56,10 @@ public abstract class TsFileInsertionEventParser implements
AutoCloseable {
protected final PipeTaskMeta pipeTaskMeta; // used to report progress
protected final PipeInsertionEvent sourceEvent; // used to report progress
+ // mods entry
+ protected PipeMemoryBlock allocatedMemoryBlockForModifications;
+ protected PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>
currentModifications;
+
protected final long initialTimeNano = System.nanoTime();
protected boolean timeUsageReported = false;
@@ -124,5 +131,14 @@ public abstract class TsFileInsertionEventParser
implements AutoCloseable {
if (allocatedMemoryBlockForTablet != null) {
allocatedMemoryBlockForTablet.close();
}
+
+ if (currentModifications != null) {
+ // help GC
+ currentModifications = null;
+ }
+
+ if (allocatedMemoryBlockForModifications != null) {
+ allocatedMemoryBlockForModifications.close();
+ }
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java
index d6a276b9f25..8afd1627790 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java
@@ -78,7 +78,7 @@ public class TsFileInsertionEventParserProvider {
this.sourceEvent = sourceEvent;
}
- public TsFileInsertionEventParser provide() throws IOException {
+ public TsFileInsertionEventParser provide(final boolean isWithMod) throws
IOException {
if (pipeName != null) {
PipeTsFileToTabletsMetrics.getInstance()
.markTsFileToTabletInvocation(pipeName + "_" + creationTime);
@@ -94,7 +94,8 @@ public class TsFileInsertionEventParserProvider {
endTime,
pipeTaskMeta,
userName,
- sourceEvent);
+ sourceEvent,
+ isWithMod);
}
// Use scan container to save memory
@@ -109,7 +110,8 @@ public class TsFileInsertionEventParserProvider {
startTime,
endTime,
pipeTaskMeta,
- sourceEvent);
+ sourceEvent,
+ isWithMod);
}
if (treePattern instanceof UnionIoTDBTreePattern
@@ -128,7 +130,8 @@ public class TsFileInsertionEventParserProvider {
startTime,
endTime,
pipeTaskMeta,
- sourceEvent);
+ sourceEvent,
+ isWithMod);
}
final Map<IDeviceID, Boolean> deviceIsAlignedMap =
@@ -144,7 +147,8 @@ public class TsFileInsertionEventParserProvider {
startTime,
endTime,
pipeTaskMeta,
- sourceEvent);
+ sourceEvent,
+ isWithMod);
}
final int originalSize = deviceIsAlignedMap.size();
@@ -161,7 +165,8 @@ public class TsFileInsertionEventParserProvider {
startTime,
endTime,
pipeTaskMeta,
- sourceEvent)
+ sourceEvent,
+ isWithMod)
: new TsFileInsertionEventQueryParser(
pipeName,
creationTime,
@@ -171,7 +176,8 @@ public class TsFileInsertionEventParserProvider {
endTime,
pipeTaskMeta,
sourceEvent,
- filteredDeviceIsAlignedMap);
+ filteredDeviceIsAlignedMap,
+ isWithMod);
}
private Map<IDeviceID, Boolean> filterDeviceIsAlignedMapByPattern(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java
index d61f7a791ca..dbed85d5a6b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java
@@ -26,15 +26,18 @@ import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tsfile.parser.TsFileInsertionEventParser;
+import
org.apache.iotdb.db.pipe.event.common.tsfile.parser.util.ModsOperationUtil;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
+import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.tsfile.read.TsFileDeviceIterator;
import org.apache.tsfile.read.TsFileReader;
import org.apache.tsfile.read.TsFileSequenceReader;
@@ -75,7 +78,7 @@ public class TsFileInsertionEventQueryParser extends
TsFileInsertionEventParser
final long endTime,
final PipeInsertionEvent sourceEvent)
throws IOException {
- this(null, 0, tsFile, pattern, startTime, endTime, null, sourceEvent);
+ this(null, 0, tsFile, pattern, startTime, endTime, null, sourceEvent,
false);
}
public TsFileInsertionEventQueryParser(
@@ -86,7 +89,8 @@ public class TsFileInsertionEventQueryParser extends
TsFileInsertionEventParser
final long startTime,
final long endTime,
final PipeTaskMeta pipeTaskMeta,
- final PipeInsertionEvent sourceEvent)
+ final PipeInsertionEvent sourceEvent,
+ final boolean isWithMod)
throws IOException {
this(
pipeName,
@@ -97,7 +101,8 @@ public class TsFileInsertionEventQueryParser extends
TsFileInsertionEventParser
endTime,
pipeTaskMeta,
sourceEvent,
- null);
+ null,
+ isWithMod);
}
public TsFileInsertionEventQueryParser(
@@ -109,11 +114,20 @@ public class TsFileInsertionEventQueryParser extends
TsFileInsertionEventParser
final long endTime,
final PipeTaskMeta pipeTaskMeta,
final PipeInsertionEvent sourceEvent,
- final Map<IDeviceID, Boolean> deviceIsAlignedMap)
+ final Map<IDeviceID, Boolean> deviceIsAlignedMap,
+ final boolean isWithMod)
throws IOException {
super(pipeName, creationTime, pattern, null, startTime, endTime,
pipeTaskMeta, sourceEvent);
try {
+ currentModifications =
+ isWithMod
+ ? ModsOperationUtil.loadModificationsFromTsFile(tsFile)
+ : PatternTreeMapFactory.getModsPatternTreeMap();
+ allocatedMemoryBlockForModifications =
+ PipeDataNodeResourceManager.memory()
+
.forceAllocateForTabletWithRetry(currentModifications.ramBytesUsed());
+
final PipeTsFileResourceManager tsFileResourceManager =
PipeDataNodeResourceManager.tsfile();
final Map<IDeviceID, List<String>> deviceMeasurementsMap;
@@ -158,6 +172,60 @@ public class TsFileInsertionEventQueryParser extends
TsFileInsertionEventParser
allocatedMemoryBlock =
PipeDataNodeResourceManager.memory().forceAllocate(memoryRequiredInBytes);
+ final Iterator<Map.Entry<IDeviceID, List<String>>> iterator =
+ deviceMeasurementsMap.entrySet().iterator();
+ while (isWithMod && iterator.hasNext()) {
+ final Map.Entry<IDeviceID, List<String>> entry = iterator.next();
+ final IDeviceID deviceId = entry.getKey();
+ final List<String> measurements = entry.getValue();
+
+ // Check if deviceId is deleted
+ if (deviceId == null) {
+ LOGGER.warn("Found null deviceId, removing entry");
+ iterator.remove();
+ continue;
+ }
+
+ // Check if measurements list is deleted or empty
+ if (measurements == null || measurements.isEmpty()) {
+ iterator.remove();
+ continue;
+ }
+
+ if (!currentModifications.isEmpty()) {
+ // Safely filter measurements, remove non-existent measurements
+ measurements.removeIf(
+ measurement -> {
+ if (measurement == null) {
+ return true;
+ }
+
+ try {
+ TimeseriesMetadata meta =
+ tsFileSequenceReader.readTimeseriesMetadata(deviceId,
measurement, true);
+ return ModsOperationUtil.isAllDeletedByMods(
+ deviceId,
+ measurement,
+ meta.getStatistics().getStartTime(),
+ meta.getStatistics().getEndTime(),
+ currentModifications);
+ } catch (IOException e) {
+ LOGGER.warn(
+ "Failed to read metadata for deviceId: {}, measurement:
{}, removing",
+ deviceId,
+ measurement,
+ e);
+ return true;
+ }
+ });
+ }
+
+ // If measurements list is empty after filtering, remove the entire
entry
+ if (measurements.isEmpty()) {
+ iterator.remove();
+ }
+ }
+
// Filter again to get the final deviceMeasurementsMap that exactly
matches the pattern.
deviceMeasurementsMapIterator =
filterDeviceMeasurementsMapByPattern(deviceMeasurementsMap).entrySet().iterator();
@@ -303,7 +371,8 @@ public class TsFileInsertionEventQueryParser extends
TsFileInsertionEventParser
entry.getKey(),
entry.getValue(),
timeFilterExpression,
- allocatedMemoryBlockForTablet);
+ allocatedMemoryBlockForTablet,
+ currentModifications);
} catch (final Exception e) {
close();
throw new PipeException(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParserTabletIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParserTabletIterator.java
index 7c32321186e..776b5e1e6fa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParserTabletIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParserTabletIterator.java
@@ -19,9 +19,13 @@
package org.apache.iotdb.db.pipe.event.common.tsfile.parser.query;
+import org.apache.iotdb.commons.path.PatternTreeMap;
+import
org.apache.iotdb.db.pipe.event.common.tsfile.parser.util.ModsOperationUtil;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
+import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
+import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.tsfile.common.constant.TsFileConstant;
@@ -62,6 +66,9 @@ public class TsFileInsertionEventQueryParserTabletIterator
implements Iterator<T
private final PipeMemoryBlock allocatedBlockForTablet;
+ // Maintain sorted mods list and current index for each measurement
+ private final List<ModsOperationUtil.ModsInfo> measurementModsList;
+
private RowRecord rowRecord;
TsFileInsertionEventQueryParserTabletIterator(
@@ -70,7 +77,8 @@ public class TsFileInsertionEventQueryParserTabletIterator
implements Iterator<T
final IDeviceID deviceId,
final List<String> measurements,
final IExpression timeFilterExpression,
- final PipeMemoryBlock allocatedBlockForTablet)
+ final PipeMemoryBlock allocatedBlockForTablet,
+ final PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>
currentModifications)
throws IOException {
this.tsFileReader = tsFileReader;
this.measurementDataTypeMap = measurementDataTypeMap;
@@ -90,6 +98,10 @@ public class TsFileInsertionEventQueryParserTabletIterator
implements Iterator<T
this.queryDataSet = buildQueryDataSet();
this.allocatedBlockForTablet =
Objects.requireNonNull(allocatedBlockForTablet);
+
+ this.measurementModsList =
+ ModsOperationUtil.initializeMeasurementMods(
+ deviceId, this.measurements, currentModifications);
}
private QueryDataSet buildQueryDataSet() throws IOException {
@@ -163,16 +175,23 @@ public class
TsFileInsertionEventQueryParserTabletIterator implements Iterator<T
final int rowIndex = tablet.getRowSize();
- tablet.addTimestamp(rowIndex, rowRecord.getTimestamp());
-
+ boolean isNeedFillTime = false;
final List<Field> fields = rowRecord.getFields();
final int fieldSize = fields.size();
for (int i = 0; i < fieldSize; i++) {
final Field field = fields.get(i);
- tablet.addValue(
- measurements.get(i),
- rowIndex,
- field == null ? null :
field.getObjectValue(schemas.get(i).getType()));
+ final String measurement = measurements.get(i);
+ // Check if this value is deleted by mods
+ if (field == null
+ || ModsOperationUtil.isDelete(rowRecord.getTimestamp(),
measurementModsList.get(i))) {
+ tablet.getBitMaps()[i].mark(rowIndex);
+ } else {
+ tablet.addValue(measurement, rowIndex,
field.getObjectValue(schemas.get(i).getType()));
+ isNeedFillTime = true;
+ }
+ }
+ if (isNeedFillTime) {
+ tablet.addTimestamp(rowIndex, rowRecord.getTimestamp());
}
if (tablet.getRowSize() == tablet.getMaxRowNumber()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
index 44941e34c3a..043cc87fa10 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
@@ -25,9 +25,11 @@ import
org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tsfile.parser.TsFileInsertionEventParser;
+import
org.apache.iotdb.db.pipe.event.common.tsfile.parser.util.ModsOperationUtil;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
+import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -36,7 +38,10 @@ import org.apache.tsfile.common.constant.TsFileConstant;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.MetaMarker;
import org.apache.tsfile.file.header.ChunkHeader;
+import org.apache.tsfile.file.metadata.AlignedChunkMetadata;
+import org.apache.tsfile.file.metadata.IChunkMetadata;
import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.statistics.Statistics;
import org.apache.tsfile.read.TsFileSequenceReader;
import org.apache.tsfile.read.common.BatchData;
import org.apache.tsfile.read.common.Chunk;
@@ -44,6 +49,7 @@ import org.apache.tsfile.read.filter.basic.Filter;
import org.apache.tsfile.read.reader.IChunkReader;
import org.apache.tsfile.read.reader.chunk.AlignedChunkReader;
import org.apache.tsfile.read.reader.chunk.ChunkReader;
+import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.DateUtils;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.TsPrimitiveType;
@@ -55,6 +61,7 @@ import org.apache.tsfile.write.schema.MeasurementSchema;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -77,7 +84,7 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
private IDeviceID currentDevice;
private boolean currentIsAligned;
private final List<IMeasurementSchema> currentMeasurements = new
ArrayList<>();
-
+ private final List<ModsOperationUtil.ModsInfo> modsInfos = new ArrayList<>();
// Cached time chunk
private final List<Chunk> timeChunkList = new ArrayList<>();
private final List<Boolean> isMultiPageList = new ArrayList<>();
@@ -96,7 +103,8 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
final long startTime,
final long endTime,
final PipeTaskMeta pipeTaskMeta,
- final PipeInsertionEvent sourceEvent)
+ final PipeInsertionEvent sourceEvent,
+ final boolean isWithMod)
throws IOException {
super(pipeName, creationTime, pattern, null, startTime, endTime,
pipeTaskMeta, sourceEvent);
@@ -113,7 +121,19 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
.forceAllocateForTabletWithRetry(PipeConfig.getInstance().getPipeMaxReaderChunkSize());
try {
- tsFileSequenceReader = new
TsFileSequenceReader(tsFile.getAbsolutePath(), false, false);
+ currentModifications =
+ isWithMod
+ ? ModsOperationUtil.loadModificationsFromTsFile(tsFile)
+ : PatternTreeMapFactory.getModsPatternTreeMap();
+ allocatedMemoryBlockForModifications =
+ PipeDataNodeResourceManager.memory()
+
.forceAllocateForTabletWithRetry(currentModifications.ramBytesUsed());
+
+ tsFileSequenceReader =
+ new TsFileSequenceReader(
+ tsFile.getAbsolutePath(),
+ !currentModifications.isEmpty(),
+ !currentModifications.isEmpty());
tsFileSequenceReader.position((long)
TSFileConfig.MAGIC_STRING.getBytes().length + 1);
prepareData();
@@ -129,9 +149,10 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
final long startTime,
final long endTime,
final PipeTaskMeta pipeTaskMeta,
- final PipeInsertionEvent sourceEvent)
+ final PipeInsertionEvent sourceEvent,
+ final boolean isWithMod)
throws IOException {
- this(null, 0, tsFile, pattern, startTime, endTime, pipeTaskMeta,
sourceEvent);
+ this(null, 0, tsFile, pattern, startTime, endTime, pipeTaskMeta,
sourceEvent, isWithMod);
}
@Override
@@ -264,8 +285,9 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
final int rowIndex = tablet.getRowSize();
- tablet.addTimestamp(rowIndex, data.currentTime());
- putValueToColumns(data, tablet, rowIndex);
+ if (putValueToColumns(data, tablet, rowIndex)) {
+ tablet.addTimestamp(rowIndex, data.currentTime());
+ }
}
data.next();
@@ -315,13 +337,24 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
} while (!data.hasCurrent());
}
- private void putValueToColumns(final BatchData data, final Tablet tablet,
final int rowIndex) {
+ private boolean putValueToColumns(final BatchData data, final Tablet tablet,
final int rowIndex) {
+ boolean isNeedFillTime = false;
if (data.getDataType() == TSDataType.VECTOR) {
for (int i = 0; i < tablet.getSchemas().size(); ++i) {
final TsPrimitiveType primitiveType = data.getVector()[i];
- if (Objects.isNull(primitiveType)) {
+ if (Objects.isNull(primitiveType)
+ || ModsOperationUtil.isDelete(data.currentTime(),
modsInfos.get(i))) {
+ switch (tablet.getSchemas().get(i).getType()) {
+ case TEXT:
+ case BLOB:
+ case STRING:
+ tablet.addValue(rowIndex, i, Binary.EMPTY_VALUE.getValues());
+ }
+ tablet.getBitMaps()[i].mark(rowIndex);
continue;
}
+
+ isNeedFillTime = true;
switch (tablet.getSchemas().get(i).getType()) {
case BOOLEAN:
tablet.addValue(rowIndex, i, primitiveType.getBoolean());
@@ -352,6 +385,7 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
}
}
} else {
+ isNeedFillTime = true;
switch (tablet.getSchemas().get(0).getType()) {
case BOOLEAN:
tablet.addValue(rowIndex, 0, data.getBoolean());
@@ -381,6 +415,7 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
throw new UnSupportedDataTypeException("UnSupported" +
data.getDataType());
}
}
+ return isNeedFillTime;
}
private void moveToNextChunkReader() throws IOException,
IllegalStateException {
@@ -388,6 +423,7 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
long valueChunkSize = 0;
final List<Chunk> valueChunkList = new ArrayList<>();
currentMeasurements.clear();
+ modsInfos.clear();
if (lastMarker == MetaMarker.SEPARATOR) {
chunkReader = null;
@@ -403,131 +439,191 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
case MetaMarker.TIME_CHUNK_HEADER:
case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER:
case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER:
- // Notice that the data in one chunk group is either aligned or
non-aligned
- // There is no need to consider non-aligned chunks when there are
value chunks
- currentIsMultiPage = marker == MetaMarker.CHUNK_HEADER;
+ {
+ // Notice that the data in one chunk group is either aligned or
non-aligned
+ // There is no need to consider non-aligned chunks when there are
value chunks
+ currentIsMultiPage = marker == MetaMarker.CHUNK_HEADER;
+ long currentChunkHeaderOffset = tsFileSequenceReader.position() -
1;
+ chunkHeader = tsFileSequenceReader.readChunkHeader(marker);
- chunkHeader = tsFileSequenceReader.readChunkHeader(marker);
+ final long nextMarkerOffset =
+ tsFileSequenceReader.position() + chunkHeader.getDataSize();
- if (Objects.isNull(currentDevice)) {
- tsFileSequenceReader.position(
- tsFileSequenceReader.position() + chunkHeader.getDataSize());
- break;
- }
+ if (Objects.isNull(currentDevice)) {
+ tsFileSequenceReader.position(nextMarkerOffset);
+ break;
+ }
- if ((chunkHeader.getChunkType() & TsFileConstant.TIME_COLUMN_MASK)
- == TsFileConstant.TIME_COLUMN_MASK) {
- timeChunkList.add(
- new Chunk(
- chunkHeader, tsFileSequenceReader.readChunk(-1,
chunkHeader.getDataSize())));
- isMultiPageList.add(marker == MetaMarker.TIME_CHUNK_HEADER);
- break;
- }
+ if ((chunkHeader.getChunkType() & TsFileConstant.TIME_COLUMN_MASK)
+ == TsFileConstant.TIME_COLUMN_MASK) {
+ timeChunkList.add(
+ new Chunk(
+ chunkHeader, tsFileSequenceReader.readChunk(-1,
chunkHeader.getDataSize())));
+ isMultiPageList.add(marker == MetaMarker.TIME_CHUNK_HEADER);
+ break;
+ }
- if (!treePattern.matchesMeasurement(currentDevice,
chunkHeader.getMeasurementID())) {
- tsFileSequenceReader.position(
- tsFileSequenceReader.position() + chunkHeader.getDataSize());
- break;
- }
+ if (!treePattern.matchesMeasurement(currentDevice,
chunkHeader.getMeasurementID())) {
+ tsFileSequenceReader.position(nextMarkerOffset);
+ break;
+ }
- if (chunkHeader.getDataSize() >
allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
- PipeDataNodeResourceManager.memory()
- .forceResize(allocatedMemoryBlockForChunk,
chunkHeader.getDataSize());
- }
+ // Skip the chunk if it is fully deleted by mods
+ if (!currentModifications.isEmpty()) {
+ final Statistics statistics =
+ findNonAlignedChunkStatistics(
+ tsFileSequenceReader.getIChunkMetadataList(
+ currentDevice, chunkHeader.getMeasurementID()),
+ currentChunkHeaderOffset);
+ if (statistics != null
+ && ModsOperationUtil.isAllDeletedByMods(
+ currentDevice,
+ chunkHeader.getMeasurementID(),
+ statistics.getStartTime(),
+ statistics.getEndTime(),
+ currentModifications)) {
+ tsFileSequenceReader.position(nextMarkerOffset);
+ break;
+ }
+ }
- chunkReader =
- currentIsMultiPage
- ? new ChunkReader(
- new Chunk(
- chunkHeader,
- tsFileSequenceReader.readChunk(-1,
chunkHeader.getDataSize())),
- filter)
- : new SinglePageWholeChunkReader(
- new Chunk(
- chunkHeader,
- tsFileSequenceReader.readChunk(-1,
chunkHeader.getDataSize())));
- currentIsAligned = false;
- currentMeasurements.add(
- new MeasurementSchema(chunkHeader.getMeasurementID(),
chunkHeader.getDataType()));
- return;
+ if (chunkHeader.getDataSize() >
allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
+ PipeDataNodeResourceManager.memory()
+ .forceResize(allocatedMemoryBlockForChunk,
chunkHeader.getDataSize());
+ }
+
+ Chunk chunk =
+ new Chunk(
+ chunkHeader, tsFileSequenceReader.readChunk(-1,
chunkHeader.getDataSize()));
+
+ chunkReader =
+ currentIsMultiPage
+ ? new ChunkReader(chunk, filter)
+ : new SinglePageWholeChunkReader(chunk);
+ currentIsAligned = false;
+ currentMeasurements.add(
+ new MeasurementSchema(chunkHeader.getMeasurementID(),
chunkHeader.getDataType()));
+ modsInfos.addAll(
+ ModsOperationUtil.initializeMeasurementMods(
+ currentDevice,
+ Collections.singletonList(chunkHeader.getMeasurementID()),
+ currentModifications));
+ return;
+ }
case MetaMarker.VALUE_CHUNK_HEADER:
case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
- if (Objects.isNull(firstChunkHeader4NextSequentialValueChunks)) {
- chunkHeader = tsFileSequenceReader.readChunkHeader(marker);
-
- if (Objects.isNull(currentDevice)
- || !treePattern.matchesMeasurement(currentDevice,
chunkHeader.getMeasurementID())) {
- tsFileSequenceReader.position(
- tsFileSequenceReader.position() + chunkHeader.getDataSize());
- break;
- }
+ {
+ if (Objects.isNull(firstChunkHeader4NextSequentialValueChunks)) {
+ long currentChunkHeaderOffset = tsFileSequenceReader.position()
- 1;
+ chunkHeader = tsFileSequenceReader.readChunkHeader(marker);
+
+ final long nextMarkerOffset =
+ tsFileSequenceReader.position() + chunkHeader.getDataSize();
+ if (Objects.isNull(currentDevice)
+ || !treePattern.matchesMeasurement(
+ currentDevice, chunkHeader.getMeasurementID())) {
+ tsFileSequenceReader.position(nextMarkerOffset);
+ break;
+ }
- // Increase value index
- final int valueIndex =
- measurementIndexMap.compute(
- chunkHeader.getMeasurementID(),
- (measurement, index) -> Objects.nonNull(index) ? index + 1
: 0);
+ if (!currentModifications.isEmpty()) {
+ // Skip the chunk if it is fully deleted by mods
+ final Statistics statistics =
+ findAlignedChunkStatistics(
+ tsFileSequenceReader.getIChunkMetadataList(
+ currentDevice, chunkHeader.getMeasurementID()),
+ currentChunkHeaderOffset);
+ if (statistics != null
+ && ModsOperationUtil.isAllDeletedByMods(
+ currentDevice,
+ chunkHeader.getMeasurementID(),
+ statistics.getStartTime(),
+ statistics.getEndTime(),
+ currentModifications)) {
+ tsFileSequenceReader.position(nextMarkerOffset);
+ break;
+ }
+ }
- // Emit when encountered non-sequential value chunk, or the chunk
size exceeds
- // certain value to avoid OOM
- // Do not record or end current value chunks when there are empty
chunks
- if (chunkHeader.getDataSize() == 0) {
- break;
- }
- boolean needReturn = false;
- final long timeChunkSize =
- lastIndex >= 0
- ?
PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunkList.get(lastIndex))
- : 0;
- if (lastIndex >= 0) {
- if (valueIndex != lastIndex) {
- needReturn = recordAlignedChunk(valueChunkList, marker);
- } else {
- final long chunkSize = timeChunkSize + valueChunkSize;
- if (chunkSize + chunkHeader.getDataSize()
- > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
- if (valueChunkList.size() == 1
- && chunkSize >
allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
- PipeDataNodeResourceManager.memory()
- .forceResize(allocatedMemoryBlockForChunk, chunkSize);
- }
+ // Increase value index
+ final int valueIndex =
+ measurementIndexMap.compute(
+ chunkHeader.getMeasurementID(),
+ (measurement, index) -> Objects.nonNull(index) ? index +
1 : 0);
+
+ // Emit when encountered non-sequential value chunk, or the
chunk size exceeds
+ // certain value to avoid OOM
+ // Do not record or end current value chunks when there are
empty chunks
+ if (chunkHeader.getDataSize() == 0) {
+ break;
+ }
+ boolean needReturn = false;
+ final long timeChunkSize =
+ lastIndex >= 0
+ ? PipeMemoryWeightUtil.calculateChunkRamBytesUsed(
+ timeChunkList.get(lastIndex))
+ : 0;
+ if (lastIndex >= 0) {
+ if (valueIndex != lastIndex) {
needReturn = recordAlignedChunk(valueChunkList, marker);
+ } else {
+ final long chunkSize = timeChunkSize + valueChunkSize;
+ if (chunkSize + chunkHeader.getDataSize()
+ > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
+ if (valueChunkList.size() == 1
+ && chunkSize >
allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
+ PipeDataNodeResourceManager.memory()
+ .forceResize(allocatedMemoryBlockForChunk,
chunkSize);
+ }
+ needReturn = recordAlignedChunk(valueChunkList, marker);
+ }
}
}
+ lastIndex = valueIndex;
+ if (needReturn) {
+ firstChunkHeader4NextSequentialValueChunks = chunkHeader;
+ return;
+ }
+ } else {
+ chunkHeader = firstChunkHeader4NextSequentialValueChunks;
+ firstChunkHeader4NextSequentialValueChunks = null;
}
- lastIndex = valueIndex;
- if (needReturn) {
- firstChunkHeader4NextSequentialValueChunks = chunkHeader;
- return;
- }
- } else {
- chunkHeader = firstChunkHeader4NextSequentialValueChunks;
- firstChunkHeader4NextSequentialValueChunks = null;
- }
- valueChunkSize += chunkHeader.getDataSize();
- valueChunkList.add(
- new Chunk(
- chunkHeader, tsFileSequenceReader.readChunk(-1,
chunkHeader.getDataSize())));
- currentMeasurements.add(
- new MeasurementSchema(chunkHeader.getMeasurementID(),
chunkHeader.getDataType()));
- break;
+ Chunk chunk =
+ new Chunk(
+ chunkHeader, tsFileSequenceReader.readChunk(-1,
chunkHeader.getDataSize()));
+
+ valueChunkSize += chunkHeader.getDataSize();
+ valueChunkList.add(chunk);
+ currentMeasurements.add(
+ new MeasurementSchema(chunkHeader.getMeasurementID(),
chunkHeader.getDataType()));
+ modsInfos.addAll(
+ ModsOperationUtil.initializeMeasurementMods(
+ currentDevice,
+ Collections.singletonList(chunkHeader.getMeasurementID()),
+ currentModifications));
+ break;
+ }
case MetaMarker.CHUNK_GROUP_HEADER:
- // Return before "currentDevice" changes
- if (recordAlignedChunk(valueChunkList, marker)) {
- return;
+ {
+ // Return before "currentDevice" changes
+ if (recordAlignedChunk(valueChunkList, marker)) {
+ return;
+ }
+ // Clear because the cached data will never be used in the next
chunk group
+ lastIndex = -1;
+ timeChunkList.clear();
+ isMultiPageList.clear();
+ measurementIndexMap.clear();
+ final IDeviceID deviceID =
tsFileSequenceReader.readChunkGroupHeader().getDeviceID();
+ currentDevice = treePattern.mayOverlapWithDevice(deviceID) ?
deviceID : null;
+ break;
}
- // Clear because the cached data will never be used in the next
chunk group
- lastIndex = -1;
- timeChunkList.clear();
- isMultiPageList.clear();
- measurementIndexMap.clear();
- final IDeviceID deviceID =
tsFileSequenceReader.readChunkGroupHeader().getDeviceID();
- currentDevice = treePattern.mayOverlapWithDevice(deviceID) ?
deviceID : null;
- break;
case MetaMarker.OPERATION_INDEX_RANGE:
- tsFileSequenceReader.readPlanIndex();
- break;
+ {
+ tsFileSequenceReader.readPlanIndex();
+ break;
+ }
default:
MetaMarker.handleUnexpectedMarker(marker);
}
@@ -568,4 +664,32 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
allocatedMemoryBlockForChunk.close();
}
}
+
+ private Statistics findAlignedChunkStatistics(
+ List<IChunkMetadata> metadataList, long currentChunkHeaderOffset) {
+ for (IChunkMetadata metadata : metadataList) {
+ if (!(metadata instanceof AlignedChunkMetadata)) {
+ continue;
+ }
+ List<IChunkMetadata> list = ((AlignedChunkMetadata)
metadata).getValueChunkMetadataList();
+ for (IChunkMetadata m : list) {
+ if (m.getOffsetOfChunkHeader() == currentChunkHeaderOffset) {
+ return m.getStatistics();
+ }
+ }
+ break;
+ }
+ return null;
+ }
+
+ private Statistics findNonAlignedChunkStatistics(
+ List<IChunkMetadata> metadataList, long currentChunkHeaderOffset) {
+ for (IChunkMetadata metadata : metadataList) {
+ if (metadata.getOffsetOfChunkHeader() == currentChunkHeaderOffset) {
+ // found the corresponding chunk metadata
+ return metadata.getStatistics();
+ }
+ }
+ return null;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
index 76cc32ef347..8e874acb683 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
@@ -28,9 +28,11 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tsfile.parser.TsFileInsertionEventParser;
+import
org.apache.iotdb.db.pipe.event.common.tsfile.parser.util.ModsOperationUtil;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import
org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
+import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -49,6 +51,7 @@ public class TsFileInsertionEventTableParser extends
TsFileInsertionEventParser
private final long endTime;
private final TablePattern tablePattern;
private final String userName;
+ private final boolean isWithMod;
private final PipeMemoryBlock allocatedMemoryBlockForBatchData;
private final PipeMemoryBlock allocatedMemoryBlockForChunk;
@@ -64,11 +67,20 @@ public class TsFileInsertionEventTableParser extends
TsFileInsertionEventParser
final long endTime,
final PipeTaskMeta pipeTaskMeta,
final String userName,
- final PipeInsertionEvent sourceEvent)
+ final PipeInsertionEvent sourceEvent,
+ final boolean isWithMod)
throws IOException {
super(pipeName, creationTime, null, pattern, startTime, endTime,
pipeTaskMeta, sourceEvent);
+ this.isWithMod = isWithMod;
try {
+ currentModifications =
+ isWithMod
+ ? ModsOperationUtil.loadModificationsFromTsFile(tsFile)
+ : PatternTreeMapFactory.getModsPatternTreeMap();
+ allocatedMemoryBlockForModifications =
+ PipeDataNodeResourceManager.memory()
+
.forceAllocateForTabletWithRetry(currentModifications.ramBytesUsed());
long tableSize =
Math.min(
PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes(),
@@ -106,9 +118,20 @@ public class TsFileInsertionEventTableParser extends
TsFileInsertionEventParser
final long endTime,
final PipeTaskMeta pipeTaskMeta,
final String userName,
- final PipeInsertionEvent sourceEvent)
+ final PipeInsertionEvent sourceEvent,
+ final boolean isWithMod)
throws IOException {
- this(null, 0, tsFile, pattern, startTime, endTime, pipeTaskMeta, userName,
sourceEvent);
+ this(
+ null,
+ 0,
+ tsFile,
+ pattern,
+ startTime,
+ endTime,
+ pipeTaskMeta,
+ userName,
+ sourceEvent,
+ isWithMod);
}
@Override
@@ -136,6 +159,7 @@ public class TsFileInsertionEventTableParser extends
TsFileInsertionEventParser
allocatedMemoryBlockForChunk,
allocatedMemoryBlockForChunkMeta,
allocatedMemoryBlockForTableSchemas,
+ currentModifications,
startTime,
endTime);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
index 746d9d5b4a0..f05cf872c79 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
@@ -19,9 +19,13 @@
package org.apache.iotdb.db.pipe.event.common.tsfile.parser.table;
+import org.apache.iotdb.commons.path.PatternTreeMap;
+import
org.apache.iotdb.db.pipe.event.common.tsfile.parser.util.ModsOperationUtil;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
+import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
+import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.tsfile.enums.ColumnCategory;
@@ -75,6 +79,9 @@ public class TsFileInsertionEventTableParserTabletIterator
implements Iterator<T
private final PipeMemoryBlock allocatedMemoryBlockForChunkMeta;
private final PipeMemoryBlock allocatedMemoryBlockForTableSchema;
+ // mods entry
+ private final PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>
modifications;
+
// Used to read tsfile data
private IChunkReader chunkReader;
private BatchData batchData;
@@ -96,6 +103,8 @@ public class TsFileInsertionEventTableParserTabletIterator
implements Iterator<T
private List<TSDataType> dataTypeList;
private int deviceIdSize;
+ private List<ModsOperationUtil.ModsInfo> modsInfoList;
+
// Used to record whether the same Tablet is generated when parsing starts.
Different table
// information cannot be placed in the same Tablet.
private boolean isSameTableName;
@@ -109,12 +118,14 @@ public class
TsFileInsertionEventTableParserTabletIterator implements Iterator<T
final PipeMemoryBlock allocatedMemoryBlockForChunk,
final PipeMemoryBlock allocatedMemoryBlockForChunkMeta,
final PipeMemoryBlock allocatedMemoryBlockForTableSchema,
+ final PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>
modifications,
final long startTime,
final long endTime)
throws IOException {
this.startTime = startTime;
this.endTime = endTime;
+ this.modifications = modifications;
this.reader = tsFileSequenceReader;
this.metadataQuerier = new MetadataQuerierByFileImpl(reader);
@@ -202,6 +213,31 @@ public class TsFileInsertionEventTableParserTabletIterator
implements Iterator<T
continue;
}
+ Iterator<IChunkMetadata> iChunkMetadataIterator =
+
alignedChunkMetadata.getValueChunkMetadataList().iterator();
+ while (iChunkMetadataIterator.hasNext()) {
+ IChunkMetadata iChunkMetadata =
iChunkMetadataIterator.next();
+ if (iChunkMetadata == null) {
+ iChunkMetadataIterator.remove();
+ continue;
+ }
+
+ if (!modifications.isEmpty()
+ && ModsOperationUtil.isAllDeletedByMods(
+ pair.getLeft(),
+ iChunkMetadata.getMeasurementUid(),
+ alignedChunkMetadata.getStartTime(),
+ alignedChunkMetadata.getEndTime(),
+ modifications)) {
+ iChunkMetadataIterator.remove();
+ }
+ }
+
+ if
(alignedChunkMetadata.getValueChunkMetadataList().isEmpty()) {
+ chunkMetadataIterator.remove();
+ continue;
+ }
+
size +=
PipeMemoryWeightUtil.calculateAlignedChunkMetaBytesUsed(alignedChunkMetadata);
if (allocatedMemoryBlockForChunkMeta.getMemoryUsageInBytes() <
size) {
@@ -307,9 +343,10 @@ public class TsFileInsertionEventTableParserTabletIterator
implements Iterator<T
break;
}
- tablet.addTimestamp(rowIndex, batchData.currentTime());
- fillMeasurementValueColumns(batchData, tablet, rowIndex);
- fillDeviceIdColumns(deviceID, tablet, rowIndex);
+ if (fillMeasurementValueColumns(batchData, tablet, rowIndex)) {
+ fillDeviceIdColumns(deviceID, tablet, rowIndex);
+ tablet.addTimestamp(rowIndex, batchData.currentTime());
+ }
}
if (batchData != null) {
@@ -386,15 +423,19 @@ public class
TsFileInsertionEventTableParserTabletIterator implements Iterator<T
}
this.chunkReader = new TableChunkReader(timeChunk, valueChunkList, null);
+ this.modsInfoList =
+ ModsOperationUtil.initializeMeasurementMods(deviceID, measurementList,
modifications);
}
- private void fillMeasurementValueColumns(
+ private boolean fillMeasurementValueColumns(
final BatchData data, final Tablet tablet, final int rowIndex) {
final TsPrimitiveType[] primitiveTypes = data.getVector();
+ boolean needFillTime = false;
for (int i = deviceIdSize, size = dataTypeList.size(); i < size; i++) {
final TsPrimitiveType primitiveType = primitiveTypes[i - deviceIdSize];
- if (primitiveType == null) {
+ if (primitiveType == null
+ || ModsOperationUtil.isDelete(data.currentTime(),
modsInfoList.get(i))) {
switch (dataTypeList.get(i)) {
case TEXT:
case BLOB:
@@ -404,6 +445,7 @@ public class TsFileInsertionEventTableParserTabletIterator
implements Iterator<T
tablet.getBitMaps()[i].mark(rowIndex);
continue;
}
+ needFillTime = true;
switch (dataTypeList.get(i)) {
case BOOLEAN:
@@ -438,6 +480,7 @@ public class TsFileInsertionEventTableParserTabletIterator
implements Iterator<T
throw new UnSupportedDataTypeException("UnSupported" +
primitiveType.getDataType());
}
}
+ return needFillTime;
}
private void fillDeviceIdColumns(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/util/ModsOperationUtil.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/util/ModsOperationUtil.java
new file mode 100644
index 00000000000..66fed43fead
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/util/ModsOperationUtil.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.event.common.tsfile.parser.util;
+
+import org.apache.iotdb.commons.path.PatternTreeMap;
+import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
+import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
+import org.apache.iotdb.db.utils.ModificationUtils;
+import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+import org.apache.tsfile.file.metadata.IDeviceID;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Utility class for handling mods operations during TsFile parsing. Supports
mods processing logic
+ * for both tree model and table model.
+ */
+public class ModsOperationUtil {
+
+ private ModsOperationUtil() {
+ // Utility class, no instantiation allowed
+ }
+
+ /**
+ * Load all modifications from TsFile and build PatternTreeMap
+ *
+ * @param tsFile TsFile file
+ * @return PatternTreeMap containing all modifications
+ */
+ public static PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>
+ loadModificationsFromTsFile(File tsFile) {
+ PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>
modifications =
+ PatternTreeMapFactory.getModsPatternTreeMap();
+
+ try {
+ ModificationFile.readAllModifications(tsFile, true)
+ .forEach(
+ modification ->
modifications.append(modification.keyOfPatternTree(), modification));
+ } catch (Exception e) {
+ throw new PipeException("Failed to load modifications from TsFile: " +
tsFile.getPath(), e);
+ }
+
+ return modifications;
+ }
+
+ /**
+ * Check if data in the specified time range is completely deleted by mods
Different logic for
+ * tree model and table model
+ *
+ * @param deviceID device ID
+ * @param measurementID measurement ID
+ * @param startTime start time
+ * @param endTime end time
+ * @param modifications modification records
+ * @return true if data is completely deleted, false otherwise
+ */
+ public static boolean isAllDeletedByMods(
+ IDeviceID deviceID,
+ String measurementID,
+ long startTime,
+ long endTime,
+ PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>
modifications) {
+ if (modifications == null) {
+ return false;
+ }
+
+ final List<ModEntry> mods = modifications.getOverlapped(deviceID,
measurementID);
+ if (mods == null || mods.isEmpty()) {
+ return false;
+ }
+
+ // Different logic for tree model and table model
+ if (deviceID.isTableModel()) {
+ // For table model: check if any modification affects the device and
covers the time range
+ return mods.stream()
+ .anyMatch(
+ modification ->
+ modification.getTimeRange().contains(startTime, endTime)
+ && modification.affects(deviceID)
+ && modification.affects(measurementID));
+ } else {
+ // For tree model: check if any modification covers the time range
+ return mods.stream()
+ .anyMatch(modification ->
modification.getTimeRange().contains(startTime, endTime));
+ }
+ }
+
+ /**
+ * Initialize mods mapping for specified measurement list
+ *
+ * @param deviceID device ID
+ * @param measurements measurement list
+ * @param modifications modification records
+ * @return mapping from measurement ID to mods list and index
+ */
+ public static List<ModsInfo> initializeMeasurementMods(
+ IDeviceID deviceID,
+ List<String> measurements,
+ PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>
modifications) {
+
+ List<ModsInfo> modsInfos = new ArrayList<>(measurements.size());
+
+ for (final String measurement : measurements) {
+ final List<ModEntry> mods = modifications.getOverlapped(deviceID,
measurement);
+ if (mods == null || mods.isEmpty()) {
+ // No mods, use empty list and index 0
+ modsInfos.add(new ModsInfo(Collections.emptyList(), 0));
+ continue;
+ }
+
+ // Sort by time range for efficient lookup
+ // Different filtering logic for tree model and table model
+ final List<ModEntry> filteredMods;
+ if (deviceID.isTableModel()) {
+ // For table model: filter modifications that affect the device
+ filteredMods =
+ mods.stream()
+ .filter(
+ modification ->
+ modification.affects(deviceID) &&
modification.affects(measurement))
+ .collect(Collectors.toList());
+ } else {
+ // For tree model: no additional filtering needed
+ filteredMods = mods;
+ }
+ // Store sorted mods and start index
+ modsInfos.add(new ModsInfo(ModificationUtils.sortAndMerge(filteredMods),
0));
+ }
+
+ return modsInfos;
+ }
+
+ /**
+ * Check if data at the specified time point is deleted
+ *
+ * @param time time point
+ * @param modsInfo mods information containing mods list and current index
+ * @return true if data is deleted, false otherwise
+ */
+ public static boolean isDelete(long time, ModsInfo modsInfo) {
+ if (modsInfo == null) {
+ return false;
+ }
+
+ final List<ModEntry> mods = modsInfo.getMods();
+ if (mods == null || mods.isEmpty()) {
+ return false;
+ }
+
+ int currentIndex = modsInfo.getCurrentIndex();
+ if (currentIndex < 0) {
+ return false;
+ }
+
+ // First, try to use the current index if it's valid
+ if (currentIndex < mods.size()) {
+ final ModEntry currentMod = mods.get(currentIndex);
+ final long currentModStartTime = currentMod.getTimeRange().getMin();
+ final long currentModEndTime = currentMod.getTimeRange().getMax();
+
+ if (time < currentModStartTime) {
+ // Time is before current mod, return false
+ return false;
+ } else if (time <= currentModEndTime) {
+ // Time is within current mod range, return true
+ return true;
+ } else {
+ // Time is after current mod, need to search forwards
+ return searchAndCheckMod(mods, time, currentIndex + 1, modsInfo);
+ }
+ } else {
+ // Current index is beyond array bounds, all mods have been processed
+ clearModsAndReset(modsInfo);
+ return false;
+ }
+ }
+
+ /**
+ * Search for a mod using binary search and check if the time point is
deleted
+ *
+ * @param mods sorted list of mods
+ * @param time time point to search for
+ * @param startIndex starting index for search
+ * @param modsInfo mods information to update
+ * @return true if data is deleted, false otherwise
+ */
+ private static boolean searchAndCheckMod(
+ List<ModEntry> mods, long time, int startIndex, ModsInfo modsInfo) {
+ int searchIndex = binarySearchMods(mods, time, startIndex);
+ if (searchIndex >= mods.size()) {
+ // All mods checked, clear mods list and reset index to 0
+ clearModsAndReset(modsInfo);
+ return false;
+ }
+
+ final ModEntry foundMod = mods.get(searchIndex);
+ final long foundModStartTime = foundMod.getTimeRange().getMin();
+
+ if (time < foundModStartTime) {
+ modsInfo.setCurrentIndex(searchIndex);
+ return false;
+ }
+
+ modsInfo.setCurrentIndex(searchIndex);
+ return true;
+ }
+
+ /**
+ * Clear mods list and reset index to 0
+ *
+ * @param modsInfo mods information to update
+ */
+ private static void clearModsAndReset(ModsInfo modsInfo) {
+ modsInfo.setMods(Collections.emptyList());
+ modsInfo.setCurrentIndex(0);
+ }
+
+ /**
+ * Binary search to find the first mod that might contain the given time
point. Returns the index
+ * of the first mod where modStartTime <= time, or mods.size() if no such
mod exists.
+ *
+ * @param mods sorted list of mods
+ * @param time time point to search for
+ * @param startIndex starting index for search (current index)
+ * @return index of the first potential mod, or mods.size() if none found
+ */
+ private static int binarySearchMods(List<ModEntry> mods, long time, int
startIndex) {
+ int left = startIndex;
+ int right = mods.size();
+
+ while (left < right) {
+ int mid = left + (right - left) / 2;
+ final long max = mods.get(mid).getTimeRange().getMax();
+
+ if (max < time) {
+ left = mid + 1;
+ } else {
+ right = mid;
+ }
+ }
+
+ return left;
+ }
+
+ /** Mods information wrapper class, containing mods list and current index */
+ public static class ModsInfo {
+ private List<ModEntry> mods;
+ private int currentIndex;
+
+ public ModsInfo(List<ModEntry> mods, int currentIndex) {
+ this.mods = Objects.requireNonNull(mods);
+ this.currentIndex = currentIndex;
+ }
+
+ public List<ModEntry> getMods() {
+ return mods;
+ }
+
+ public void setMods(List<ModEntry> newMods) {
+ this.mods = newMods;
+ }
+
+ public int getCurrentIndex() {
+ return currentIndex;
+ }
+
+ public void setCurrentIndex(int newIndex) {
+ this.currentIndex = newIndex;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ModsInfo modsInfo = (ModsInfo) o;
+ return Objects.equals(mods, modsInfo.mods)
+ && Objects.equals(currentIndex, modsInfo.currentIndex);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(mods, currentIndex);
+ }
+
+ @Override
+ public String toString() {
+ return "ModsInfo{" + "mods=" + mods + ", currentIndex=" + currentIndex +
'}';
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java
index e68eac4c8a9..4bb1f8d2e17 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java
@@ -133,7 +133,8 @@ public class
PipeTableStatementDataTypeConvertExecutionVisitor
Long.MAX_VALUE,
null,
"root",
- null)) {
+ null,
+ true)) {
for (final TabletInsertionEvent tabletInsertionEvent :
parser.toTabletInsertionEvents()) {
if (!(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) {
continue;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java
index 4e5f4d8be1d..e9707299e09 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java
@@ -102,7 +102,7 @@ public class
PipeTreeStatementDataTypeConvertExecutionVisitor
for (final File file : loadTsFileStatement.getTsFiles()) {
try (final TsFileInsertionEventScanParser parser =
new TsFileInsertionEventScanParser(
- file, new IoTDBTreePattern(null), Long.MIN_VALUE,
Long.MAX_VALUE, null, null)) {
+ file, new IoTDBTreePattern(null), Long.MIN_VALUE,
Long.MAX_VALUE, null, null, true)) {
for (final Pair<Tablet, Boolean> tabletWithIsAligned :
parser.toTabletWithIsAligneds()) {
final PipeConvertedInsertTabletStatement statement =
new PipeConvertedInsertTabletStatement(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java
index 1de90e2adbd..9a6be9737af 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java
@@ -86,7 +86,8 @@ public class LoadTableStatementDataTypeConvertExecutionVisitor
Long.MAX_VALUE,
null,
"root",
- null)) {
+ null,
+ true)) {
for (final TabletInsertionEvent tabletInsertionEvent :
parser.toTabletInsertionEvents()) {
if (!(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) {
continue;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
index bb0ce1a3344..2999d436c95 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
@@ -93,7 +93,13 @@ public class LoadTreeStatementDataTypeConvertExecutionVisitor
for (final File file : loadTsFileStatement.getTsFiles()) {
try (final TsFileInsertionEventScanParser parser =
new TsFileInsertionEventScanParser(
- file, new IoTDBTreePattern(null), Long.MIN_VALUE,
Long.MAX_VALUE, null, null)) {
+ file,
+ new IoTDBTreePattern(null),
+ Long.MIN_VALUE,
+ Long.MAX_VALUE,
+ null,
+ null,
+ true)) {
for (final Pair<Tablet, Boolean> tabletWithIsAligned :
parser.toTabletWithIsAligneds()) {
final PipeTransferTabletRawReq tabletRawReq =
PipeTransferTabletRawReq.toTPipeTransferRawReq(
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
index 9c58ae9f0f6..7bfde3b158d 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
@@ -584,7 +584,7 @@ public class TsFileInsertionEventParserTest {
? new TsFileInsertionEventQueryParser(
tsFile, pattern, startTime, endTime, tsFileInsertionEvent)
: new TsFileInsertionEventScanParser(
- tsFile, pattern, startTime, endTime, null,
tsFileInsertionEvent)) {
+ tsFile, pattern, startTime, endTime, null,
tsFileInsertionEvent, false)) {
final AtomicInteger count1 = new AtomicInteger(0);
final AtomicInteger count2 = new AtomicInteger(0);
final AtomicInteger count3 = new AtomicInteger(0);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/util/ModsOperationUtilTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/util/ModsOperationUtilTest.java
new file mode 100644
index 00000000000..7eb09bbab41
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/util/ModsOperationUtilTest.java
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.event.common.tsfile.parser.util;
+
+import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
+import
org.apache.iotdb.db.storageengine.dataregion.modification.TreeDeletionEntry;
+
+import org.apache.tsfile.read.common.TimeRange;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ModsOperationUtilTest {
+
+ @Test
+ public void testIsDeleteWithBinarySearch() {
+ // Create test mods with time ranges: [10, 20], [30, 40], [50, 60], [70,
80]
+ List<ModEntry> mods = createTestMods(new long[][] {{10, 20}, {30, 40},
{50, 60}, {70, 80}});
+
+ ModsOperationUtil.ModsInfo modsInfo = new ModsOperationUtil.ModsInfo(mods,
0);
+
+ // Test cases
+ // Time 5: before first mod, should return false
+ assertFalse(ModsOperationUtil.isDelete(5, modsInfo));
+ assertEquals(0, modsInfo.getCurrentIndex());
+
+ // Time 15: within first mod [10, 20], should return true
+ assertTrue(ModsOperationUtil.isDelete(15, modsInfo));
+ assertEquals(0, modsInfo.getCurrentIndex());
+
+ // Time 25: between first and second mod, should return false
+ assertFalse(ModsOperationUtil.isDelete(25, modsInfo));
+ assertEquals(1, modsInfo.getCurrentIndex());
+
+ // Time 35: within second mod [30, 40], should return true
+ assertTrue(ModsOperationUtil.isDelete(35, modsInfo));
+ assertEquals(1, modsInfo.getCurrentIndex());
+
+ // Time 45: between second and third mod, should return false
+ assertFalse(ModsOperationUtil.isDelete(45, modsInfo));
+ assertEquals(2, modsInfo.getCurrentIndex());
+
+ // Time 55: within third mod [50, 60], should return true
+ assertTrue(ModsOperationUtil.isDelete(55, modsInfo));
+ assertEquals(2, modsInfo.getCurrentIndex());
+
+ // Time 65: between third and fourth mod, should return false
+ assertFalse(ModsOperationUtil.isDelete(65, modsInfo));
+ assertEquals(3, modsInfo.getCurrentIndex());
+
+ // Time 75: within fourth mod [70, 80], should return true
+ assertTrue(ModsOperationUtil.isDelete(75, modsInfo));
+ assertEquals(3, modsInfo.getCurrentIndex());
+
+ // Time 85: after last mod, should return false and clear mods
+ assertFalse(ModsOperationUtil.isDelete(85, modsInfo));
+ assertTrue(modsInfo.getMods().isEmpty());
+ assertEquals(0, modsInfo.getCurrentIndex());
+ }
+
+ @Test
+ public void testIsDeleteWithEmptyMods() {
+ ModsOperationUtil.ModsInfo modsInfo = new ModsOperationUtil.ModsInfo(new
ArrayList<>(), 0);
+
+ // Should return false for any time when mods is empty
+ assertFalse(ModsOperationUtil.isDelete(100, modsInfo));
+ }
+
+ @Test
+ public void testIsDeleteWithNullModsInfo() {
+ // Should return false when modsInfo is null
+ assertFalse(ModsOperationUtil.isDelete(100, null));
+ }
+
+ @Test
+ public void testIsDeleteWithNegativeIndex() {
+ List<ModEntry> mods = createTestMods(new long[][] {{10, 20}});
+ ModsOperationUtil.ModsInfo modsInfo = new ModsOperationUtil.ModsInfo(mods,
-1);
+
+ // Should return false when currentIndex is negative
+ assertFalse(ModsOperationUtil.isDelete(15, modsInfo));
+ }
+
+ @Test
+ public void testIsDeleteWithSingleMod() {
+ List<ModEntry> mods = createTestMods(new long[][] {{10, 20}});
+ ModsOperationUtil.ModsInfo modsInfo = new ModsOperationUtil.ModsInfo(mods,
0);
+
+ // Time before mod
+ assertFalse(ModsOperationUtil.isDelete(5, modsInfo));
+ assertEquals(0, modsInfo.getCurrentIndex());
+
+ // Time within mod
+ assertTrue(ModsOperationUtil.isDelete(15, modsInfo));
+ assertEquals(0, modsInfo.getCurrentIndex());
+
+ // Time after mod
+ assertFalse(ModsOperationUtil.isDelete(25, modsInfo));
+ assertTrue(modsInfo.getMods().isEmpty());
+ assertEquals(0, modsInfo.getCurrentIndex());
+ }
+
+ @Test
+ public void testIsDeleteWithOverlappingMods() {
+ // Create overlapping mods: [10, 30], [20, 40], [30, 50]
+ List<ModEntry> mods = createTestMods(new long[][] {{10, 30}, {20, 40},
{30, 50}});
+
+ ModsOperationUtil.ModsInfo modsInfo = new ModsOperationUtil.ModsInfo(mods,
0);
+
+ // Time 15: within first mod
+ assertTrue(ModsOperationUtil.isDelete(15, modsInfo));
+ assertEquals(0, modsInfo.getCurrentIndex());
+
+ // Time 25: within both first and second mod, should find first one
+ assertTrue(ModsOperationUtil.isDelete(25, modsInfo));
+ assertEquals(0, modsInfo.getCurrentIndex());
+
+ // Time 35: within second and third mod, should find second one
+ assertTrue(ModsOperationUtil.isDelete(35, modsInfo));
+ assertEquals(1, modsInfo.getCurrentIndex());
+
+ // Time 45: within third mod
+ assertTrue(ModsOperationUtil.isDelete(45, modsInfo));
+ assertEquals(2, modsInfo.getCurrentIndex());
+ }
+
+ @Test
+ public void testBinarySearchMods() {
+ // Create test mods with time ranges: [10, 20], [30, 40], [50, 60], [70,
80]
+ List<ModEntry> mods = createTestMods(new long[][] {{10, 20}, {30, 40},
{50, 60}, {70, 80}});
+
+ // Test binary search from start index 0
+ // Time 5: before all mods, should return 0 (first mod)
+ assertEquals(0, binarySearchMods(mods, 5, 0));
+
+ // Time 15: within first mod [10, 20], should return 0
+ assertEquals(0, binarySearchMods(mods, 15, 0));
+
+ // Time 25: between first and second mod, should return 1
+ assertEquals(1, binarySearchMods(mods, 25, 0));
+
+ // Time 35: within second mod [30, 40], should return 1
+ assertEquals(1, binarySearchMods(mods, 35, 0));
+
+ // Time 45: between second and third mod, should return 2
+ assertEquals(2, binarySearchMods(mods, 45, 0));
+
+ // Time 55: within third mod [50, 60], should return 2
+ assertEquals(2, binarySearchMods(mods, 55, 0));
+
+ // Time 65: between third and fourth mod, should return 3
+ assertEquals(3, binarySearchMods(mods, 65, 0));
+
+ // Time 75: within fourth mod [70, 80], should return 3
+ assertEquals(3, binarySearchMods(mods, 75, 0));
+
+ // Time 85: after all mods, should return 4 (mods.size())
+ assertEquals(4, binarySearchMods(mods, 85, 0));
+ }
+
+ @Test
+ public void testBinarySearchModsWithStartIndex() {
+ // Create test mods with time ranges: [10, 20], [30, 40], [50, 60], [70,
80]
+ List<ModEntry> mods = createTestMods(new long[][] {{10, 20}, {30, 40},
{50, 60}, {70, 80}});
+
+ // Test binary search starting from index 2
+ // Time 15: before start index, should return 2 (start index)
+ assertEquals(2, binarySearchMods(mods, 15, 2));
+
+ // Time 35: before start index, should return 2 (start index)
+ assertEquals(2, binarySearchMods(mods, 35, 2));
+
+ // Time 55: within mod at index 2, should return 2
+ assertEquals(2, binarySearchMods(mods, 55, 2));
+
+ // Time 65: between mods at index 2 and 3, should return 3
+ assertEquals(3, binarySearchMods(mods, 65, 2));
+
+ // Time 75: within mod at index 3, should return 3
+ assertEquals(3, binarySearchMods(mods, 75, 2));
+
+ // Time 85: after all mods, should return 4 (mods.size())
+ assertEquals(4, binarySearchMods(mods, 85, 2));
+ }
+
+ @Test
+ public void testBinarySearchModsWithOverlappingRanges() {
+ // Create overlapping mods: [10, 30], [20, 40], [30, 50]
+ List<ModEntry> mods = createTestMods(new long[][] {{10, 30}, {20, 40},
{30, 50}});
+
+ // Time 15: within first mod, should return 0
+ assertEquals(0, binarySearchMods(mods, 15, 0));
+
+ // Time 25: within first and second mod, should return 0 (first match)
+ assertEquals(0, binarySearchMods(mods, 25, 0));
+
+ // Time 35: within second and third mod, should return 1 (first match from
start)
+ assertEquals(1, binarySearchMods(mods, 35, 0));
+
+ // Time 45: within third mod, should return 2
+ assertEquals(2, binarySearchMods(mods, 45, 0));
+ }
+
+ @Test
+ public void testBinarySearchModsWithEmptyList() {
+ List<ModEntry> mods = new ArrayList<>();
+
+ // Should return 0 for any time when mods is empty
+ assertEquals(0, binarySearchMods(mods, 100, 0));
+ }
+
+ @Test
+ public void testBinarySearchModsWithSingleMod() {
+ List<ModEntry> mods = createTestMods(new long[][] {{10, 20}});
+
+ // Time 5: before mod, should return 0
+ assertEquals(0, binarySearchMods(mods, 5, 0));
+
+ // Time 15: within mod, should return 0
+ assertEquals(0, binarySearchMods(mods, 15, 0));
+
+ // Time 25: after mod, should return 1
+ assertEquals(1, binarySearchMods(mods, 25, 0));
+ }
+
+ @Test
+ public void testBinarySearchModsWithExactBoundaries() {
+ // Create mods with exact boundaries: [10, 20], [20, 30], [30, 40]
+ List<ModEntry> mods = createTestMods(new long[][] {{10, 20}, {20, 30},
{30, 40}});
+
+ // Time 20: at boundary, first mod [10, 20] has endTime=20 >= 20, should
return 0
+ assertEquals(0, binarySearchMods(mods, 20, 0));
+
+ // Time 30: at boundary, second mod [20, 30] has endTime=30 >= 30, should
return 1
+ assertEquals(1, binarySearchMods(mods, 30, 0));
+ }
+
+ @Test
+ public void testBinarySearchModsWithMinBoundaries() {
+ // Create mods: [10, 20], [30, 40], [50, 60]
+ List<ModEntry> mods = createTestMods(new long[][] {{10, 20}, {30, 40},
{50, 60}});
+
+ // Time 10: exactly at first mod's min, should return 0
+ assertEquals(0, binarySearchMods(mods, 10, 0));
+
+ // Time 30: exactly at second mod's min, should return 1
+ assertEquals(1, binarySearchMods(mods, 30, 0));
+
+ // Time 50: exactly at third mod's min, should return 2
+ assertEquals(2, binarySearchMods(mods, 50, 0));
+ }
+
+ @Test
+ public void testBinarySearchModsWithMaxBoundaries() {
+ // Create mods: [10, 20], [30, 40], [50, 60]
+ List<ModEntry> mods = createTestMods(new long[][] {{10, 20}, {30, 40},
{50, 60}});
+
+ // Time 20: exactly at first mod's max, should return 0
+ assertEquals(0, binarySearchMods(mods, 20, 0));
+
+ // Time 40: exactly at second mod's max, should return 1
+ assertEquals(1, binarySearchMods(mods, 40, 0));
+
+ // Time 60: exactly at third mod's max, should return 2
+ assertEquals(2, binarySearchMods(mods, 60, 0));
+ }
+
+ @Test
+ public void testBinarySearchModsWithJustBeforeMin() {
+ // Create mods: [10, 20], [30, 40], [50, 60]
+ List<ModEntry> mods = createTestMods(new long[][] {{10, 20}, {30, 40},
{50, 60}});
+
+ // Time 9: just before first mod's min, should return 0 (first mod)
+ assertEquals(0, binarySearchMods(mods, 9, 0));
+
+ // Time 29: just before second mod's min, should return 1 (second mod)
+ assertEquals(1, binarySearchMods(mods, 29, 0));
+
+ // Time 49: just before third mod's min, should return 2 (third mod)
+ assertEquals(2, binarySearchMods(mods, 49, 0));
+ }
+
+ @Test
+ public void testBinarySearchModsWithJustAfterMax() {
+ // Create mods: [10, 20], [30, 40], [50, 60]
+ List<ModEntry> mods = createTestMods(new long[][] {{10, 20}, {30, 40},
{50, 60}});
+
+ // Time 21: just after first mod's max, should return 1 (second mod)
+ assertEquals(1, binarySearchMods(mods, 21, 0));
+
+ // Time 41: just after second mod's max, should return 2 (third mod)
+ assertEquals(2, binarySearchMods(mods, 41, 0));
+
+ // Time 61: just after third mod's max, should return 3 (mods.size())
+ assertEquals(3, binarySearchMods(mods, 61, 0));
+ }
+
+ @Test
+ public void testBinarySearchModsWithLargeGaps() {
+ // Create mods with large gaps: [10, 20], [100, 200], [1000, 2000]
+ List<ModEntry> mods = createTestMods(new long[][] {{10, 20}, {100, 200},
{1000, 2000}});
+
+ // Time 50: in large gap, should return 1 (second mod)
+ assertEquals(1, binarySearchMods(mods, 50, 0));
+
+ // Time 500: in large gap, should return 2 (third mod)
+ assertEquals(2, binarySearchMods(mods, 500, 0));
+
+ // Time 5000: after all mods, should return 3 (mods.size())
+ assertEquals(3, binarySearchMods(mods, 5000, 0));
+ }
+
+ @Test
+ public void testBinarySearchModsWithNegativeTime() {
+ // Create mods: [10, 20], [30, 40]
+ List<ModEntry> mods = createTestMods(new long[][] {{10, 20}, {30, 40}});
+
+ // Time -10: negative time, should return 0 (first mod)
+ assertEquals(0, binarySearchMods(mods, -10, 0));
+
+ // Time 0: zero time, should return 0 (first mod)
+ assertEquals(0, binarySearchMods(mods, 0, 0));
+ }
+
+ @Test
+ public void testBinarySearchModsWithVeryLargeTime() {
+ // Create mods: [10, 20], [30, 40]
+ List<ModEntry> mods = createTestMods(new long[][] {{10, 20}, {30, 40}});
+
+ // Time Long.MAX_VALUE: very large time, should return 2 (mods.size())
+ assertEquals(2, binarySearchMods(mods, Long.MAX_VALUE, 0));
+
+ // Time Long.MIN_VALUE: very small time, should return 0 (first mod)
+ assertEquals(0, binarySearchMods(mods, Long.MIN_VALUE, 0));
+ }
+
+ @Test
+ public void testBinarySearchModsWithDuplicateTimeRanges() {
+ // Create mods with duplicate time ranges: [10, 20], [10, 20], [30, 40]
+ List<ModEntry> mods = createTestMods(new long[][] {{10, 20}, {10, 20},
{30, 40}});
+
+ // Time 15: within duplicate ranges, should return 0 (first match)
+ assertEquals(0, binarySearchMods(mods, 15, 0));
+
+ // Time 20: at max of duplicate ranges, should return 0 (first match)
+ assertEquals(0, binarySearchMods(mods, 20, 0));
+ }
+
+ @Test
+ public void testBinarySearchModsWithStartIndexAtEnd() {
+ // Create mods: [10, 20], [30, 40], [50, 60]
+ List<ModEntry> mods = createTestMods(new long[][] {{10, 20}, {30, 40},
{50, 60}});
+
+ // Start from index 3 (beyond array), should return 3
+ assertEquals(3, binarySearchMods(mods, 100, 3));
+
+ // Start from index 2, search for time 55, should return 2
+ assertEquals(2, binarySearchMods(mods, 55, 2));
+
+ // Start from index 2, search for time 25, should return 2 (start index)
+ assertEquals(2, binarySearchMods(mods, 25, 2));
+ }
+
+ // Helper method to access the private binarySearchMods method for testing
+ private int binarySearchMods(List<ModEntry> mods, long time, int startIndex)
{
+ // Use reflection to access the private method
+ try {
+ java.lang.reflect.Method method =
+ ModsOperationUtil.class.getDeclaredMethod(
+ "binarySearchMods", List.class, long.class, int.class);
+ method.setAccessible(true);
+ return (Integer) method.invoke(null, mods, time, startIndex);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to invoke binarySearchMods method",
e);
+ }
+ }
+
+ private List<ModEntry> createTestMods(long[][] timeRanges) {
+ List<ModEntry> mods = new ArrayList<>();
+ for (long[] range : timeRanges) {
+ TreeDeletionEntry mod = new TreeDeletionEntry(null, new
TimeRange(range[0], range[1]));
+ mods.add(mod);
+ }
+ return mods;
+ }
+}