This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 8618464d861 Fix table model insert without measurement error (#14222)
8618464d861 is described below

commit 8618464d8610333a16226ab0d09e5da5c4269215
Author: Haonan <[email protected]>
AuthorDate: Wed Nov 27 14:37:00 2024 +0800

    Fix table model insert without measurement error (#14222)
---
 .../iotdb/session/it/IoTDBSessionRelationalIT.java | 32 ++++++++++++++++++++++
 .../dataregion/flush/MemTableFlushTask.java        |  2 +-
 .../dataregion/memtable/AbstractMemTable.java      |  7 +++--
 .../memtable/AlignedWritableMemChunk.java          | 18 ++++++++----
 .../memtable/AlignedWritableMemChunkGroup.java     | 10 +++----
 .../dataregion/memtable/IWritableMemChunk.java     |  5 +---
 .../dataregion/memtable/WritableMemChunk.java      |  2 +-
 7 files changed, 56 insertions(+), 20 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java
index eb28cf01d70..47ba178dd63 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java
@@ -961,4 +961,36 @@ public class IoTDBSessionRelationalIT {
       }
     }
   }
+
+  @Test
+  @Category({LocalStandaloneIT.class, ClusterIT.class})
+  public void insertWithoutMeasurementTest()
+      throws IoTDBConnectionException, StatementExecutionException {
+    try (ITableSession session = 
EnvFactory.getEnv().getTableSessionConnection()) {
+      session.executeNonQueryStatement("USE \"db1\"");
+      session.executeNonQueryStatement("create table tb (a string id, b string 
measurement)");
+      session.executeNonQueryStatement("insert into tb(a) values ('w')");
+      SessionDataSet dataSet = session.executeQueryStatement("select * from 
tb");
+      int cnt = 0;
+      while (dataSet.hasNext()) {
+        RowRecord rowRecord = dataSet.next();
+        assertEquals("w", 
rowRecord.getFields().get(1).getBinaryV().toString());
+        assertNull(rowRecord.getFields().get(2).getDataType());
+        cnt++;
+      }
+      assertEquals(1, cnt);
+
+      session.executeNonQueryStatement("flush");
+
+      dataSet = session.executeQueryStatement("select * from tb");
+      cnt = 0;
+      while (dataSet.hasNext()) {
+        RowRecord rowRecord = dataSet.next();
+        assertEquals("w", 
rowRecord.getFields().get(1).getBinaryV().toString());
+        assertNull(rowRecord.getFields().get(2).getDataType());
+        cnt++;
+      }
+      assertEquals(1, cnt);
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
index 59633610b71..45a398b96bb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
@@ -153,7 +153,7 @@ public class MemTableFlushTask {
         /*
          * sort task (first task of flush pipeline)
          */
-        series.sortTvListForFlush(!deviceID.isTableModel());
+        series.sortTvListForFlush();
         long subTaskTime = System.currentTimeMillis() - startTime;
         sortTime += subTaskTime;
         WRITING_METRICS.recordFlushSubTaskCost(WritingMetrics.SORT_TASK, 
subTaskTime);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
index 6b0bd5e9f93..b35ab02501f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
@@ -190,7 +190,8 @@ public abstract class AbstractMemTable implements IMemTable 
{
               seriesNumber += schemaList.size();
               totalPointsNumThreshold += ((long) avgSeriesPointNumThreshold) * 
schemaList.size();
               return new AlignedWritableMemChunkGroup(
-                  
schemaList.stream().filter(Objects::nonNull).collect(Collectors.toList()));
+                  
schemaList.stream().filter(Objects::nonNull).collect(Collectors.toList()),
+                  k.isTableModel());
             });
     for (IMeasurementSchema schema : schemaList) {
       if (schema != null && 
!memChunkGroup.contains(schema.getMeasurementId())) {
@@ -943,7 +944,7 @@ public abstract class AbstractMemTable implements IMemTable 
{
       boolean isAligned = ReadWriteIOUtils.readBool(stream);
       IWritableMemChunkGroup memChunkGroup;
       if (isAligned) {
-        memChunkGroup = AlignedWritableMemChunkGroup.deserialize(stream);
+        memChunkGroup = AlignedWritableMemChunkGroup.deserialize(stream, 
deviceID.isTableModel());
       } else {
         memChunkGroup = WritableMemChunkGroup.deserialize(stream);
       }
@@ -974,7 +975,7 @@ public abstract class AbstractMemTable implements IMemTable 
{
       boolean isAligned = ReadWriteIOUtils.readBool(stream);
       IWritableMemChunkGroup memChunkGroup;
       if (isAligned) {
-        memChunkGroup = AlignedWritableMemChunkGroup.deserialize(stream);
+        memChunkGroup = AlignedWritableMemChunkGroup.deserialize(stream, 
false);
       } else {
         memChunkGroup = WritableMemChunkGroup.deserialize(stream);
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
index 086e9922c79..8ff67c20096 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
@@ -59,7 +59,7 @@ public class AlignedWritableMemChunk implements 
IWritableMemChunk {
 
   private static final String UNSUPPORTED_TYPE = "Unsupported data type:";
 
-  public AlignedWritableMemChunk(List<IMeasurementSchema> schemaList) {
+  public AlignedWritableMemChunk(List<IMeasurementSchema> schemaList, boolean 
isTableModel) {
     this.measurementIndexMap = new LinkedHashMap<>();
     List<TSDataType> dataTypeList = new ArrayList<>();
     this.schemaList = schemaList;
@@ -68,15 +68,18 @@ public class AlignedWritableMemChunk implements 
IWritableMemChunk {
       dataTypeList.add(schemaList.get(i).getType());
     }
     this.list = AlignedTVList.newAlignedList(dataTypeList);
+    this.ignoreAllNullRows = !isTableModel;
   }
 
-  private AlignedWritableMemChunk(List<IMeasurementSchema> schemaList, 
AlignedTVList list) {
+  private AlignedWritableMemChunk(
+      List<IMeasurementSchema> schemaList, AlignedTVList list, boolean 
isTableModel) {
     this.measurementIndexMap = new LinkedHashMap<>();
     this.schemaList = schemaList;
     for (int i = 0; i < schemaList.size(); i++) {
       measurementIndexMap.put(schemaList.get(i).getMeasurementId(), i);
     }
     this.list = list;
+    this.ignoreAllNullRows = !isTableModel;
   }
 
   public Set<String> getAllMeasurements() {
@@ -244,6 +247,9 @@ public class AlignedWritableMemChunk implements 
IWritableMemChunk {
 
   @Override
   public long count() {
+    if (!ignoreAllNullRows && measurementIndexMap.isEmpty()) {
+      return list.rowCount();
+    }
     return (long) list.rowCount() * measurementIndexMap.size();
   }
 
@@ -297,8 +303,7 @@ public class AlignedWritableMemChunk implements 
IWritableMemChunk {
   }
 
   @Override
-  public synchronized void sortTvListForFlush(boolean ignoreAllNullRows) {
-    this.ignoreAllNullRows = ignoreAllNullRows;
+  public synchronized void sortTvListForFlush() {
     sortTVList();
   }
 
@@ -533,7 +538,8 @@ public class AlignedWritableMemChunk implements 
IWritableMemChunk {
     list.serializeToWAL(buffer);
   }
 
-  public static AlignedWritableMemChunk deserialize(DataInputStream stream) 
throws IOException {
+  public static AlignedWritableMemChunk deserialize(DataInputStream stream, 
boolean isTableModel)
+      throws IOException {
     int schemaListSize = stream.readInt();
     List<IMeasurementSchema> schemaList = new ArrayList<>(schemaListSize);
     for (int i = 0; i < schemaListSize; i++) {
@@ -542,7 +548,7 @@ public class AlignedWritableMemChunk implements 
IWritableMemChunk {
     }
 
     AlignedTVList list = (AlignedTVList) TVList.deserialize(stream);
-    return new AlignedWritableMemChunk(schemaList, list);
+    return new AlignedWritableMemChunk(schemaList, list, isTableModel);
   }
 
   public List<IMeasurementSchema> getSchemaList() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java
index c5da5624a39..4e079b61787 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java
@@ -40,8 +40,8 @@ public class AlignedWritableMemChunkGroup implements 
IWritableMemChunkGroup {
 
   private AlignedWritableMemChunk memChunk;
 
-  public AlignedWritableMemChunkGroup(List<IMeasurementSchema> schemaList) {
-    memChunk = new AlignedWritableMemChunk(schemaList);
+  public AlignedWritableMemChunkGroup(List<IMeasurementSchema> schemaList, 
boolean isTableModel) {
+    memChunk = new AlignedWritableMemChunk(schemaList, isTableModel);
   }
 
   private AlignedWritableMemChunkGroup() {
@@ -151,10 +151,10 @@ public class AlignedWritableMemChunkGroup implements 
IWritableMemChunkGroup {
     memChunk.serializeToWAL(buffer);
   }
 
-  public static AlignedWritableMemChunkGroup deserialize(DataInputStream 
stream)
-      throws IOException {
+  public static AlignedWritableMemChunkGroup deserialize(
+      DataInputStream stream, boolean isTableModel) throws IOException {
     AlignedWritableMemChunkGroup memChunkGroup = new 
AlignedWritableMemChunkGroup();
-    memChunkGroup.memChunk = AlignedWritableMemChunk.deserialize(stream);
+    memChunkGroup.memChunk = AlignedWritableMemChunk.deserialize(stream, 
isTableModel);
     return memChunkGroup;
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
index 38ba0b01723..2d631e80209 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
@@ -122,11 +122,8 @@ public interface IWritableMemChunk extends WALEntryValue {
    * reference count
    *
    * <p>This interface should be synchronized for concurrent with 
getSortedTvListForQuery
-   *
-   * @param ignoreAllNullRows whether to ignore all null rows, true for tree 
model, false for table
-   *     model
    */
-  void sortTvListForFlush(boolean ignoreAllNullRows);
+  void sortTvListForFlush();
 
   default TVList getTVList() {
     return null;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
index 97c95e7f13a..4aad11dee53 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
@@ -240,7 +240,7 @@ public class WritableMemChunk implements IWritableMemChunk {
   }
 
   @Override
-  public synchronized void sortTvListForFlush(boolean ignoreAllNullRows) {
+  public synchronized void sortTvListForFlush() {
     sortTVList();
   }
 

Reply via email to