This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 8618464d861 Fix table model insert without measurement error (#14222)
8618464d861 is described below
commit 8618464d8610333a16226ab0d09e5da5c4269215
Author: Haonan <[email protected]>
AuthorDate: Wed Nov 27 14:37:00 2024 +0800
Fix table model insert without measurement error (#14222)
---
.../iotdb/session/it/IoTDBSessionRelationalIT.java | 32 ++++++++++++++++++++++
.../dataregion/flush/MemTableFlushTask.java | 2 +-
.../dataregion/memtable/AbstractMemTable.java | 7 +++--
.../memtable/AlignedWritableMemChunk.java | 18 ++++++++----
.../memtable/AlignedWritableMemChunkGroup.java | 10 +++----
.../dataregion/memtable/IWritableMemChunk.java | 5 +---
.../dataregion/memtable/WritableMemChunk.java | 2 +-
7 files changed, 56 insertions(+), 20 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java
index eb28cf01d70..47ba178dd63 100644
--- a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java
@@ -961,4 +961,36 @@ public class IoTDBSessionRelationalIT {
}
}
}
+
+ @Test
+ @Category({LocalStandaloneIT.class, ClusterIT.class})
+ public void insertWithoutMeasurementTest()
+ throws IoTDBConnectionException, StatementExecutionException {
+ try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) {
+ session.executeNonQueryStatement("USE \"db1\"");
+ session.executeNonQueryStatement("create table tb (a string id, b string measurement)");
+ session.executeNonQueryStatement("insert into tb(a) values ('w')");
+ SessionDataSet dataSet = session.executeQueryStatement("select * from tb");
+ int cnt = 0;
+ while (dataSet.hasNext()) {
+ RowRecord rowRecord = dataSet.next();
+ assertEquals("w", rowRecord.getFields().get(1).getBinaryV().toString());
+ assertNull(rowRecord.getFields().get(2).getDataType());
+ cnt++;
+ }
+ assertEquals(1, cnt);
+
+ session.executeNonQueryStatement("flush");
+
+ dataSet = session.executeQueryStatement("select * from tb");
+ cnt = 0;
+ while (dataSet.hasNext()) {
+ RowRecord rowRecord = dataSet.next();
+ assertEquals("w", rowRecord.getFields().get(1).getBinaryV().toString());
+ assertNull(rowRecord.getFields().get(2).getDataType());
+ cnt++;
+ }
+ assertEquals(1, cnt);
+ }
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
index 59633610b71..45a398b96bb 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
@@ -153,7 +153,7 @@ public class MemTableFlushTask {
/*
* sort task (first task of flush pipeline)
*/
- series.sortTvListForFlush(!deviceID.isTableModel());
+ series.sortTvListForFlush();
long subTaskTime = System.currentTimeMillis() - startTime;
sortTime += subTaskTime;
WRITING_METRICS.recordFlushSubTaskCost(WritingMetrics.SORT_TASK, subTaskTime);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
index 6b0bd5e9f93..b35ab02501f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
@@ -190,7 +190,8 @@ public abstract class AbstractMemTable implements IMemTable {
seriesNumber += schemaList.size();
totalPointsNumThreshold += ((long) avgSeriesPointNumThreshold) * schemaList.size();
return new AlignedWritableMemChunkGroup(
- schemaList.stream().filter(Objects::nonNull).collect(Collectors.toList()));
+ schemaList.stream().filter(Objects::nonNull).collect(Collectors.toList()),
+ k.isTableModel());
});
for (IMeasurementSchema schema : schemaList) {
if (schema != null && !memChunkGroup.contains(schema.getMeasurementId())) {
@@ -943,7 +944,7 @@ public abstract class AbstractMemTable implements IMemTable {
boolean isAligned = ReadWriteIOUtils.readBool(stream);
IWritableMemChunkGroup memChunkGroup;
if (isAligned) {
- memChunkGroup = AlignedWritableMemChunkGroup.deserialize(stream);
+ memChunkGroup = AlignedWritableMemChunkGroup.deserialize(stream, deviceID.isTableModel());
} else {
memChunkGroup = WritableMemChunkGroup.deserialize(stream);
}
@@ -974,7 +975,7 @@ public abstract class AbstractMemTable implements IMemTable {
boolean isAligned = ReadWriteIOUtils.readBool(stream);
IWritableMemChunkGroup memChunkGroup;
if (isAligned) {
- memChunkGroup = AlignedWritableMemChunkGroup.deserialize(stream);
+ memChunkGroup = AlignedWritableMemChunkGroup.deserialize(stream, false);
} else {
memChunkGroup = WritableMemChunkGroup.deserialize(stream);
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
index 086e9922c79..8ff67c20096 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
@@ -59,7 +59,7 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
private static final String UNSUPPORTED_TYPE = "Unsupported data type:";
- public AlignedWritableMemChunk(List<IMeasurementSchema> schemaList) {
+ public AlignedWritableMemChunk(List<IMeasurementSchema> schemaList, boolean isTableModel) {
this.measurementIndexMap = new LinkedHashMap<>();
List<TSDataType> dataTypeList = new ArrayList<>();
this.schemaList = schemaList;
@@ -68,15 +68,18 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
dataTypeList.add(schemaList.get(i).getType());
}
this.list = AlignedTVList.newAlignedList(dataTypeList);
+ this.ignoreAllNullRows = !isTableModel;
}
- private AlignedWritableMemChunk(List<IMeasurementSchema> schemaList, AlignedTVList list) {
+ private AlignedWritableMemChunk(
+ List<IMeasurementSchema> schemaList, AlignedTVList list, boolean isTableModel) {
this.measurementIndexMap = new LinkedHashMap<>();
this.schemaList = schemaList;
for (int i = 0; i < schemaList.size(); i++) {
measurementIndexMap.put(schemaList.get(i).getMeasurementId(), i);
}
this.list = list;
+ this.ignoreAllNullRows = !isTableModel;
}
public Set<String> getAllMeasurements() {
@@ -244,6 +247,9 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
@Override
public long count() {
+ if (!ignoreAllNullRows && measurementIndexMap.isEmpty()) {
+ return list.rowCount();
+ }
return (long) list.rowCount() * measurementIndexMap.size();
}
@@ -297,8 +303,7 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
}
@Override
- public synchronized void sortTvListForFlush(boolean ignoreAllNullRows) {
- this.ignoreAllNullRows = ignoreAllNullRows;
+ public synchronized void sortTvListForFlush() {
sortTVList();
}
@@ -533,7 +538,8 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
list.serializeToWAL(buffer);
}
- public static AlignedWritableMemChunk deserialize(DataInputStream stream) throws IOException {
+ public static AlignedWritableMemChunk deserialize(DataInputStream stream, boolean isTableModel)
+ throws IOException {
int schemaListSize = stream.readInt();
List<IMeasurementSchema> schemaList = new ArrayList<>(schemaListSize);
for (int i = 0; i < schemaListSize; i++) {
@@ -542,7 +548,7 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
}
AlignedTVList list = (AlignedTVList) TVList.deserialize(stream);
- return new AlignedWritableMemChunk(schemaList, list);
+ return new AlignedWritableMemChunk(schemaList, list, isTableModel);
}
public List<IMeasurementSchema> getSchemaList() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java
index c5da5624a39..4e079b61787 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java
@@ -40,8 +40,8 @@ public class AlignedWritableMemChunkGroup implements IWritableMemChunkGroup {
private AlignedWritableMemChunk memChunk;
- public AlignedWritableMemChunkGroup(List<IMeasurementSchema> schemaList) {
- memChunk = new AlignedWritableMemChunk(schemaList);
+ public AlignedWritableMemChunkGroup(List<IMeasurementSchema> schemaList, boolean isTableModel) {
+ memChunk = new AlignedWritableMemChunk(schemaList, isTableModel);
}
private AlignedWritableMemChunkGroup() {
@@ -151,10 +151,10 @@ public class AlignedWritableMemChunkGroup implements IWritableMemChunkGroup {
memChunk.serializeToWAL(buffer);
}
- public static AlignedWritableMemChunkGroup deserialize(DataInputStream stream)
- throws IOException {
+ public static AlignedWritableMemChunkGroup deserialize(
+ DataInputStream stream, boolean isTableModel) throws IOException {
AlignedWritableMemChunkGroup memChunkGroup = new AlignedWritableMemChunkGroup();
- memChunkGroup.memChunk = AlignedWritableMemChunk.deserialize(stream);
+ memChunkGroup.memChunk = AlignedWritableMemChunk.deserialize(stream, isTableModel);
return memChunkGroup;
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
index 38ba0b01723..2d631e80209 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
@@ -122,11 +122,8 @@ public interface IWritableMemChunk extends WALEntryValue {
* reference count
*
* <p>This interface should be synchronized for concurrent with getSortedTvListForQuery
- *
- * @param ignoreAllNullRows whether to ignore all null rows, true for tree model, false for table
- * model
*/
- void sortTvListForFlush(boolean ignoreAllNullRows);
+ void sortTvListForFlush();
default TVList getTVList() {
return null;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
index 97c95e7f13a..4aad11dee53 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
@@ -240,7 +240,7 @@ public class WritableMemChunk implements IWritableMemChunk {
}
@Override
- public synchronized void sortTvListForFlush(boolean ignoreAllNullRows) {
+ public synchronized void sortTvListForFlush() {
sortTVList();
}