This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch fix_aligned_timeseries_point_loss
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 759e2d2cb7577f735233f9b1a72f8d2f7b69f116
Author: HTHou <[email protected]>
AuthorDate: Fri Oct 20 18:02:28 2023 +0800

    Fix aligned timeseries may loss point after flush
---
 .../db/it/aligned/IoTDBInsertAlignedValuesIT.java  | 41 ++++++++++++++++++++++
 .../memtable/AlignedWritableMemChunk.java          | 19 +++++-----
 2 files changed, 52 insertions(+), 8 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBInsertAlignedValuesIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBInsertAlignedValuesIT.java
index 786a67d9164..f31bd6c7cea 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBInsertAlignedValuesIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBInsertAlignedValuesIT.java
@@ -48,6 +48,7 @@ public class IoTDBInsertAlignedValuesIT {
   @Before
   public void setUp() throws Exception {
     
EnvFactory.getEnv().getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
+    
EnvFactory.getEnv().getConfig().getCommonConfig().setMaxNumberOfPointsInPage(2);
     EnvFactory.getEnv().initClusterEnvironment();
   }
 
@@ -248,6 +249,46 @@ public class IoTDBInsertAlignedValuesIT {
     }
   }
 
+  @Test
+  public void testInsertComplexAlignedValues() throws SQLException {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.addBatch("create aligned timeseries root.sg.d1(s1 int32, s2 
int32, s3 int32)");
+      statement.addBatch("insert into root.sg.d1(time,s1) values(3,1)");
+      statement.addBatch("insert into root.sg.d1(time,s1) values(1,1)");
+      statement.addBatch("insert into root.sg.d1(time,s1) values(2,1)");
+      statement.addBatch("insert into root.sg.d1(time,s2) values(2,2)");
+      statement.addBatch("insert into root.sg.d1(time,s2) values(1,2)");
+      statement.addBatch("insert into root.sg.d1(time,s2) values(3,2)");
+      statement.addBatch("insert into root.sg.d1(time,s3) values(1,3)");
+      statement.addBatch("insert into root.sg.d1(time,s3) values(3,3)");
+      statement.executeBatch();
+
+      try (ResultSet resultSet =
+          statement.executeQuery("select count(s1), count(s2), count(s3) from 
root.sg.d1")) {
+
+        assertTrue(resultSet.next());
+        assertEquals(3, resultSet.getInt(1));
+        assertEquals(3, resultSet.getInt(2));
+        assertEquals(2, resultSet.getInt(3));
+
+        assertFalse(resultSet.next());
+      }
+
+      statement.execute("flush");
+      try (ResultSet resultSet =
+          statement.executeQuery("select count(s1), count(s2), count(s3) from 
root.sg.d1")) {
+
+        assertTrue(resultSet.next());
+        assertEquals(3, resultSet.getInt(1));
+        assertEquals(3, resultSet.getInt(2));
+        assertEquals(2, resultSet.getInt(3));
+
+        assertFalse(resultSet.next());
+      }
+    }
+  }
+
   @Test
   public void testInsertWithWrongMeasurementNum1() throws SQLException {
     try (Connection connection = EnvFactory.getEnv().getConnection();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
index a80a40e1995..2c405dd3dd8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
@@ -358,12 +358,14 @@ public class AlignedWritableMemChunk implements 
IWritableMemChunk {
     }
 
     List<TSDataType> dataTypes = list.getTsDataTypes();
+    Pair<Long, Integer>[] lastValidPointIndexForTimeDupCheck = new 
Pair[dataTypes.size()];
+
     for (int pageNum = 0; pageNum < pageRange.size() / 2; pageNum += 1) {
       for (int columnIndex = 0; columnIndex < dataTypes.size(); columnIndex++) 
{
         // Pair of Time and Index
-        Pair<Long, Integer> lastValidPointIndexForTimeDupCheck = null;
-        if (Objects.nonNull(timeDuplicateInfo)) {
-          lastValidPointIndexForTimeDupCheck = new Pair<>(Long.MIN_VALUE, 
null);
+        if (Objects.nonNull(timeDuplicateInfo)
+            && lastValidPointIndexForTimeDupCheck[columnIndex] == null) {
+          lastValidPointIndexForTimeDupCheck[columnIndex] = new 
Pair<>(Long.MIN_VALUE, null);
         }
         for (int sortedRowIndex = pageRange.get(pageNum * 2);
             sortedRowIndex <= pageRange.get(pageNum * 2 + 1);
@@ -374,8 +376,9 @@ public class AlignedWritableMemChunk implements 
IWritableMemChunk {
           long time = list.getTime(sortedRowIndex);
           if (Objects.nonNull(timeDuplicateInfo)) {
             if (!list.isNullValue(list.getValueIndex(sortedRowIndex), 
columnIndex)) {
-              lastValidPointIndexForTimeDupCheck.left = time;
-              lastValidPointIndexForTimeDupCheck.right = 
list.getValueIndex(sortedRowIndex);
+              lastValidPointIndexForTimeDupCheck[columnIndex].left = time;
+              lastValidPointIndexForTimeDupCheck[columnIndex].right =
+                  list.getValueIndex(sortedRowIndex);
             }
             if (timeDuplicateInfo[sortedRowIndex]) {
               if (!list.isNullValue(sortedRowIndex, columnIndex)) {
@@ -402,9 +405,9 @@ public class AlignedWritableMemChunk implements 
IWritableMemChunk {
           // write(T:3,V:null)
 
           int originRowIndex;
-          if (Objects.nonNull(lastValidPointIndexForTimeDupCheck)
-              && (time == lastValidPointIndexForTimeDupCheck.left)) {
-            originRowIndex = lastValidPointIndexForTimeDupCheck.right;
+          if (Objects.nonNull(lastValidPointIndexForTimeDupCheck[columnIndex])
+              && (time == 
lastValidPointIndexForTimeDupCheck[columnIndex].left)) {
+            originRowIndex = 
lastValidPointIndexForTimeDupCheck[columnIndex].right;
           } else {
             originRowIndex = list.getValueIndex(sortedRowIndex);
           }

Reply via email to