This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 0f328ce5eb9 Fix empty tsfile genarated when all datatypes in one
insert mismatching (#11456)
0f328ce5eb9 is described below
commit 0f328ce5eb907e9580f262e60c57c6b076f3e264
Author: Haonan <[email protected]>
AuthorDate: Tue Nov 7 18:07:53 2023 +0800
Fix empty tsfile genarated when all datatypes in one insert mismatching
(#11456)
---
.../plan/planner/plan/node/write/InsertNode.java | 7 ++
.../db/storageengine/dataregion/DataRegion.java | 7 +-
.../storageengine/dataregion/DataRegionTest.java | 120 +++++++++++++++++++++
3 files changed, 132 insertions(+), 2 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
index 69beae45ddd..8d29348c702 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
@@ -280,6 +280,13 @@ public abstract class InsertNode extends WritePlanNode
implements ComparableCons
public int getFailedMeasurementNumber() {
return failedMeasurementNumber;
}
+
+ public boolean allMeasurementFailed() {
+ if (measurements != null) {
+ return failedMeasurementNumber >= measurements.length;
+ }
+ return true;
+ }
// endregion
// region progress index
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 024f5dfd2ad..9ae93b8894a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -1038,8 +1038,8 @@ public class DataRegion implements IDataRegionForQuery {
boolean sequence,
TSStatus[] results,
long timePartitionId) {
- // return when start >= end
- if (start >= end) {
+ // return when start >= end or all measurement failed
+ if (start >= end || insertTabletNode.allMeasurementFailed()) {
return true;
}
@@ -1105,6 +1105,9 @@ public class DataRegion implements IDataRegionForQuery {
private void insertToTsFileProcessor(
InsertRowNode insertRowNode, boolean sequence, long timePartitionId)
throws WriteProcessException {
+ if (insertRowNode.allMeasurementFailed()) {
+ return;
+ }
TsFileProcessor tsFileProcessor =
getOrCreateTsFileProcessor(timePartitionId, sequence);
if (tsFileProcessor == null) {
return;
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
index 4adb806b74b..1479ba63f05 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
@@ -384,6 +384,86 @@ public class DataRegionTest {
}
}
+ @Test
+ public void testAllMeasurementsFailedTabletWriteAndSyncClose()
+ throws QueryProcessException, IllegalPathException,
WriteProcessException {
+ String[] measurements = new String[2];
+ measurements[0] = "s0";
+ measurements[1] = "s1";
+ TSDataType[] dataTypes = new TSDataType[2];
+ dataTypes[0] = TSDataType.INT32;
+ dataTypes[1] = TSDataType.INT64;
+
+ MeasurementSchema[] measurementSchemas = new MeasurementSchema[2];
+ measurementSchemas[0] = new MeasurementSchema("s0", TSDataType.INT32,
TSEncoding.PLAIN);
+ measurementSchemas[1] = new MeasurementSchema("s1", TSDataType.INT64,
TSEncoding.PLAIN);
+
+ long[] times = new long[100];
+ Object[] columns = new Object[2];
+ columns[0] = new int[100];
+ columns[1] = new long[100];
+
+ for (int r = 0; r < 100; r++) {
+ times[r] = r;
+ ((int[]) columns[0])[r] = 1;
+ ((long[]) columns[1])[r] = 1;
+ }
+
+ InsertTabletNode insertTabletNode1 =
+ new InsertTabletNode(
+ new QueryId("test_write").genPlanNodeId(),
+ new PartialPath("root.vehicle.d0"),
+ false,
+ measurements,
+ dataTypes,
+ measurementSchemas,
+ times,
+ null,
+ columns,
+ times.length);
+ insertTabletNode1.setFailedMeasurementNumber(2);
+
+ dataRegion.insertTablet(insertTabletNode1);
+ dataRegion.asyncCloseAllWorkingTsFileProcessors();
+
+ for (int r = 50; r < 149; r++) {
+ times[r - 50] = r;
+ ((int[]) columns[0])[r - 50] = 1;
+ ((long[]) columns[1])[r - 50] = 1;
+ }
+
+ InsertTabletNode insertTabletNode2 =
+ new InsertTabletNode(
+ new QueryId("test_write").genPlanNodeId(),
+ new PartialPath("root.vehicle.d0"),
+ false,
+ measurements,
+ dataTypes,
+ measurementSchemas,
+ times,
+ null,
+ columns,
+ times.length);
+ insertTabletNode2.setFailedMeasurementNumber(2);
+
+ dataRegion.insertTablet(insertTabletNode2);
+ dataRegion.asyncCloseAllWorkingTsFileProcessors();
+ dataRegion.syncCloseAllWorkingTsFileProcessors();
+
+ QueryDataSource queryDataSource =
+ dataRegion.query(
+ Collections.singletonList(new PartialPath(deviceId,
measurementId)),
+ deviceId,
+ context,
+ null);
+
+ Assert.assertEquals(0, queryDataSource.getSeqResources().size());
+ Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
+ for (TsFileResource resource : queryDataSource.getSeqResources()) {
+ Assert.assertTrue(resource.isClosed());
+ }
+ }
+
@Test
public void testSeqAndUnSeqSyncClose()
throws WriteProcessException, QueryProcessException,
IllegalPathException {
@@ -420,6 +500,46 @@ public class DataRegionTest {
}
}
+ @Test
+ public void testAllMeasurementsFailedRecordSeqAndUnSeqSyncClose()
+ throws WriteProcessException, QueryProcessException,
IllegalPathException {
+ for (int j = 21; j <= 30; j++) {
+ TSRecord record = new TSRecord(j, deviceId);
+ record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId,
String.valueOf(j)));
+ InsertRowNode rowNode = buildInsertRowNodeByTSRecord(record);
+ rowNode.setFailedMeasurementNumber(1);
+ dataRegion.insert(rowNode);
+ dataRegion.asyncCloseAllWorkingTsFileProcessors();
+ }
+ dataRegion.syncCloseAllWorkingTsFileProcessors();
+
+ for (int j = 10; j >= 1; j--) {
+ TSRecord record = new TSRecord(j, deviceId);
+ record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId,
String.valueOf(j)));
+ InsertRowNode rowNode = buildInsertRowNodeByTSRecord(record);
+ rowNode.setFailedMeasurementNumber(1);
+ dataRegion.insert(rowNode);
+ dataRegion.asyncCloseAllWorkingTsFileProcessors();
+ }
+
+ dataRegion.syncCloseAllWorkingTsFileProcessors();
+
+ QueryDataSource queryDataSource =
+ dataRegion.query(
+ Collections.singletonList(new PartialPath(deviceId,
measurementId)),
+ deviceId,
+ context,
+ null);
+ Assert.assertEquals(0, queryDataSource.getSeqResources().size());
+ Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
+ for (TsFileResource resource : queryDataSource.getSeqResources()) {
+ Assert.assertTrue(resource.isClosed());
+ }
+ for (TsFileResource resource : queryDataSource.getUnseqResources()) {
+ Assert.assertTrue(resource.isClosed());
+ }
+ }
+
@Test
public void testEnableDiscardOutOfOrderDataForInsertRowPlan()
throws WriteProcessException, QueryProcessException,
IllegalPathException, IOException {