This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 273ade23501 Fix aligned timeseries may lose points after flush
273ade23501 is described below
commit 273ade23501f7fb10665c0fab03d5960289b4db0
Author: Haonan <[email protected]>
AuthorDate: Tue Oct 24 12:18:16 2023 +0800
Fix aligned timeseries may lose points after flush
---
.../db/it/aligned/IoTDBInsertAlignedValues2IT.java | 42 ++++++++++++++++++++++
.../memtable/AlignedWritableMemChunk.java | 19 +++++-----
2 files changed, 53 insertions(+), 8 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBInsertAlignedValues2IT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBInsertAlignedValues2IT.java
index 331bd1cf3ff..530e0189c52 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBInsertAlignedValues2IT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBInsertAlignedValues2IT.java
@@ -35,6 +35,8 @@ import java.sql.SQLException;
import java.sql.Statement;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
@RunWith(IoTDBTestRunner.class)
@Category({LocalStandaloneIT.class, ClusterIT.class})
@@ -125,4 +127,44 @@ public class IoTDBInsertAlignedValues2IT {
statement.execute("insert into root.sg.d1(time, s1,s2) aligned
values(1,'aa','bb')");
}
}
+
+ @Test
+ public void testInsertComplexAlignedValues() throws SQLException {
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.addBatch("create aligned timeseries root.sg.d1(s1 int32, s2
int32, s3 int32)");
+ statement.addBatch("insert into root.sg.d1(time,s1) values(3,1)");
+ statement.addBatch("insert into root.sg.d1(time,s1) values(1,1)");
+ statement.addBatch("insert into root.sg.d1(time,s1) values(2,1)");
+ statement.addBatch("insert into root.sg.d1(time,s2) values(2,2)");
+ statement.addBatch("insert into root.sg.d1(time,s2) values(1,2)");
+ statement.addBatch("insert into root.sg.d1(time,s2) values(3,2)");
+ statement.addBatch("insert into root.sg.d1(time,s3) values(1,3)");
+ statement.addBatch("insert into root.sg.d1(time,s3) values(3,3)");
+ statement.executeBatch();
+
+ try (ResultSet resultSet =
+ statement.executeQuery("select count(s1), count(s2), count(s3) from
root.sg.d1")) {
+
+ assertTrue(resultSet.next());
+ assertEquals(3, resultSet.getInt(1));
+ assertEquals(3, resultSet.getInt(2));
+ assertEquals(2, resultSet.getInt(3));
+
+ assertFalse(resultSet.next());
+ }
+
+ statement.execute("flush");
+ try (ResultSet resultSet =
+ statement.executeQuery("select count(s1), count(s2), count(s3) from
root.sg.d1")) {
+
+ assertTrue(resultSet.next());
+ assertEquals(3, resultSet.getInt(1));
+ assertEquals(3, resultSet.getInt(2));
+ assertEquals(2, resultSet.getInt(3));
+
+ assertFalse(resultSet.next());
+ }
+ }
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
index a80a40e1995..2c405dd3dd8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
@@ -358,12 +358,14 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
}
List<TSDataType> dataTypes = list.getTsDataTypes();
+ Pair<Long, Integer>[] lastValidPointIndexForTimeDupCheck = new
Pair[dataTypes.size()];
+
for (int pageNum = 0; pageNum < pageRange.size() / 2; pageNum += 1) {
for (int columnIndex = 0; columnIndex < dataTypes.size(); columnIndex++)
{
// Pair of Time and Index
- Pair<Long, Integer> lastValidPointIndexForTimeDupCheck = null;
- if (Objects.nonNull(timeDuplicateInfo)) {
- lastValidPointIndexForTimeDupCheck = new Pair<>(Long.MIN_VALUE,
null);
+ if (Objects.nonNull(timeDuplicateInfo)
+ && lastValidPointIndexForTimeDupCheck[columnIndex] == null) {
+ lastValidPointIndexForTimeDupCheck[columnIndex] = new
Pair<>(Long.MIN_VALUE, null);
}
for (int sortedRowIndex = pageRange.get(pageNum * 2);
sortedRowIndex <= pageRange.get(pageNum * 2 + 1);
@@ -374,8 +376,9 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
long time = list.getTime(sortedRowIndex);
if (Objects.nonNull(timeDuplicateInfo)) {
if (!list.isNullValue(list.getValueIndex(sortedRowIndex),
columnIndex)) {
- lastValidPointIndexForTimeDupCheck.left = time;
- lastValidPointIndexForTimeDupCheck.right =
list.getValueIndex(sortedRowIndex);
+ lastValidPointIndexForTimeDupCheck[columnIndex].left = time;
+ lastValidPointIndexForTimeDupCheck[columnIndex].right =
+ list.getValueIndex(sortedRowIndex);
}
if (timeDuplicateInfo[sortedRowIndex]) {
if (!list.isNullValue(sortedRowIndex, columnIndex)) {
@@ -402,9 +405,9 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
// write(T:3,V:null)
int originRowIndex;
- if (Objects.nonNull(lastValidPointIndexForTimeDupCheck)
- && (time == lastValidPointIndexForTimeDupCheck.left)) {
- originRowIndex = lastValidPointIndexForTimeDupCheck.right;
+ if (Objects.nonNull(lastValidPointIndexForTimeDupCheck[columnIndex])
+ && (time ==
lastValidPointIndexForTimeDupCheck[columnIndex].left)) {
+ originRowIndex =
lastValidPointIndexForTimeDupCheck[columnIndex].right;
} else {
originRowIndex = list.getValueIndex(sortedRowIndex);
}