This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch TableModelIngestion2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/TableModelIngestion2 by this
push:
new 2035f47d577 fix column category serialization & bitmap after swap &
flush & flush updates & deviceId conversion
2035f47d577 is described below
commit 2035f47d5779b6ce6cada49b5ffff0088af7cc98
Author: Tian Jiang <[email protected]>
AuthorDate: Mon Jul 22 21:21:37 2024 +0800
fix column category serialization & bitmap after swap & flush & flush
updates & deviceId conversion
---
.../iotdb/session/it/IoTDBSessionRelationalIT.java | 2 +-
.../planner/plan/node/write/InsertTabletNode.java | 6 +++-
.../plan/node/write/RelationalInsertRowNode.java | 22 ++++++++-----
.../node/write/RelationalInsertTabletNode.java | 36 ++++++++++++++--------
.../plan/statement/crud/InsertTabletStatement.java | 8 ++++-
.../iotdb/db/storageengine/StorageEngine.java | 2 +-
.../dataregion/memtable/TsFileProcessor.java | 9 ++++--
7 files changed, 57 insertions(+), 28 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java
b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java
index 1fcae69a1e7..d91db30723a 100644
---
a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java
@@ -80,7 +80,7 @@ public class IoTDBSessionRelationalIT {
session.executeNonQueryStatement("USE \"db1\"");
// only one column in this table, and others should be auto-created
- session.executeNonQueryStatement("CREATE TABLE IF NOT EXISTS table1 (id1
string id)");
+ session.executeNonQueryStatement("CREATE TABLE table1 (id1 string id)");
List<IMeasurementSchema> schemaList = new ArrayList<>();
schemaList.add(new MeasurementSchema("id2", TSDataType.STRING));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index be5e26f7858..4f46c6fda54 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -631,7 +631,11 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
case STRING:
Binary[] binaryValues = (Binary[]) column;
for (int j = 0; j < rowCount; j++) {
- ReadWriteIOUtils.write(binaryValues[j], buffer);
+ if (binaryValues[j] != null) {
+ ReadWriteIOUtils.write(binaryValues[j], buffer);
+ } else {
+ ReadWriteIOUtils.write(0, buffer);
+ }
}
break;
default:
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java
index 97dd3d3673e..df9f9174ce9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java
@@ -181,31 +181,37 @@ public class RelationalInsertRowNode extends
InsertRowNode {
@Override
void subSerialize(ByteBuffer buffer) {
super.subSerialize(buffer);
- for (int i = 0; i < dataTypes.length; i++) {
- columnCategories[i].serialize(buffer);
+ for (int i = 0; i < measurements.length; i++) {
+ if (measurements[i] != null) {
+ columnCategories[i].serialize(buffer);
+ }
}
}
@Override
void subSerialize(DataOutputStream stream) throws IOException {
super.subSerialize(stream);
- for (int i = 0; i < dataTypes.length; i++) {
- columnCategories[i].serialize(stream);
+ for (int i = 0; i < measurements.length; i++) {
+ if (measurements[i] != null) {
+ columnCategories[i].serialize(stream);
+ }
}
}
@Override
protected void subSerialize(IWALByteBufferView buffer) {
super.subSerialize(buffer);
- for (int i = 0; i < dataTypes.length; i++) {
- buffer.put(columnCategories[i].getCategory());
+ for (int i = 0; i < measurements.length; i++) {
+ if (measurements[i] != null) {
+ buffer.put(columnCategories[i].getCategory());
+ }
}
}
public void subDeserialize(ByteBuffer buffer) {
super.subDeserialize(buffer);
- TsTableColumnCategory[] newColumnCategories = new
TsTableColumnCategory[dataTypes.length];
- for (int i = 0; i < dataTypes.length; i++) {
+ TsTableColumnCategory[] newColumnCategories = new
TsTableColumnCategory[measurements.length];
+ for (int i = 0; i < measurements.length; i++) {
newColumnCategories[i] = TsTableColumnCategory.deserialize(buffer);
}
setColumnCategories(newColumnCategories);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
index 6d0b6cccbb4..3902c9ae9b1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
@@ -103,7 +103,11 @@ public class RelationalInsertTabletNode extends
InsertTabletNode {
for (int i = 0; i < idColumnIndices.size(); i++) {
final Integer columnIndex = idColumnIndices.get(i);
Object idSeg = ((Object[]) columns[columnIndex])[rowIdx];
- deviceIdSegments[i + 1] = idSeg != null ? idSeg.toString() : null;
+ boolean isNull =
+ bitMaps != null
+ && bitMaps[columnIndex] != null
+ && bitMaps[columnIndex].isMarked(rowIdx);
+ deviceIdSegments[i + 1] = !isNull && idSeg != null ? idSeg.toString()
: null;
}
deviceIDs[rowIdx] = Factory.DEFAULT_FACTORY.create(deviceIdSegments);
}
@@ -144,23 +148,27 @@ public class RelationalInsertTabletNode extends
InsertTabletNode {
@Override
protected void serializeAttributes(ByteBuffer byteBuffer) {
super.serializeAttributes(byteBuffer);
- for (int i = 0; i < dataTypes.length; i++) {
- columnCategories[i].serialize(byteBuffer);
+ for (int i = 0; i < measurements.length; i++) {
+ if (measurements[i] != null) {
+ columnCategories[i].serialize(byteBuffer);
+ }
}
}
@Override
protected void serializeAttributes(DataOutputStream stream) throws
IOException {
super.serializeAttributes(stream);
- for (int i = 0; i < dataTypes.length; i++) {
- columnCategories[i].serialize(stream);
+ for (int i = 0; i < measurements.length; i++) {
+ if (measurements[i] != null) {
+ columnCategories[i].serialize(stream);
+ }
}
}
public void subDeserialize(ByteBuffer buffer) {
super.subDeserialize(buffer);
- TsTableColumnCategory[] columnCategories = new
TsTableColumnCategory[dataTypes.length];
- for (int i = 0; i < dataTypes.length; i++) {
+ TsTableColumnCategory[] columnCategories = new
TsTableColumnCategory[measurements.length];
+ for (int i = 0; i < measurements.length; i++) {
columnCategories[i] = TsTableColumnCategory.deserialize(buffer);
}
setColumnCategories(columnCategories);
@@ -168,16 +176,18 @@ public class RelationalInsertTabletNode extends
InsertTabletNode {
void subSerialize(IWALByteBufferView buffer, int start, int end) {
super.subSerialize(buffer, start, end);
- for (int i = 0; i < dataTypes.length; i++) {
- buffer.put(columnCategories[i].getCategory());
+ for (int i = 0; i < measurements.length; i++) {
+ if (measurements[i] != null) {
+ buffer.put(columnCategories[i].getCategory());
+ }
}
}
@Override
protected void subDeserializeFromWAL(ByteBuffer buffer) {
super.subDeserializeFromWAL(buffer);
- TsTableColumnCategory[] columnCategories = new
TsTableColumnCategory[dataTypes.length];
- for (int i = 0; i < dataTypes.length; i++) {
+ TsTableColumnCategory[] columnCategories = new
TsTableColumnCategory[measurements.length];
+ for (int i = 0; i < measurements.length; i++) {
columnCategories[i] = TsTableColumnCategory.deserialize(buffer);
}
setColumnCategories(columnCategories);
@@ -186,8 +196,8 @@ public class RelationalInsertTabletNode extends
InsertTabletNode {
@Override
protected void subDeserializeFromWAL(DataInputStream stream) throws
IOException {
super.subDeserializeFromWAL(stream);
- TsTableColumnCategory[] columnCategories = new
TsTableColumnCategory[dataTypes.length];
- for (int i = 0; i < dataTypes.length; i++) {
+ TsTableColumnCategory[] columnCategories = new
TsTableColumnCategory[measurements.length];
+ for (int i = 0; i < measurements.length; i++) {
columnCategories[i] = TsTableColumnCategory.deserialize(stream);
}
setColumnCategories(columnCategories);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
index ce3fb28f913..3bfe7d8fc33 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
@@ -466,7 +466,13 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
public void insertColumn(int pos, ColumnSchema columnSchema) {
super.insertColumn(pos, columnSchema);
- if (bitMaps != null) {
+ if (bitMaps == null) {
+ bitMaps = new BitMap[measurements.length];
+ bitMaps[pos] = new BitMap(rowCount);
+ for (int i = 0; i < rowCount; i++) {
+ bitMaps[pos].mark(i);
+ }
+ } else {
BitMap[] tmpBitmaps = new BitMap[bitMaps.length + 1];
System.arraycopy(bitMaps, 0, tmpBitmaps, 0, pos);
tmpBitmaps[pos] = new BitMap(rowCount);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index 8758af3aa00..5035b389279 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -578,7 +578,7 @@ public class StorageEngine implements IService {
}
public void operateFlush(TFlushReq req) {
- if (req.storageGroups == null) {
+ if (req.storageGroups == null || req.storageGroups.isEmpty()) {
StorageEngine.getInstance().syncCloseAllProcessor();
WALManager.getInstance().syncDeleteOutdatedFilesInWALNodes();
} else {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 951f655c5e4..169428dbeb7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -568,15 +568,18 @@ public class TsFileProcessor {
// For sequence tsfile, we update the endTime only when the file is
prepared to be closed.
// For unsequence tsfile, we have to update the endTime for each
insertion.
tsFileResource.updateEndTime(
- deviceEndOffsetPairs.get(0).left, deviceEndOffsetPairs.get(0).right);
+ deviceEndOffsetPairs.get(0).left,
+ insertTabletNode.getTimes()[deviceEndOffsetPairs.get(0).right - 1]);
}
for (int i = 1; i < deviceEndOffsetPairs.size(); i++) {
// the end offset of i - 1 is the start offset of i
tsFileResource.updateStartTime(
- deviceEndOffsetPairs.get(i).left, deviceEndOffsetPairs.get(i -
1).right);
+ deviceEndOffsetPairs.get(i).left,
+ insertTabletNode.getTimes()[deviceEndOffsetPairs.get(i - 1).right]);
if (!sequence) {
tsFileResource.updateEndTime(
- deviceEndOffsetPairs.get(i).left,
deviceEndOffsetPairs.get(i).right);
+ deviceEndOffsetPairs.get(i).left,
+ insertTabletNode.getTimes()[deviceEndOffsetPairs.get(i).right -
1]);
}
}