This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch fix_aligned_timeseries_point_loss in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 759e2d2cb7577f735233f9b1a72f8d2f7b69f116 Author: HTHou <[email protected]> AuthorDate: Fri Oct 20 18:02:28 2023 +0800 Fix aligned timeseries may loss point after flush --- .../db/it/aligned/IoTDBInsertAlignedValuesIT.java | 41 ++++++++++++++++++++++ .../memtable/AlignedWritableMemChunk.java | 19 +++++----- 2 files changed, 52 insertions(+), 8 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBInsertAlignedValuesIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBInsertAlignedValuesIT.java index 786a67d9164..f31bd6c7cea 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBInsertAlignedValuesIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBInsertAlignedValuesIT.java @@ -48,6 +48,7 @@ public class IoTDBInsertAlignedValuesIT { @Before public void setUp() throws Exception { EnvFactory.getEnv().getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true); + EnvFactory.getEnv().getConfig().getCommonConfig().setMaxNumberOfPointsInPage(2); EnvFactory.getEnv().initClusterEnvironment(); } @@ -248,6 +249,46 @@ public class IoTDBInsertAlignedValuesIT { } } + @Test + public void testInsertComplexAlignedValues() throws SQLException { + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + statement.addBatch("create aligned timeseries root.sg.d1(s1 int32, s2 int32, s3 int32)"); + statement.addBatch("insert into root.sg.d1(time,s1) values(3,1)"); + statement.addBatch("insert into root.sg.d1(time,s1) values(1,1)"); + statement.addBatch("insert into root.sg.d1(time,s1) values(2,1)"); + statement.addBatch("insert into root.sg.d1(time,s2) values(2,2)"); + statement.addBatch("insert into root.sg.d1(time,s2) values(1,2)"); + statement.addBatch("insert into root.sg.d1(time,s2) values(3,2)"); + statement.addBatch("insert into root.sg.d1(time,s3) values(1,3)"); + statement.addBatch("insert into root.sg.d1(time,s3) values(3,3)"); + statement.executeBatch(); + + try (ResultSet resultSet = + statement.executeQuery("select count(s1), count(s2), count(s3) from root.sg.d1")) { + + assertTrue(resultSet.next()); + assertEquals(3, resultSet.getInt(1)); + assertEquals(3, resultSet.getInt(2)); + assertEquals(2, resultSet.getInt(3)); + + assertFalse(resultSet.next()); + } + + statement.execute("flush"); + try (ResultSet resultSet = + statement.executeQuery("select count(s1), count(s2), count(s3) from root.sg.d1")) { + + assertTrue(resultSet.next()); + assertEquals(3, resultSet.getInt(1)); + assertEquals(3, resultSet.getInt(2)); + assertEquals(2, resultSet.getInt(3)); + + assertFalse(resultSet.next()); + } + } + } + @Test public void testInsertWithWrongMeasurementNum1() throws SQLException { try (Connection connection = EnvFactory.getEnv().getConnection(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java index a80a40e1995..2c405dd3dd8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java @@ -358,12 +358,14 @@ public class AlignedWritableMemChunk implements IWritableMemChunk { } List<TSDataType> dataTypes = list.getTsDataTypes(); + Pair<Long, Integer>[] lastValidPointIndexForTimeDupCheck = new Pair[dataTypes.size()]; + for (int pageNum = 0; pageNum < pageRange.size() / 2; pageNum += 1) { for (int columnIndex = 0; columnIndex < dataTypes.size(); columnIndex++) { // Pair of Time and Index - Pair<Long, Integer> lastValidPointIndexForTimeDupCheck = null; - if (Objects.nonNull(timeDuplicateInfo)) { - lastValidPointIndexForTimeDupCheck = new Pair<>(Long.MIN_VALUE, null); + if (Objects.nonNull(timeDuplicateInfo) + && lastValidPointIndexForTimeDupCheck[columnIndex] == null) { + lastValidPointIndexForTimeDupCheck[columnIndex] = new Pair<>(Long.MIN_VALUE, null); } for (int sortedRowIndex = pageRange.get(pageNum * 2); sortedRowIndex <= pageRange.get(pageNum * 2 + 1); @@ -374,8 +376,9 @@ public class AlignedWritableMemChunk implements IWritableMemChunk { long time = list.getTime(sortedRowIndex); if (Objects.nonNull(timeDuplicateInfo)) { if (!list.isNullValue(list.getValueIndex(sortedRowIndex), columnIndex)) { - lastValidPointIndexForTimeDupCheck.left = time; - lastValidPointIndexForTimeDupCheck.right = list.getValueIndex(sortedRowIndex); + lastValidPointIndexForTimeDupCheck[columnIndex].left = time; + lastValidPointIndexForTimeDupCheck[columnIndex].right = + list.getValueIndex(sortedRowIndex); } if (timeDuplicateInfo[sortedRowIndex]) { if (!list.isNullValue(sortedRowIndex, columnIndex)) { @@ -402,9 +405,9 @@ public class AlignedWritableMemChunk implements IWritableMemChunk { // write(T:3,V:null) int originRowIndex; - if (Objects.nonNull(lastValidPointIndexForTimeDupCheck) - && (time == lastValidPointIndexForTimeDupCheck.left)) { - originRowIndex = lastValidPointIndexForTimeDupCheck.right; + if (Objects.nonNull(lastValidPointIndexForTimeDupCheck[columnIndex]) + && (time == lastValidPointIndexForTimeDupCheck[columnIndex].left)) { + originRowIndex = lastValidPointIndexForTimeDupCheck[columnIndex].right; } else { originRowIndex = list.getValueIndex(sortedRowIndex); }
