This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 273ade23501 Fix aligned timeseries may lose points after flush
273ade23501 is described below

commit 273ade23501f7fb10665c0fab03d5960289b4db0
Author: Haonan <[email protected]>
AuthorDate: Tue Oct 24 12:18:16 2023 +0800

    Fix aligned timeseries may lose points after flush
---
 .../db/it/aligned/IoTDBInsertAlignedValues2IT.java | 42 ++++++++++++++++++++++
 .../memtable/AlignedWritableMemChunk.java          | 19 +++++-----
 2 files changed, 53 insertions(+), 8 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBInsertAlignedValues2IT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBInsertAlignedValues2IT.java
index 331bd1cf3ff..530e0189c52 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBInsertAlignedValues2IT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBInsertAlignedValues2IT.java
@@ -35,6 +35,8 @@ import java.sql.SQLException;
 import java.sql.Statement;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 @RunWith(IoTDBTestRunner.class)
 @Category({LocalStandaloneIT.class, ClusterIT.class})
@@ -125,4 +127,44 @@ public class IoTDBInsertAlignedValues2IT {
       statement.execute("insert into root.sg.d1(time, s1,s2) aligned 
values(1,'aa','bb')");
     }
   }
+
+  @Test
+  public void testInsertComplexAlignedValues() throws SQLException {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.addBatch("create aligned timeseries root.sg.d1(s1 int32, s2 
int32, s3 int32)");
+      statement.addBatch("insert into root.sg.d1(time,s1) values(3,1)");
+      statement.addBatch("insert into root.sg.d1(time,s1) values(1,1)");
+      statement.addBatch("insert into root.sg.d1(time,s1) values(2,1)");
+      statement.addBatch("insert into root.sg.d1(time,s2) values(2,2)");
+      statement.addBatch("insert into root.sg.d1(time,s2) values(1,2)");
+      statement.addBatch("insert into root.sg.d1(time,s2) values(3,2)");
+      statement.addBatch("insert into root.sg.d1(time,s3) values(1,3)");
+      statement.addBatch("insert into root.sg.d1(time,s3) values(3,3)");
+      statement.executeBatch();
+
+      try (ResultSet resultSet =
+          statement.executeQuery("select count(s1), count(s2), count(s3) from 
root.sg.d1")) {
+
+        assertTrue(resultSet.next());
+        assertEquals(3, resultSet.getInt(1));
+        assertEquals(3, resultSet.getInt(2));
+        assertEquals(2, resultSet.getInt(3));
+
+        assertFalse(resultSet.next());
+      }
+
+      statement.execute("flush");
+      try (ResultSet resultSet =
+          statement.executeQuery("select count(s1), count(s2), count(s3) from 
root.sg.d1")) {
+
+        assertTrue(resultSet.next());
+        assertEquals(3, resultSet.getInt(1));
+        assertEquals(3, resultSet.getInt(2));
+        assertEquals(2, resultSet.getInt(3));
+
+        assertFalse(resultSet.next());
+      }
+    }
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
index a80a40e1995..2c405dd3dd8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
@@ -358,12 +358,14 @@ public class AlignedWritableMemChunk implements 
IWritableMemChunk {
     }
 
     List<TSDataType> dataTypes = list.getTsDataTypes();
+    Pair<Long, Integer>[] lastValidPointIndexForTimeDupCheck = new 
Pair[dataTypes.size()];
+
     for (int pageNum = 0; pageNum < pageRange.size() / 2; pageNum += 1) {
       for (int columnIndex = 0; columnIndex < dataTypes.size(); columnIndex++) 
{
         // Pair of Time and Index
-        Pair<Long, Integer> lastValidPointIndexForTimeDupCheck = null;
-        if (Objects.nonNull(timeDuplicateInfo)) {
-          lastValidPointIndexForTimeDupCheck = new Pair<>(Long.MIN_VALUE, 
null);
+        if (Objects.nonNull(timeDuplicateInfo)
+            && lastValidPointIndexForTimeDupCheck[columnIndex] == null) {
+          lastValidPointIndexForTimeDupCheck[columnIndex] = new 
Pair<>(Long.MIN_VALUE, null);
         }
         for (int sortedRowIndex = pageRange.get(pageNum * 2);
             sortedRowIndex <= pageRange.get(pageNum * 2 + 1);
@@ -374,8 +376,9 @@ public class AlignedWritableMemChunk implements 
IWritableMemChunk {
           long time = list.getTime(sortedRowIndex);
           if (Objects.nonNull(timeDuplicateInfo)) {
             if (!list.isNullValue(list.getValueIndex(sortedRowIndex), 
columnIndex)) {
-              lastValidPointIndexForTimeDupCheck.left = time;
-              lastValidPointIndexForTimeDupCheck.right = 
list.getValueIndex(sortedRowIndex);
+              lastValidPointIndexForTimeDupCheck[columnIndex].left = time;
+              lastValidPointIndexForTimeDupCheck[columnIndex].right =
+                  list.getValueIndex(sortedRowIndex);
             }
             if (timeDuplicateInfo[sortedRowIndex]) {
               if (!list.isNullValue(sortedRowIndex, columnIndex)) {
@@ -402,9 +405,9 @@ public class AlignedWritableMemChunk implements 
IWritableMemChunk {
           // write(T:3,V:null)
 
           int originRowIndex;
-          if (Objects.nonNull(lastValidPointIndexForTimeDupCheck)
-              && (time == lastValidPointIndexForTimeDupCheck.left)) {
-            originRowIndex = lastValidPointIndexForTimeDupCheck.right;
+          if (Objects.nonNull(lastValidPointIndexForTimeDupCheck[columnIndex])
+              && (time == 
lastValidPointIndexForTimeDupCheck[columnIndex].left)) {
+            originRowIndex = 
lastValidPointIndexForTimeDupCheck[columnIndex].right;
           } else {
             originRowIndex = list.getValueIndex(sortedRowIndex);
           }

Reply via email to