This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch TableModelIngestion
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/TableModelIngestion by this
push:
new 7dd08274122 add data region test
7dd08274122 is described below
commit 7dd0827412274fa5cc6d376ed49382183b68299f
Author: jt2594838 <[email protected]>
AuthorDate: Tue Jul 2 17:18:30 2024 +0800
add data region test
---
.../planner/plan/node/write/InsertTabletNode.java | 44 +++++++--
.../node/write/RelationalInsertTabletNode.java | 3 +-
.../plan/relational/metadata/TableSchema.java | 4 +
.../optimizations/PushPredicateIntoTableScan.java | 4 +-
.../dataregion/memtable/TsFileProcessor.java | 21 +++-
.../java/org/apache/iotdb/db/utils/MemUtils.java | 7 +-
.../db/utils/datastructure/AlignedTVList.java | 7 +-
.../plan/statement/StatementTestUtils.java | 107 ++++++++++++++++++---
.../storageengine/dataregion/DataRegionTest.java | 49 ++++++++++
9 files changed, 215 insertions(+), 31 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index d73c07e9dcd..daee42d16f2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
+import java.nio.charset.StandardCharsets;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
@@ -35,6 +36,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
@@ -629,12 +631,17 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
break;
case TEXT:
case BLOB:
- case STRING:
Binary[] binaryValues = (Binary[]) column;
for (int j = 0; j < rowCount; j++) {
ReadWriteIOUtils.write(binaryValues[j], buffer);
}
break;
+ case STRING:
+ String[] stringValues = (String[]) column;
+ for (int j = 0; j < rowCount; j++) {
+ ReadWriteIOUtils.write(stringValues[j], buffer);
+ }
+ break;
default:
throw new
UnSupportedDataTypeException(String.format(DATATYPE_UNSUPPORTED, dataType));
}
@@ -677,12 +684,17 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
break;
case TEXT:
case BLOB:
- case STRING:
Binary[] binaryValues = (Binary[]) column;
for (int j = 0; j < rowCount; j++) {
ReadWriteIOUtils.write(binaryValues[j], stream);
}
break;
+ case STRING:
+ String[] stringValues = (String[]) column;
+ for (int j = 0; j < rowCount; j++) {
+ ReadWriteIOUtils.write(stringValues[j], stream);
+ }
+ break;
default:
throw new
UnSupportedDataTypeException(String.format(DATATYPE_UNSUPPORTED, dataType));
}
@@ -815,12 +827,17 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
break;
case TEXT:
case BLOB:
- case STRING:
Binary[] binaryValues = (Binary[]) column;
for (int j = start; j < end; j++) {
size += ReadWriteIOUtils.sizeToWrite(binaryValues[j]);
}
break;
+ case STRING:
+ String[] stringValues = (String[]) column;
+ for (int j = start; j < end; j++) {
+ size += ReadWriteIOUtils.sizeToWrite(stringValues[j]);
+ }
+ break;
}
return size;
}
@@ -932,9 +949,16 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
buffer.put(BytesUtils.boolToByte(boolValues[j]));
}
break;
+ case STRING:
+ String[] stringValues = (String[]) column;
+ for (int j = start; j < end; j++) {
+ final byte[] bytes =
stringValues[j].getBytes(StandardCharsets.UTF_8);
+ buffer.putInt(bytes.length);
+ buffer.put(bytes);
+ }
+ break;
case TEXT:
case BLOB:
- case STRING:
Binary[] binaryValues = (Binary[]) column;
for (int j = start; j < end; j++) {
buffer.putInt(binaryValues[j].getLength());
@@ -1096,11 +1120,15 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
break;
case TEXT:
case BLOB:
- case STRING:
if (!Arrays.equals((Binary[]) this.columns[i], (Binary[])
columns[i])) {
return false;
}
break;
+ case STRING:
+ if (!Arrays.equals((String[]) this.columns[i], (String[])
columns[i])) {
+ return false;
+ }
+ break;
default:
throw new UnSupportedDataTypeException(
String.format(DATATYPE_UNSUPPORTED, dataTypes[i]));
@@ -1164,10 +1192,14 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
break;
case TEXT:
case BLOB:
- case STRING:
Binary[] binaryValues = (Binary[]) columns[measurementIndex];
value = new TsPrimitiveType.TsBinary(binaryValues[lastIdx]);
break;
+ case STRING:
+ String[] stringValues = (String[]) columns[measurementIndex];
+ value =
+ new TsPrimitiveType.TsBinary(new
Binary(stringValues[lastIdx].getBytes(StandardCharsets.UTF_8)));
+ break;
default:
throw new UnSupportedDataTypeException(
String.format(DATATYPE_UNSUPPORTED, dataTypes[measurementIndex]));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
index 31e000934c8..4fabc0a369c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
@@ -27,6 +27,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.IDeviceID.Factory;
@@ -169,7 +170,7 @@ public class RelationalInsertTabletNode extends
InsertTabletNode {
prevDeviceId = getDeviceID(i);
}
}
- result.add(new Pair<>(prevDeviceId, start));
+ result.add(new Pair<>(prevDeviceId, end));
return result;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableSchema.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableSchema.java
index 2abed5a8a18..d8e21024b4d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableSchema.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableSchema.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.queryengine.plan.relational.metadata;
import org.apache.iotdb.commons.schema.table.TsTable;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema;
import
org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager;
@@ -65,6 +66,9 @@ public class TableSchema {
List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
List<ColumnType> columnTypes = new ArrayList<>();
for (ColumnSchema column : columns) {
+ if (column.getColumnCategory() == TsTableColumnCategory.TIME) {
+ continue;
+ }
measurementSchemas.add(
new MeasurementSchema(
column.getName(),
InternalTypeManager.getTSDataType(column.getType())));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
index d3300cc359e..4baa449773f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
@@ -254,8 +254,8 @@ public class PushPredicateIntoTableScan implements
RelationalPlanOptimizer {
}
@Override
- public PlanNode visitRelationalInsertTablet(RelationalInsertTabletNode
node,
- RewriterContext context) {
+ public PlanNode visitRelationalInsertTablet(
+ RelationalInsertTabletNode node, RewriterContext context) {
return node;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 9fb4d679893..8875647bcfc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -554,13 +554,24 @@ public class TsFileProcessor {
results[i] = RpcUtils.SUCCESS_STATUS;
}
+ final List<Pair<IDeviceID, Integer>> deviceEndOffsetPairs =
+ insertTabletNode.splitByDevice(start, end);
tsFileResource.updateStartTime(
- insertTabletNode.getDeviceID(), insertTabletNode.getTimes()[start]);
- // For sequence tsfile, we update the endTime only when the file is
prepared to be closed.
- // For unsequence tsfile, we have to update the endTime for each insertion.
+ deviceEndOffsetPairs.get(0).left, insertTabletNode.getTimes()[start]);
if (!sequence) {
+ // For sequence tsfile, we update the endTime only when the file is
prepared to be closed.
+ // For unsequence tsfile, we have to update the endTime for each
insertion.
tsFileResource.updateEndTime(
- insertTabletNode.getDeviceID(), insertTabletNode.getTimes()[end -
1]);
+ deviceEndOffsetPairs.get(0).left,
deviceEndOffsetPairs.get(0).right);
+ }
+ for (int i = 1; i < deviceEndOffsetPairs.size(); i++) {
+ // the end offset of i - 1 is the start offset of i
+ tsFileResource.updateStartTime(
+ deviceEndOffsetPairs.get(i).left, deviceEndOffsetPairs.get(i -
1).right);
+ if (!sequence) {
+ tsFileResource.updateEndTime(
+ deviceEndOffsetPairs.get(i).left,
deviceEndOffsetPairs.get(i).right);
+ }
}
tsFileResource.updateProgressIndex(insertTabletNode.getProgressIndex());
@@ -914,7 +925,7 @@ public class TsFileProcessor {
} else {
incomingPointNum = end - start;
for (TSStatus result : results) {
- if (result != null) {
+ if (result != null && result.code !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
incomingPointNum--;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
index 60f00cd946b..3e21535aba5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.utils.Binary;
@@ -100,7 +101,7 @@ public class MemUtils {
long memSize = 0;
memSize += (long) (end - start) *
RamUsageEstimator.NUM_BYTES_OBJECT_HEADER;
for (int i = start; i < end; i++) {
- if (results == null || results[i] == null) {
+ if (results == null || results[i] == null || results[i].code ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
memSize += RamUsageEstimator.sizeOf(column[i].getValues());
}
}
@@ -141,7 +142,9 @@ public class MemUtils {
memSize += (long) (end - start) *
insertTabletNode.getDataTypes()[i].getDataTypeSize();
} else {
for (int j = start; j < end; j++) {
- memSize += insertTabletNode.getDataTypes()[i].getDataTypeSize();
+ if (results[j] == null || results[j].code ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ memSize += insertTabletNode.getDataTypes()[i].getDataTypeSize();
+ }
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
index aa23d8fce00..3a08d0d05a2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
@@ -26,6 +26,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
import org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager;
import org.apache.iotdb.db.utils.MathUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.tsfile.block.column.ColumnBuilder;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
@@ -745,7 +746,8 @@ public abstract class AlignedTVList extends TVList {
for (int j = 0; j < values.size(); j++) {
if (value[j] == null
|| bitMaps != null && bitMaps[j] != null &&
bitMaps[j].isMarked(idx + i)
- || results != null && results[idx + i] != null) {
+ || results != null && results[idx + i] != null && results[idx
+ + i].code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
markNullValue(j, arrayIdx, elementIdx + i);
}
}
@@ -762,7 +764,8 @@ public abstract class AlignedTVList extends TVList {
for (int j = 0; j < values.size(); j++) {
if (value[j] == null
|| bitMaps != null && bitMaps[j] != null &&
bitMaps[j].isMarked(idx + i)
- || results != null && results[idx + i] != null) {
+ || results != null && results[idx + i] != null && results[idx
+ + i].code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
markNullValue(j, arrayIdx, elementIdx + i);
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/statement/StatementTestUtils.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/statement/StatementTestUtils.java
index 3cdbf8657db..2f7fb06d1ba 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/statement/StatementTestUtils.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/statement/StatementTestUtils.java
@@ -20,16 +20,25 @@
package org.apache.iotdb.db.queryengine.plan.statement;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.table.TsTable;
+import org.apache.iotdb.commons.schema.table.column.AttributeColumnSchema;
+import org.apache.iotdb.commons.schema.table.column.IdColumnSchema;
+import org.apache.iotdb.commons.schema.table.column.MeasurementColumnSchema;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.read.common.type.TypeFactory;
import java.util.ArrayList;
import java.util.List;
+import org.apache.tsfile.write.schema.MeasurementSchema;
public class StatementTestUtils {
@@ -42,16 +51,24 @@ public class StatementTestUtils {
}
public static String[] genColumnNames() {
- return new String[] {"id1", "attr1", "m1"};
+ return new String[]{"id1", "attr1", "m1"};
}
public static TSDataType[] genDataTypes() {
- return new TSDataType[] {TSDataType.STRING, TSDataType.STRING,
TSDataType.DOUBLE};
+ return new TSDataType[]{TSDataType.STRING, TSDataType.STRING,
TSDataType.DOUBLE};
+ }
+
+ public static MeasurementSchema[] genMeasurementSchemas() {
+ return new MeasurementSchema[]{
+ new MeasurementSchema("id1", TSDataType.STRING),
+ new MeasurementSchema("attr1", TSDataType.STRING),
+ new MeasurementSchema("m1", TSDataType.DOUBLE)
+ };
}
public static TsTableColumnCategory[] genColumnCategories() {
- return new TsTableColumnCategory[] {
- TsTableColumnCategory.ID, TsTableColumnCategory.ATTRIBUTE,
TsTableColumnCategory.MEASUREMENT
+ return new TsTableColumnCategory[]{
+ TsTableColumnCategory.ID, TsTableColumnCategory.ATTRIBUTE,
TsTableColumnCategory.MEASUREMENT
};
}
@@ -74,27 +91,47 @@ public class StatementTestUtils {
}
public static Object[] genColumns() {
- return new Object[] {
- new String[] {"a", "b", "c"},
- new String[] {"x", "y", "z"},
- new double[] {1.0, 2.0, 3.0}
+ return genColumns(3, 0);
+ }
+
+ public static Object[] genColumns(int cnt, int offset) {
+ final String[] ids = new String[cnt];
+ final String[] attrs = new String[cnt];
+ final double[] values = new double[cnt];
+ for (int i = 0; i < cnt; i++) {
+ ids[i] = "id:" + (i + offset);
+ attrs[i] = "attr:" + (i + offset);
+ values[i] = (i + offset) * 1.0;
+ }
+
+ return new Object[]{
+ ids, attrs, values
};
}
public static long[] genTimestamps() {
- return new long[] {1L, 2L, 3L};
+ return genTimestamps(3, 0);
}
- public static InsertTabletStatement genInsertTabletStatement(boolean
writeToTable) {
+ public static long[] genTimestamps(int cnt, int offset) {
+ final long[] timestamps = new long[cnt];
+ for (int i = 0; i < cnt; i++) {
+ timestamps[i] = i + offset;
+ }
+ return timestamps;
+ }
+
+ public static InsertTabletStatement genInsertTabletStatement(boolean
writeToTable, int rowCnt,
+ int offset) {
String[] measurements = genColumnNames();
TSDataType[] dataTypes = genDataTypes();
TsTableColumnCategory[] columnCategories = genColumnCategories();
- Object[] columns = genColumns();
- long[] timestamps = genTimestamps();
+ Object[] columns = genColumns(rowCnt, offset);
+ long[] timestamps = genTimestamps(rowCnt, offset);
InsertTabletStatement insertTabletStatement = new InsertTabletStatement();
- insertTabletStatement.setDevicePath(new PartialPath(new String[]
{tableName()}));
+ insertTabletStatement.setDevicePath(new PartialPath(new
String[]{tableName()}));
insertTabletStatement.setMeasurements(measurements);
insertTabletStatement.setDataTypes(dataTypes);
insertTabletStatement.setColumnCategories(columnCategories);
@@ -105,4 +142,48 @@ public class StatementTestUtils {
return insertTabletStatement;
}
+
+ public static RelationalInsertTabletNode genInsertTabletNode(int rowCnt,
+ int offset) {
+ String[] measurements = genColumnNames();
+ TSDataType[] dataTypes = genDataTypes();
+ TsTableColumnCategory[] columnCategories = genColumnCategories();
+ final MeasurementSchema[] measurementSchemas = genMeasurementSchemas();
+
+ Object[] columns = genColumns(rowCnt, offset);
+ long[] timestamps = genTimestamps(rowCnt, offset);
+
+ return
+ new RelationalInsertTabletNode(new PlanNodeId(offset + "-" + rowCnt),
+ new PartialPath(new String[]{tableName()}),
+ true,
+ measurements, dataTypes, measurementSchemas, timestamps, null,
columns, rowCnt, columnCategories);
+ }
+
+ public static InsertTabletStatement genInsertTabletStatement(boolean
writeToTable) {
+ return genInsertTabletStatement(writeToTable, 3, 0);
+ }
+
+ public static TsTable genTsTable() {
+ final TsTable tsTable = new TsTable(tableName());
+ String[] measurements = genColumnNames();
+ TSDataType[] dataTypes = genDataTypes();
+ TsTableColumnCategory[] columnCategories = genColumnCategories();
+ for (int i = 0; i < columnCategories.length; i++) {
+ switch (columnCategories[i]) {
+ case ID:
+ tsTable.addColumnSchema(new IdColumnSchema(measurements[i],
dataTypes[i]));
+ break;
+ case ATTRIBUTE:
+ tsTable.addColumnSchema(new AttributeColumnSchema(measurements[i],
dataTypes[i]));
+ break;
+ case MEASUREMENT:
+ default:
+ tsTable.addColumnSchema(new MeasurementColumnSchema(measurements[i],
dataTypes[i],
+ TSEncoding.PLAIN, CompressionType.UNCOMPRESSED));
+ break;
+ }
+ }
+ return tsTable;
+ }
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
index 80b45ebf46c..79e77c6b0d6 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
@@ -19,18 +19,23 @@
package org.apache.iotdb.db.storageengine.dataregion;
+import static
org.apache.iotdb.db.queryengine.plan.statement.StatementTestUtils.genInsertTabletNode;
+
+import java.util.Arrays;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.exception.ShutdownException;
+import org.apache.iotdb.commons.path.AlignedFullPath;
import org.apache.iotdb.commons.path.IFullPath;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.NonAlignedFullPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.exception.DataRegionException;
import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.exception.WriteProcessException;
@@ -42,6 +47,9 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementTestUtils;
+import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
import org.apache.iotdb.db.storageengine.StorageEngine;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
@@ -113,6 +121,10 @@ public class DataRegionTest {
dataRegion = new DummyDataRegion(systemDir, storageGroup);
StorageEngine.getInstance().setDataRegion(new DataRegionId(0), dataRegion);
CompactionTaskManager.getInstance().start();
+
DataNodeTableCache.getInstance().preCreateTable(dataRegion.getDatabaseName(),
+ StatementTestUtils.genTsTable());
+
DataNodeTableCache.getInstance().commitCreateTable(dataRegion.getDatabaseName(),
+ StatementTestUtils.tableName());
}
@After
@@ -249,6 +261,43 @@ public class DataRegionTest {
}
}
+ @Test
+ public void testRelationalTabletWriteAndSyncClose()
+ throws QueryProcessException, WriteProcessException {
+ RelationalInsertTabletNode insertTabletNode1 = genInsertTabletNode(10, 0);
+ dataRegion.insertTablet(insertTabletNode1);
+ dataRegion.asyncCloseAllWorkingTsFileProcessors();
+
+ RelationalInsertTabletNode insertTabletNode2 = genInsertTabletNode(10, 10);
+ dataRegion.insertTablet(insertTabletNode2);
+ dataRegion.asyncCloseAllWorkingTsFileProcessors();
+ dataRegion.syncCloseAllWorkingTsFileProcessors();
+
+ String measurementName = "m1";
+ MeasurementSchema measurementSchema = new
MeasurementSchema(measurementName, TSDataType.DOUBLE);
+ final IDeviceID deviceID1 = insertTabletNode1.getDeviceID(0);
+ final IDeviceID deviceID2 = insertTabletNode2.getDeviceID(0);
+
+ QueryDataSource queryDataSource =
+ dataRegion.query(
+ Collections.singletonList(new AlignedFullPath(deviceID1,
+ Collections.singletonList(measurementName),
Collections.singletonList(measurementSchema))),
+ deviceID1, context, null, null);
+ Assert.assertEquals(1, queryDataSource.getSeqResources().size());
+ Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
+
+ queryDataSource =
+ dataRegion.query(
+ Collections.singletonList(new AlignedFullPath(deviceID2,
+ Collections.singletonList(measurementName),
Collections.singletonList(measurementSchema))),
+ deviceID2, context, null, null);
+ Assert.assertEquals(1, queryDataSource.getSeqResources().size());
+ Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
+ for (TsFileResource resource : queryDataSource.getSeqResources()) {
+ Assert.assertTrue(resource.isClosed());
+ }
+ }
+
@Test
public void testIoTDBTabletWriteAndSyncClose()
throws QueryProcessException, IllegalPathException,
WriteProcessException {