This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch TableModelIngestion2
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/TableModelIngestion2 by this push:
     new 2035f47d577 fix column category serialization & bitmap after swap & flush & flush updates & deviceId conversion
2035f47d577 is described below

commit 2035f47d5779b6ce6cada49b5ffff0088af7cc98
Author: Tian Jiang <[email protected]>
AuthorDate: Mon Jul 22 21:21:37 2024 +0800

    fix column category serialization & bitmap after swap & flush & flush updates & deviceId conversion
---
 .../iotdb/session/it/IoTDBSessionRelationalIT.java |  2 +-
 .../planner/plan/node/write/InsertTabletNode.java  |  6 +++-
 .../plan/node/write/RelationalInsertRowNode.java   | 22 ++++++++-----
 .../node/write/RelationalInsertTabletNode.java     | 36 ++++++++++++++--------
 .../plan/statement/crud/InsertTabletStatement.java |  8 ++++-
 .../iotdb/db/storageengine/StorageEngine.java      |  2 +-
 .../dataregion/memtable/TsFileProcessor.java       |  9 ++++--
 7 files changed, 57 insertions(+), 28 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java
index 1fcae69a1e7..d91db30723a 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java
@@ -80,7 +80,7 @@ public class IoTDBSessionRelationalIT {
 
       session.executeNonQueryStatement("USE \"db1\"");
       // only one column in this table, and others should be auto-created
-      session.executeNonQueryStatement("CREATE TABLE IF NOT EXISTS table1 (id1 
string id)");
+      session.executeNonQueryStatement("CREATE TABLE table1 (id1 string id)");
 
       List<IMeasurementSchema> schemaList = new ArrayList<>();
       schemaList.add(new MeasurementSchema("id2", TSDataType.STRING));
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index be5e26f7858..4f46c6fda54 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -631,7 +631,11 @@ public class InsertTabletNode extends InsertNode 
implements WALEntryValue {
       case STRING:
         Binary[] binaryValues = (Binary[]) column;
         for (int j = 0; j < rowCount; j++) {
-          ReadWriteIOUtils.write(binaryValues[j], buffer);
+          if (binaryValues[j] != null) {
+            ReadWriteIOUtils.write(binaryValues[j], buffer);
+          } else {
+            ReadWriteIOUtils.write(0, buffer);
+          }
         }
         break;
       default:
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java
index 97dd3d3673e..df9f9174ce9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java
@@ -181,31 +181,37 @@ public class RelationalInsertRowNode extends 
InsertRowNode {
   @Override
   void subSerialize(ByteBuffer buffer) {
     super.subSerialize(buffer);
-    for (int i = 0; i < dataTypes.length; i++) {
-      columnCategories[i].serialize(buffer);
+    for (int i = 0; i < measurements.length; i++) {
+      if (measurements[i] != null) {
+        columnCategories[i].serialize(buffer);
+      }
     }
   }
 
   @Override
   void subSerialize(DataOutputStream stream) throws IOException {
     super.subSerialize(stream);
-    for (int i = 0; i < dataTypes.length; i++) {
-      columnCategories[i].serialize(stream);
+    for (int i = 0; i < measurements.length; i++) {
+      if (measurements[i] != null) {
+        columnCategories[i].serialize(stream);
+      }
     }
   }
 
   @Override
   protected void subSerialize(IWALByteBufferView buffer) {
     super.subSerialize(buffer);
-    for (int i = 0; i < dataTypes.length; i++) {
-      buffer.put(columnCategories[i].getCategory());
+    for (int i = 0; i < measurements.length; i++) {
+      if (measurements[i] != null) {
+        buffer.put(columnCategories[i].getCategory());
+      }
     }
   }
 
   public void subDeserialize(ByteBuffer buffer) {
     super.subDeserialize(buffer);
-    TsTableColumnCategory[] newColumnCategories = new 
TsTableColumnCategory[dataTypes.length];
-    for (int i = 0; i < dataTypes.length; i++) {
+    TsTableColumnCategory[] newColumnCategories = new 
TsTableColumnCategory[measurements.length];
+    for (int i = 0; i < measurements.length; i++) {
       newColumnCategories[i] = TsTableColumnCategory.deserialize(buffer);
     }
     setColumnCategories(newColumnCategories);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
index 6d0b6cccbb4..3902c9ae9b1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
@@ -103,7 +103,11 @@ public class RelationalInsertTabletNode extends 
InsertTabletNode {
       for (int i = 0; i < idColumnIndices.size(); i++) {
         final Integer columnIndex = idColumnIndices.get(i);
         Object idSeg = ((Object[]) columns[columnIndex])[rowIdx];
-        deviceIdSegments[i + 1] = idSeg != null ? idSeg.toString() : null;
+        boolean isNull =
+            bitMaps != null
+                && bitMaps[columnIndex] != null
+                && bitMaps[columnIndex].isMarked(rowIdx);
+        deviceIdSegments[i + 1] = !isNull && idSeg != null ? idSeg.toString() 
: null;
       }
       deviceIDs[rowIdx] = Factory.DEFAULT_FACTORY.create(deviceIdSegments);
     }
@@ -144,23 +148,27 @@ public class RelationalInsertTabletNode extends 
InsertTabletNode {
   @Override
   protected void serializeAttributes(ByteBuffer byteBuffer) {
     super.serializeAttributes(byteBuffer);
-    for (int i = 0; i < dataTypes.length; i++) {
-      columnCategories[i].serialize(byteBuffer);
+    for (int i = 0; i < measurements.length; i++) {
+      if (measurements[i] != null) {
+        columnCategories[i].serialize(byteBuffer);
+      }
     }
   }
 
   @Override
   protected void serializeAttributes(DataOutputStream stream) throws 
IOException {
     super.serializeAttributes(stream);
-    for (int i = 0; i < dataTypes.length; i++) {
-      columnCategories[i].serialize(stream);
+    for (int i = 0; i < measurements.length; i++) {
+      if (measurements[i] != null) {
+        columnCategories[i].serialize(stream);
+      }
     }
   }
 
   public void subDeserialize(ByteBuffer buffer) {
     super.subDeserialize(buffer);
-    TsTableColumnCategory[] columnCategories = new 
TsTableColumnCategory[dataTypes.length];
-    for (int i = 0; i < dataTypes.length; i++) {
+    TsTableColumnCategory[] columnCategories = new 
TsTableColumnCategory[measurements.length];
+    for (int i = 0; i < measurements.length; i++) {
       columnCategories[i] = TsTableColumnCategory.deserialize(buffer);
     }
     setColumnCategories(columnCategories);
@@ -168,16 +176,18 @@ public class RelationalInsertTabletNode extends 
InsertTabletNode {
 
   void subSerialize(IWALByteBufferView buffer, int start, int end) {
     super.subSerialize(buffer, start, end);
-    for (int i = 0; i < dataTypes.length; i++) {
-      buffer.put(columnCategories[i].getCategory());
+    for (int i = 0; i < measurements.length; i++) {
+      if (measurements[i] != null) {
+        buffer.put(columnCategories[i].getCategory());
+      }
     }
   }
 
   @Override
   protected void subDeserializeFromWAL(ByteBuffer buffer) {
     super.subDeserializeFromWAL(buffer);
-    TsTableColumnCategory[] columnCategories = new 
TsTableColumnCategory[dataTypes.length];
-    for (int i = 0; i < dataTypes.length; i++) {
+    TsTableColumnCategory[] columnCategories = new 
TsTableColumnCategory[measurements.length];
+    for (int i = 0; i < measurements.length; i++) {
       columnCategories[i] = TsTableColumnCategory.deserialize(buffer);
     }
     setColumnCategories(columnCategories);
@@ -186,8 +196,8 @@ public class RelationalInsertTabletNode extends 
InsertTabletNode {
   @Override
   protected void subDeserializeFromWAL(DataInputStream stream) throws 
IOException {
     super.subDeserializeFromWAL(stream);
-    TsTableColumnCategory[] columnCategories = new 
TsTableColumnCategory[dataTypes.length];
-    for (int i = 0; i < dataTypes.length; i++) {
+    TsTableColumnCategory[] columnCategories = new 
TsTableColumnCategory[measurements.length];
+    for (int i = 0; i < measurements.length; i++) {
       columnCategories[i] = TsTableColumnCategory.deserialize(stream);
     }
     setColumnCategories(columnCategories);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
index ce3fb28f913..3bfe7d8fc33 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
@@ -466,7 +466,13 @@ public class InsertTabletStatement extends 
InsertBaseStatement implements ISchem
   public void insertColumn(int pos, ColumnSchema columnSchema) {
     super.insertColumn(pos, columnSchema);
 
-    if (bitMaps != null) {
+    if (bitMaps == null) {
+      bitMaps = new BitMap[measurements.length];
+      bitMaps[pos] = new BitMap(rowCount);
+      for (int i = 0; i < rowCount; i++) {
+        bitMaps[pos].mark(i);
+      }
+    } else {
       BitMap[] tmpBitmaps = new BitMap[bitMaps.length + 1];
       System.arraycopy(bitMaps, 0, tmpBitmaps, 0, pos);
       tmpBitmaps[pos] = new BitMap(rowCount);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index 8758af3aa00..5035b389279 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -578,7 +578,7 @@ public class StorageEngine implements IService {
   }
 
   public void operateFlush(TFlushReq req) {
-    if (req.storageGroups == null) {
+    if (req.storageGroups == null || req.storageGroups.isEmpty()) {
       StorageEngine.getInstance().syncCloseAllProcessor();
       WALManager.getInstance().syncDeleteOutdatedFilesInWALNodes();
     } else {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 951f655c5e4..169428dbeb7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -568,15 +568,18 @@ public class TsFileProcessor {
       // For sequence tsfile, we update the endTime only when the file is 
prepared to be closed.
       // For unsequence tsfile, we have to update the endTime for each 
insertion.
       tsFileResource.updateEndTime(
-          deviceEndOffsetPairs.get(0).left, deviceEndOffsetPairs.get(0).right);
+          deviceEndOffsetPairs.get(0).left,
+          insertTabletNode.getTimes()[deviceEndOffsetPairs.get(0).right - 1]);
     }
     for (int i = 1; i < deviceEndOffsetPairs.size(); i++) {
       // the end offset of i - 1 is the start offset of i
       tsFileResource.updateStartTime(
-          deviceEndOffsetPairs.get(i).left, deviceEndOffsetPairs.get(i - 
1).right);
+          deviceEndOffsetPairs.get(i).left,
+          insertTabletNode.getTimes()[deviceEndOffsetPairs.get(i - 1).right]);
       if (!sequence) {
         tsFileResource.updateEndTime(
-            deviceEndOffsetPairs.get(i).left, 
deviceEndOffsetPairs.get(i).right);
+            deviceEndOffsetPairs.get(i).left,
+            insertTabletNode.getTimes()[deviceEndOffsetPairs.get(i).right - 
1]);
       }
     }
 

Reply via email to