This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch TableModelIngestion
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/TableModelIngestion by this push:
     new 7dd08274122 add data region test
7dd08274122 is described below

commit 7dd0827412274fa5cc6d376ed49382183b68299f
Author: jt2594838 <[email protected]>
AuthorDate: Tue Jul 2 17:18:30 2024 +0800

    add data region test
---
 .../planner/plan/node/write/InsertTabletNode.java  |  44 +++++++--
 .../node/write/RelationalInsertTabletNode.java     |   3 +-
 .../plan/relational/metadata/TableSchema.java      |   4 +
 .../optimizations/PushPredicateIntoTableScan.java  |   4 +-
 .../dataregion/memtable/TsFileProcessor.java       |  21 +++-
 .../java/org/apache/iotdb/db/utils/MemUtils.java   |   7 +-
 .../db/utils/datastructure/AlignedTVList.java      |   7 +-
 .../plan/statement/StatementTestUtils.java         | 107 ++++++++++++++++++---
 .../storageengine/dataregion/DataRegionTest.java   |  49 ++++++++++
 9 files changed, 215 insertions(+), 31 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index d73c07e9dcd..daee42d16f2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
 
+import java.nio.charset.StandardCharsets;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
@@ -35,6 +36,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
@@ -629,12 +631,17 @@ public class InsertTabletNode extends InsertNode 
implements WALEntryValue {
         break;
       case TEXT:
       case BLOB:
-      case STRING:
         Binary[] binaryValues = (Binary[]) column;
         for (int j = 0; j < rowCount; j++) {
           ReadWriteIOUtils.write(binaryValues[j], buffer);
         }
         break;
+      case STRING:
+        String[] stringValues = (String[]) column;
+        for (int j = 0; j < rowCount; j++) {
+          ReadWriteIOUtils.write(stringValues[j], buffer);
+        }
+        break;
       default:
         throw new 
UnSupportedDataTypeException(String.format(DATATYPE_UNSUPPORTED, dataType));
     }
@@ -677,12 +684,17 @@ public class InsertTabletNode extends InsertNode 
implements WALEntryValue {
         break;
       case TEXT:
       case BLOB:
-      case STRING:
         Binary[] binaryValues = (Binary[]) column;
         for (int j = 0; j < rowCount; j++) {
           ReadWriteIOUtils.write(binaryValues[j], stream);
         }
         break;
+      case STRING:
+        String[] stringValues = (String[]) column;
+        for (int j = 0; j < rowCount; j++) {
+          ReadWriteIOUtils.write(stringValues[j], stream);
+        }
+        break;
       default:
         throw new 
UnSupportedDataTypeException(String.format(DATATYPE_UNSUPPORTED, dataType));
     }
@@ -815,12 +827,17 @@ public class InsertTabletNode extends InsertNode 
implements WALEntryValue {
         break;
       case TEXT:
       case BLOB:
-      case STRING:
         Binary[] binaryValues = (Binary[]) column;
         for (int j = start; j < end; j++) {
           size += ReadWriteIOUtils.sizeToWrite(binaryValues[j]);
         }
         break;
+      case STRING:
+        String[] stringValues = (String[]) column;
+        for (int j = start; j < end; j++) {
+          size += ReadWriteIOUtils.sizeToWrite(stringValues[j]);
+        }
+        break;
     }
     return size;
   }
@@ -932,9 +949,16 @@ public class InsertTabletNode extends InsertNode 
implements WALEntryValue {
           buffer.put(BytesUtils.boolToByte(boolValues[j]));
         }
         break;
+      case STRING:
+        String[] stringValues = (String[]) column;
+        for (int j = start; j < end; j++) {
+          final byte[] bytes = 
stringValues[j].getBytes(StandardCharsets.UTF_8);
+          buffer.putInt(bytes.length);
+          buffer.put(bytes);
+        }
+        break;
       case TEXT:
       case BLOB:
-      case STRING:
         Binary[] binaryValues = (Binary[]) column;
         for (int j = start; j < end; j++) {
           buffer.putInt(binaryValues[j].getLength());
@@ -1096,11 +1120,15 @@ public class InsertTabletNode extends InsertNode 
implements WALEntryValue {
             break;
           case TEXT:
           case BLOB:
-          case STRING:
             if (!Arrays.equals((Binary[]) this.columns[i], (Binary[]) 
columns[i])) {
               return false;
             }
             break;
+          case STRING:
+            if (!Arrays.equals((String[]) this.columns[i], (String[]) 
columns[i])) {
+              return false;
+            }
+            break;
           default:
             throw new UnSupportedDataTypeException(
                 String.format(DATATYPE_UNSUPPORTED, dataTypes[i]));
@@ -1164,10 +1192,14 @@ public class InsertTabletNode extends InsertNode 
implements WALEntryValue {
         break;
       case TEXT:
       case BLOB:
-      case STRING:
         Binary[] binaryValues = (Binary[]) columns[measurementIndex];
         value = new TsPrimitiveType.TsBinary(binaryValues[lastIdx]);
         break;
+      case STRING:
+        String[] stringValues = (String[]) columns[measurementIndex];
+        value =
+            new TsPrimitiveType.TsBinary(new 
Binary(stringValues[lastIdx].getBytes(StandardCharsets.UTF_8)));
+        break;
       default:
         throw new UnSupportedDataTypeException(
             String.format(DATATYPE_UNSUPPORTED, dataTypes[measurementIndex]));
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
index 31e000934c8..4fabc0a369c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
@@ -27,6 +27,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.file.metadata.IDeviceID.Factory;
@@ -169,7 +170,7 @@ public class RelationalInsertTabletNode extends 
InsertTabletNode {
         prevDeviceId = getDeviceID(i);
       }
     }
-    result.add(new Pair<>(prevDeviceId, start));
+    result.add(new Pair<>(prevDeviceId, end));
 
     return result;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableSchema.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableSchema.java
index 2abed5a8a18..d8e21024b4d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableSchema.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableSchema.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.queryengine.plan.relational.metadata;
 
 import org.apache.iotdb.commons.schema.table.TsTable;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
 import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema;
 import 
org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager;
 
@@ -65,6 +66,9 @@ public class TableSchema {
     List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
     List<ColumnType> columnTypes = new ArrayList<>();
     for (ColumnSchema column : columns) {
+      if (column.getColumnCategory() == TsTableColumnCategory.TIME) {
+        continue;
+      }
       measurementSchemas.add(
           new MeasurementSchema(
               column.getName(), 
InternalTypeManager.getTSDataType(column.getType())));
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
index d3300cc359e..4baa449773f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
@@ -254,8 +254,8 @@ public class PushPredicateIntoTableScan implements 
RelationalPlanOptimizer {
     }
 
     @Override
-    public PlanNode visitRelationalInsertTablet(RelationalInsertTabletNode 
node,
-        RewriterContext context) {
+    public PlanNode visitRelationalInsertTablet(
+        RelationalInsertTabletNode node, RewriterContext context) {
       return node;
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 9fb4d679893..8875647bcfc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -554,13 +554,24 @@ public class TsFileProcessor {
       results[i] = RpcUtils.SUCCESS_STATUS;
     }
 
+    final List<Pair<IDeviceID, Integer>> deviceEndOffsetPairs =
+        insertTabletNode.splitByDevice(start, end);
     tsFileResource.updateStartTime(
-        insertTabletNode.getDeviceID(), insertTabletNode.getTimes()[start]);
-    // For sequence tsfile, we update the endTime only when the file is 
prepared to be closed.
-    // For unsequence tsfile, we have to update the endTime for each insertion.
+        deviceEndOffsetPairs.get(0).left, insertTabletNode.getTimes()[start]);
     if (!sequence) {
+      // For sequence tsfile, we update the endTime only when the file is 
prepared to be closed.
+      // For unsequence tsfile, we have to update the endTime for each 
insertion.
       tsFileResource.updateEndTime(
-          insertTabletNode.getDeviceID(), insertTabletNode.getTimes()[end - 
1]);
+          deviceEndOffsetPairs.get(0).left,  
deviceEndOffsetPairs.get(0).right);
+    }
+    for (int i = 1; i < deviceEndOffsetPairs.size(); i++) {
+      // the end offset of i - 1 is the start offset of i
+      tsFileResource.updateStartTime(
+          deviceEndOffsetPairs.get(i).left, deviceEndOffsetPairs.get(i - 
1).right);
+      if (!sequence) {
+        tsFileResource.updateEndTime(
+            deviceEndOffsetPairs.get(i).left,  
deviceEndOffsetPairs.get(i).right);
+      }
     }
 
     tsFileResource.updateProgressIndex(insertTabletNode.getProgressIndex());
@@ -914,7 +925,7 @@ public class TsFileProcessor {
     } else {
       incomingPointNum = end - start;
       for (TSStatus result : results) {
-        if (result != null) {
+        if (result != null && result.code != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
           incomingPointNum--;
         }
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
index 60f00cd946b..3e21535aba5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
 
+import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.tsfile.common.conf.TSFileConfig;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.utils.Binary;
@@ -100,7 +101,7 @@ public class MemUtils {
     long memSize = 0;
     memSize += (long) (end - start) * 
RamUsageEstimator.NUM_BYTES_OBJECT_HEADER;
     for (int i = start; i < end; i++) {
-      if (results == null || results[i] == null) {
+      if (results == null || results[i] == null || results[i].code == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         memSize += RamUsageEstimator.sizeOf(column[i].getValues());
       }
     }
@@ -141,7 +142,9 @@ public class MemUtils {
         memSize += (long) (end - start) * 
insertTabletNode.getDataTypes()[i].getDataTypeSize();
       } else {
         for (int j = start; j < end; j++) {
-          memSize += insertTabletNode.getDataTypes()[i].getDataTypeSize();
+          if (results[j] == null || results[j].code == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+            memSize += insertTabletNode.getDataTypes()[i].getDataTypeSize();
+          }
         }
       }
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
index aa23d8fce00..3a08d0d05a2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
@@ -26,6 +26,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
 import org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager;
 import org.apache.iotdb.db.utils.MathUtils;
 
+import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.tsfile.block.column.ColumnBuilder;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.file.metadata.enums.TSEncoding;
@@ -745,7 +746,8 @@ public abstract class AlignedTVList extends TVList {
           for (int j = 0; j < values.size(); j++) {
             if (value[j] == null
                 || bitMaps != null && bitMaps[j] != null && 
bitMaps[j].isMarked(idx + i)
-                || results != null && results[idx + i] != null) {
+                || results != null && results[idx + i] != null && results[idx
+                 + i].code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
               markNullValue(j, arrayIdx, elementIdx + i);
             }
           }
@@ -762,7 +764,8 @@ public abstract class AlignedTVList extends TVList {
           for (int j = 0; j < values.size(); j++) {
             if (value[j] == null
                 || bitMaps != null && bitMaps[j] != null && 
bitMaps[j].isMarked(idx + i)
-                || results != null && results[idx + i] != null) {
+                || results != null && results[idx + i] != null && results[idx
+                + i].code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
               markNullValue(j, arrayIdx, elementIdx + i);
             }
           }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/statement/StatementTestUtils.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/statement/StatementTestUtils.java
index 3cdbf8657db..2f7fb06d1ba 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/statement/StatementTestUtils.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/statement/StatementTestUtils.java
@@ -20,16 +20,25 @@
 package org.apache.iotdb.db.queryengine.plan.statement;
 
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.table.TsTable;
+import org.apache.iotdb.commons.schema.table.column.AttributeColumnSchema;
+import org.apache.iotdb.commons.schema.table.column.IdColumnSchema;
+import org.apache.iotdb.commons.schema.table.column.MeasurementColumnSchema;
 import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 
 import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.tsfile.read.common.type.TypeFactory;
 
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.tsfile.write.schema.MeasurementSchema;
 
 public class StatementTestUtils {
 
@@ -42,16 +51,24 @@ public class StatementTestUtils {
   }
 
   public static String[] genColumnNames() {
-    return new String[] {"id1", "attr1", "m1"};
+    return new String[]{"id1", "attr1", "m1"};
   }
 
   public static TSDataType[] genDataTypes() {
-    return new TSDataType[] {TSDataType.STRING, TSDataType.STRING, 
TSDataType.DOUBLE};
+    return new TSDataType[]{TSDataType.STRING, TSDataType.STRING, 
TSDataType.DOUBLE};
+  }
+
+  public static MeasurementSchema[] genMeasurementSchemas() {
+    return new MeasurementSchema[]{
+        new MeasurementSchema("id1", TSDataType.STRING),
+        new MeasurementSchema("attr1", TSDataType.STRING),
+        new MeasurementSchema("m1", TSDataType.DOUBLE)
+    };
   }
 
   public static TsTableColumnCategory[] genColumnCategories() {
-    return new TsTableColumnCategory[] {
-      TsTableColumnCategory.ID, TsTableColumnCategory.ATTRIBUTE, 
TsTableColumnCategory.MEASUREMENT
+    return new TsTableColumnCategory[]{
+        TsTableColumnCategory.ID, TsTableColumnCategory.ATTRIBUTE, 
TsTableColumnCategory.MEASUREMENT
     };
   }
 
@@ -74,27 +91,47 @@ public class StatementTestUtils {
   }
 
   public static Object[] genColumns() {
-    return new Object[] {
-      new String[] {"a", "b", "c"},
-      new String[] {"x", "y", "z"},
-      new double[] {1.0, 2.0, 3.0}
+    return genColumns(3, 0);
+  }
+
+  public static Object[] genColumns(int cnt, int offset) {
+    final String[] ids = new String[cnt];
+    final String[] attrs = new String[cnt];
+    final double[] values = new double[cnt];
+    for (int i = 0; i < cnt; i++) {
+      ids[i] = "id:" + (i + offset);
+      attrs[i] = "attr:" + (i + offset);
+      values[i] = (i + offset) * 1.0;
+    }
+
+    return new Object[]{
+        ids, attrs, values
     };
   }
 
   public static long[] genTimestamps() {
-    return new long[] {1L, 2L, 3L};
+    return genTimestamps(3, 0);
   }
 
-  public static InsertTabletStatement genInsertTabletStatement(boolean 
writeToTable) {
+  public static long[] genTimestamps(int cnt, int offset) {
+    final long[] timestamps = new long[cnt];
+    for (int i = 0; i < cnt; i++) {
+      timestamps[i] = i + offset;
+    }
+    return timestamps;
+  }
+
+  public static InsertTabletStatement genInsertTabletStatement(boolean 
writeToTable, int rowCnt,
+      int offset) {
     String[] measurements = genColumnNames();
     TSDataType[] dataTypes = genDataTypes();
     TsTableColumnCategory[] columnCategories = genColumnCategories();
 
-    Object[] columns = genColumns();
-    long[] timestamps = genTimestamps();
+    Object[] columns = genColumns(rowCnt, offset);
+    long[] timestamps = genTimestamps(rowCnt, offset);
 
     InsertTabletStatement insertTabletStatement = new InsertTabletStatement();
-    insertTabletStatement.setDevicePath(new PartialPath(new String[] 
{tableName()}));
+    insertTabletStatement.setDevicePath(new PartialPath(new 
String[]{tableName()}));
     insertTabletStatement.setMeasurements(measurements);
     insertTabletStatement.setDataTypes(dataTypes);
     insertTabletStatement.setColumnCategories(columnCategories);
@@ -105,4 +142,48 @@ public class StatementTestUtils {
 
     return insertTabletStatement;
   }
+
+  public static RelationalInsertTabletNode genInsertTabletNode(int rowCnt,
+      int offset) {
+    String[] measurements = genColumnNames();
+    TSDataType[] dataTypes = genDataTypes();
+    TsTableColumnCategory[] columnCategories = genColumnCategories();
+    final MeasurementSchema[] measurementSchemas = genMeasurementSchemas();
+
+    Object[] columns = genColumns(rowCnt, offset);
+    long[] timestamps = genTimestamps(rowCnt, offset);
+
+    return
+        new RelationalInsertTabletNode(new PlanNodeId(offset + "-" + rowCnt),
+            new PartialPath(new String[]{tableName()}),
+            true,
+            measurements, dataTypes, measurementSchemas, timestamps, null, 
columns, rowCnt, columnCategories);
+  }
+
+  public static InsertTabletStatement genInsertTabletStatement(boolean 
writeToTable) {
+    return genInsertTabletStatement(writeToTable, 3, 0);
+  }
+
+  public static TsTable genTsTable() {
+    final TsTable tsTable = new TsTable(tableName());
+    String[] measurements = genColumnNames();
+    TSDataType[] dataTypes = genDataTypes();
+    TsTableColumnCategory[] columnCategories = genColumnCategories();
+    for (int i = 0; i < columnCategories.length; i++) {
+      switch (columnCategories[i]) {
+        case ID:
+          tsTable.addColumnSchema(new IdColumnSchema(measurements[i], 
dataTypes[i]));
+          break;
+        case ATTRIBUTE:
+          tsTable.addColumnSchema(new AttributeColumnSchema(measurements[i], 
dataTypes[i]));
+          break;
+        case MEASUREMENT:
+        default:
+          tsTable.addColumnSchema(new MeasurementColumnSchema(measurements[i], 
dataTypes[i],
+              TSEncoding.PLAIN, CompressionType.UNCOMPRESSED));
+          break;
+      }
+    }
+    return tsTable;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
index 80b45ebf46c..79e77c6b0d6 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
@@ -19,18 +19,23 @@
 
 package org.apache.iotdb.db.storageengine.dataregion;
 
+import static 
org.apache.iotdb.db.queryengine.plan.statement.StatementTestUtils.genInsertTabletNode;
+
+import java.util.Arrays;
 import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.exception.ShutdownException;
+import org.apache.iotdb.commons.path.AlignedFullPath;
 import org.apache.iotdb.commons.path.IFullPath;
 import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.commons.path.NonAlignedFullPath;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.BatchProcessException;
 import org.apache.iotdb.db.exception.DataRegionException;
 import org.apache.iotdb.db.exception.TsFileProcessorException;
 import org.apache.iotdb.db.exception.WriteProcessException;
@@ -42,6 +47,9 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementTestUtils;
+import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
@@ -113,6 +121,10 @@ public class DataRegionTest {
     dataRegion = new DummyDataRegion(systemDir, storageGroup);
     StorageEngine.getInstance().setDataRegion(new DataRegionId(0), dataRegion);
     CompactionTaskManager.getInstance().start();
+    
DataNodeTableCache.getInstance().preCreateTable(dataRegion.getDatabaseName(),
+        StatementTestUtils.genTsTable());
+    
DataNodeTableCache.getInstance().commitCreateTable(dataRegion.getDatabaseName(),
+        StatementTestUtils.tableName());
   }
 
   @After
@@ -249,6 +261,43 @@ public class DataRegionTest {
     }
   }
 
+  @Test
+  public void testRelationalTabletWriteAndSyncClose()
+      throws QueryProcessException, WriteProcessException {
+    RelationalInsertTabletNode insertTabletNode1 = genInsertTabletNode(10, 0);
+    dataRegion.insertTablet(insertTabletNode1);
+    dataRegion.asyncCloseAllWorkingTsFileProcessors();
+
+    RelationalInsertTabletNode insertTabletNode2 = genInsertTabletNode(10, 10);
+    dataRegion.insertTablet(insertTabletNode2);
+    dataRegion.asyncCloseAllWorkingTsFileProcessors();
+    dataRegion.syncCloseAllWorkingTsFileProcessors();
+
+    String measurementName = "m1";
+    MeasurementSchema measurementSchema = new 
MeasurementSchema(measurementName, TSDataType.DOUBLE);
+    final IDeviceID deviceID1 = insertTabletNode1.getDeviceID(0);
+    final IDeviceID deviceID2 = insertTabletNode2.getDeviceID(0);
+
+    QueryDataSource queryDataSource =
+        dataRegion.query(
+            Collections.singletonList(new AlignedFullPath(deviceID1,
+                Collections.singletonList(measurementName), 
Collections.singletonList(measurementSchema))),
+            deviceID1, context, null, null);
+    Assert.assertEquals(1, queryDataSource.getSeqResources().size());
+    Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
+
+    queryDataSource =
+        dataRegion.query(
+            Collections.singletonList(new AlignedFullPath(deviceID2,
+                Collections.singletonList(measurementName), 
Collections.singletonList(measurementSchema))),
+            deviceID2, context, null, null);
+    Assert.assertEquals(1, queryDataSource.getSeqResources().size());
+    Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
+    for (TsFileResource resource : queryDataSource.getSeqResources()) {
+      Assert.assertTrue(resource.isClosed());
+    }
+  }
+
   @Test
   public void testIoTDBTabletWriteAndSyncClose()
       throws QueryProcessException, IllegalPathException, 
WriteProcessException {

Reply via email to