This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new e4b77248388 Fix insertRelationalTablet may cause sequence data overlapped (#13778)
e4b77248388 is described below

commit e4b772483885fdba67f767ed1428bfa514d469d8
Author: Haonan <[email protected]>
AuthorDate: Wed Oct 16 10:13:59 2024 +0800

    Fix insertRelationalTablet may cause sequence data overlapped (#13778)
---
 .../relational/it/db/it/IoTDBInsertTableIT.java    | 83 ++++++++++++++++++++++
 .../db/storageengine/dataregion/DataRegion.java    | 14 +++-
 2 files changed, 94 insertions(+), 3 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBInsertTableIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBInsertTableIT.java
index 6264fc26651..58883ecbdb8 100644
--- a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBInsertTableIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBInsertTableIT.java
@@ -958,6 +958,89 @@ public class IoTDBInsertTableIT {
     }
   }
 
+  @Test
+  public void testInsertUnsequenceData()
+      throws IoTDBConnectionException, StatementExecutionException {
+    try (ISession session = 
EnvFactory.getEnv().getSessionConnection(BaseEnv.TABLE_SQL_DIALECT)) {
+      session.executeNonQueryStatement("USE \"test\"");
+      // the table is missing column "m2"
+      session.executeNonQueryStatement(
+          "CREATE TABLE table4 (id1 string id, attr1 string attribute, "
+              + "m1 double "
+              + "measurement)");
+
+      // the insertion contains "m2"
+      List<IMeasurementSchema> schemaList = new ArrayList<>();
+      schemaList.add(new MeasurementSchema("id1", TSDataType.STRING));
+      schemaList.add(new MeasurementSchema("attr1", TSDataType.STRING));
+      schemaList.add(new MeasurementSchema("m1", TSDataType.DOUBLE));
+      schemaList.add(new MeasurementSchema("m2", TSDataType.DOUBLE));
+      final List<Tablet.ColumnType> columnTypes =
+          Arrays.asList(
+              Tablet.ColumnType.ID,
+              Tablet.ColumnType.ATTRIBUTE,
+              Tablet.ColumnType.MEASUREMENT,
+              Tablet.ColumnType.MEASUREMENT);
+
+      long timestamp = 0;
+      Tablet tablet = new Tablet("table4", schemaList, columnTypes, 15);
+
+      for (long row = 0; row < 15; row++) {
+        int rowIndex = tablet.rowSize++;
+        tablet.addTimestamp(rowIndex, timestamp + row);
+        tablet.addValue("id1", rowIndex, "id:" + row);
+        tablet.addValue("attr1", rowIndex, "attr:" + row);
+        tablet.addValue("m1", rowIndex, row * 1.0);
+        tablet.addValue("m2", rowIndex, row * 1.0);
+        if (tablet.rowSize == tablet.getMaxRowNumber()) {
+          try {
+            session.insertRelationalTablet(tablet, true);
+          } catch (StatementExecutionException e) {
+            // a partial insertion should be reported
+            if (!e.getMessage()
+                .equals(
+                    "507: Fail to insert measurements [m2] caused by [Column 
m2 does not exists or fails to be created]")) {
+              throw e;
+            }
+          }
+          tablet.reset();
+        }
+      }
+
+      session.executeNonQueryStatement("FLush");
+
+      for (long row = 0; row < 15; row++) {
+        int rowIndex = tablet.rowSize++;
+        tablet.addTimestamp(rowIndex, 14 - row);
+        tablet.addValue("id1", rowIndex, "id:" + row);
+        tablet.addValue("attr1", rowIndex, "attr:" + row);
+        tablet.addValue("m1", rowIndex, row * 1.0);
+        tablet.addValue("m2", rowIndex, row * 1.0);
+        if (tablet.rowSize == tablet.getMaxRowNumber()) {
+          try {
+            session.insertRelationalTablet(tablet, true);
+          } catch (StatementExecutionException e) {
+            if (!e.getMessage()
+                .equals(
+                    "507: Fail to insert measurements [m2] caused by [Column 
m2 does not exists or fails to be created]")) {
+              throw e;
+            }
+          }
+          tablet.reset();
+        }
+      }
+      session.executeNonQueryStatement("FLush");
+
+      int cnt = 0;
+      SessionDataSet dataSet = session.executeQueryStatement("select * from 
table4");
+      while (dataSet.hasNext()) {
+        dataSet.next();
+        cnt++;
+      }
+      assertEquals(29, cnt);
+    }
+  }
+
   private List<Integer> checkHeader(
       ResultSetMetaData resultSetMetaData, String expectedHeaderStrings, int[] 
expectedTypes)
       throws SQLException {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 68516ff4b12..adab85da6c6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -1070,7 +1070,8 @@ public class DataRegion implements IDataRegionForQuery {
         : Long.MAX_VALUE;
   }
 
-  private boolean splitAndInsert(InsertTabletNode insertTabletNode, int loc, 
TSStatus[] results) {
+  private boolean splitAndInsert(
+      InsertTabletNode insertTabletNode, int loc, int endOffset, TSStatus[] 
results) {
     boolean noFailure = true;
 
     // before is first start point
@@ -1084,7 +1085,7 @@ public class DataRegion implements IDataRegionForQuery {
     int insertCnt = 0;
     // if is sequence
     boolean isSequence = false;
-    while (loc < insertTabletNode.getRowCount()) {
+    while (loc < endOffset) {
       long time = insertTabletNode.getTimes()[loc];
       final long timePartitionId = TimePartitionUtils.getTimePartitionId(time);
 
@@ -1195,7 +1196,14 @@ public class DataRegion implements IDataRegionForQuery {
       boolean noFailure;
       int loc = insertTabletNode.checkTTL(results, i -> 
getTTL(insertTabletNode));
       noFailure = loc == 0;
-      noFailure = noFailure && splitAndInsert(insertTabletNode, loc, results);
+      List<Pair<IDeviceID, Integer>> deviceEndOffsetPairs =
+          insertTabletNode.splitByDevice(loc, insertTabletNode.getRowCount());
+      int start = loc;
+      for (Pair<IDeviceID, Integer> deviceEndOffsetPair : 
deviceEndOffsetPairs) {
+        int end = deviceEndOffsetPair.getRight();
+        noFailure = noFailure && splitAndInsert(insertTabletNode, start, end, 
results);
+        start = end;
+      }
 
       if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()
           && !insertTabletNode.isGeneratedByRemoteConsensusLeader()) {

Reply via email to