This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e4b77248388 Fix insertRelationalTablet may cause sequence data overlapped (#13778)
e4b77248388 is described below
commit e4b772483885fdba67f767ed1428bfa514d469d8
Author: Haonan <[email protected]>
AuthorDate: Wed Oct 16 10:13:59 2024 +0800
Fix insertRelationalTablet may cause sequence data overlapped (#13778)
---
.../relational/it/db/it/IoTDBInsertTableIT.java | 83 ++++++++++++++++++++++
.../db/storageengine/dataregion/DataRegion.java | 14 +++-
2 files changed, 94 insertions(+), 3 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBInsertTableIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBInsertTableIT.java
index 6264fc26651..58883ecbdb8 100644
--- a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBInsertTableIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBInsertTableIT.java
@@ -958,6 +958,89 @@ public class IoTDBInsertTableIT {
}
}
+ @Test
+ public void testInsertUnsequenceData()
+ throws IoTDBConnectionException, StatementExecutionException {
+ try (ISession session =
EnvFactory.getEnv().getSessionConnection(BaseEnv.TABLE_SQL_DIALECT)) {
+ session.executeNonQueryStatement("USE \"test\"");
+ // the table is missing column "m2"
+ session.executeNonQueryStatement(
+ "CREATE TABLE table4 (id1 string id, attr1 string attribute, "
+ + "m1 double "
+ + "measurement)");
+
+ // the insertion contains "m2"
+ List<IMeasurementSchema> schemaList = new ArrayList<>();
+ schemaList.add(new MeasurementSchema("id1", TSDataType.STRING));
+ schemaList.add(new MeasurementSchema("attr1", TSDataType.STRING));
+ schemaList.add(new MeasurementSchema("m1", TSDataType.DOUBLE));
+ schemaList.add(new MeasurementSchema("m2", TSDataType.DOUBLE));
+ final List<Tablet.ColumnType> columnTypes =
+ Arrays.asList(
+ Tablet.ColumnType.ID,
+ Tablet.ColumnType.ATTRIBUTE,
+ Tablet.ColumnType.MEASUREMENT,
+ Tablet.ColumnType.MEASUREMENT);
+
+ long timestamp = 0;
+ Tablet tablet = new Tablet("table4", schemaList, columnTypes, 15);
+
+ for (long row = 0; row < 15; row++) {
+ int rowIndex = tablet.rowSize++;
+ tablet.addTimestamp(rowIndex, timestamp + row);
+ tablet.addValue("id1", rowIndex, "id:" + row);
+ tablet.addValue("attr1", rowIndex, "attr:" + row);
+ tablet.addValue("m1", rowIndex, row * 1.0);
+ tablet.addValue("m2", rowIndex, row * 1.0);
+ if (tablet.rowSize == tablet.getMaxRowNumber()) {
+ try {
+ session.insertRelationalTablet(tablet, true);
+ } catch (StatementExecutionException e) {
+ // a partial insertion should be reported
+ if (!e.getMessage()
+ .equals(
+ "507: Fail to insert measurements [m2] caused by [Column
m2 does not exists or fails to be created]")) {
+ throw e;
+ }
+ }
+ tablet.reset();
+ }
+ }
+
+ session.executeNonQueryStatement("FLush");
+
+ for (long row = 0; row < 15; row++) {
+ int rowIndex = tablet.rowSize++;
+ tablet.addTimestamp(rowIndex, 14 - row);
+ tablet.addValue("id1", rowIndex, "id:" + row);
+ tablet.addValue("attr1", rowIndex, "attr:" + row);
+ tablet.addValue("m1", rowIndex, row * 1.0);
+ tablet.addValue("m2", rowIndex, row * 1.0);
+ if (tablet.rowSize == tablet.getMaxRowNumber()) {
+ try {
+ session.insertRelationalTablet(tablet, true);
+ } catch (StatementExecutionException e) {
+ if (!e.getMessage()
+ .equals(
+ "507: Fail to insert measurements [m2] caused by [Column
m2 does not exists or fails to be created]")) {
+ throw e;
+ }
+ }
+ tablet.reset();
+ }
+ }
+ session.executeNonQueryStatement("FLush");
+
+ int cnt = 0;
+ SessionDataSet dataSet = session.executeQueryStatement("select * from
table4");
+ while (dataSet.hasNext()) {
+ dataSet.next();
+ cnt++;
+ }
+ assertEquals(29, cnt);
+ }
+ }
+
private List<Integer> checkHeader(
ResultSetMetaData resultSetMetaData, String expectedHeaderStrings, int[]
expectedTypes)
throws SQLException {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 68516ff4b12..adab85da6c6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -1070,7 +1070,8 @@ public class DataRegion implements IDataRegionForQuery {
: Long.MAX_VALUE;
}
- private boolean splitAndInsert(InsertTabletNode insertTabletNode, int loc,
TSStatus[] results) {
+ private boolean splitAndInsert(
+ InsertTabletNode insertTabletNode, int loc, int endOffset, TSStatus[]
results) {
boolean noFailure = true;
// before is first start point
@@ -1084,7 +1085,7 @@ public class DataRegion implements IDataRegionForQuery {
int insertCnt = 0;
// if is sequence
boolean isSequence = false;
- while (loc < insertTabletNode.getRowCount()) {
+ while (loc < endOffset) {
long time = insertTabletNode.getTimes()[loc];
final long timePartitionId = TimePartitionUtils.getTimePartitionId(time);
@@ -1195,7 +1196,14 @@ public class DataRegion implements IDataRegionForQuery {
boolean noFailure;
int loc = insertTabletNode.checkTTL(results, i ->
getTTL(insertTabletNode));
noFailure = loc == 0;
- noFailure = noFailure && splitAndInsert(insertTabletNode, loc, results);
+ List<Pair<IDeviceID, Integer>> deviceEndOffsetPairs =
+ insertTabletNode.splitByDevice(loc, insertTabletNode.getRowCount());
+ int start = loc;
+ for (Pair<IDeviceID, Integer> deviceEndOffsetPair :
deviceEndOffsetPairs) {
+ int end = deviceEndOffsetPair.getRight();
+ noFailure = noFailure && splitAndInsert(insertTabletNode, start, end,
results);
+ start = end;
+ }
if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()
&& !insertTabletNode.isGeneratedByRemoteConsensusLeader()) {