This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch TableModelIngestion
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/TableModelIngestion by this push:
     new 8c39b0403b7 Add insertRow validation
8c39b0403b7 is described below

commit 8c39b0403b777b83f597bf3965bba942fbadf372
Author: Tian Jiang <[email protected]>
AuthorDate: Fri Jul 5 18:59:21 2024 +0800

    Add insertRow validation
---
 .../apache/iotdb/session/util/SessionUtils.java    | 13 ------
 .../planner/plan/node/write/InsertTabletNode.java  | 52 +++++++---------------
 .../relational/analyzer/StatementAnalyzer.java     |  8 ++--
 .../relational/sql/ast/WrappedInsertStatement.java | 26 ++++++++---
 4 files changed, 40 insertions(+), 59 deletions(-)

diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
index 059d7463fac..01177a1ee40 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
@@ -261,19 +261,6 @@ public class SessionUtils {
         break;
       case TEXT:
       case STRING:
-        if (tablet.getColumnTypes().get(i) == ColumnType.MEASUREMENT) {
-          Binary[] binaryValues = (Binary[]) tablet.values[i];
-          for (int index = 0; index < tablet.rowSize; index++) {
-            valueBuffer.putInt(binaryValues[index].getLength());
-            valueBuffer.put(binaryValues[index].getValues());
-          }
-        } else {
-          String[] stringValues = (String[]) tablet.values[i];
-          for (int index = 0; index < tablet.rowSize; index++) {
-            ReadWriteIOUtils.write(stringValues[index], valueBuffer);
-          }
-        }
-        break;
       case BLOB:
         Binary[] binaryValues = (Binary[]) tablet.values[i];
         for (int index = 0; index < tablet.rowSize; index++) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index ec28d716997..2ef203a456e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
 import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.commons.utils.TimePartitionUtils;
@@ -376,10 +377,8 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
         switch (dataTypes[i]) {
           case TEXT:
           case BLOB:
-            values[i] = new Binary[rowSize];
-            break;
           case STRING:
-            values[i] = new String[rowSize];
+            values[i] = new Binary[rowSize];
             break;
           case FLOAT:
             values[i] = new float[rowSize];
@@ -630,17 +629,12 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
         break;
       case TEXT:
       case BLOB:
+      case STRING:
         Binary[] binaryValues = (Binary[]) column;
         for (int j = 0; j < rowCount; j++) {
           ReadWriteIOUtils.write(binaryValues[j], buffer);
         }
         break;
-      case STRING:
-        String[] stringValues = (String[]) column;
-        for (int j = 0; j < rowCount; j++) {
-          ReadWriteIOUtils.write(stringValues[j], buffer);
-        }
-        break;
       default:
         throw new 
UnSupportedDataTypeException(String.format(DATATYPE_UNSUPPORTED, dataType));
     }
@@ -681,6 +675,7 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
           ReadWriteIOUtils.write(BytesUtils.boolToByte(boolValues[j]), stream);
         }
         break;
+      case STRING:
       case TEXT:
       case BLOB:
         Binary[] binaryValues = (Binary[]) column;
@@ -688,12 +683,6 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
           ReadWriteIOUtils.write(binaryValues[j], stream);
         }
         break;
-      case STRING:
-        String[] stringValues = (String[]) column;
-        for (int j = 0; j < rowCount; j++) {
-          ReadWriteIOUtils.write(stringValues[j], stream);
-        }
-        break;
       default:
         throw new 
UnSupportedDataTypeException(String.format(DATATYPE_UNSUPPORTED, dataType));
     }
@@ -826,17 +815,12 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
         break;
       case TEXT:
       case BLOB:
+      case STRING:
         Binary[] binaryValues = (Binary[]) column;
         for (int j = start; j < end; j++) {
           size += ReadWriteIOUtils.sizeToWrite(binaryValues[j]);
         }
         break;
-      case STRING:
-        String[] stringValues = (String[]) column;
-        for (int j = start; j < end; j++) {
-          size += ReadWriteIOUtils.sizeToWrite(stringValues[j]);
-        }
-        break;
     }
     return size;
   }
@@ -949,13 +933,6 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
         }
         break;
       case STRING:
-        String[] stringValues = (String[]) column;
-        for (int j = start; j < end; j++) {
-          final byte[] bytes = 
stringValues[j].getBytes(StandardCharsets.UTF_8);
-          buffer.putInt(bytes.length);
-          buffer.put(bytes);
-        }
-        break;
       case TEXT:
       case BLOB:
         Binary[] binaryValues = (Binary[]) column;
@@ -1119,12 +1096,8 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
             break;
           case TEXT:
           case BLOB:
-            if (!Arrays.equals((Binary[]) this.columns[i], (Binary[]) 
columns[i])) {
-              return false;
-            }
-            break;
           case STRING:
-            if (!Arrays.equals((String[]) this.columns[i], (String[]) 
columns[i])) {
+            if (!Arrays.equals((Binary[]) this.columns[i], (Binary[]) 
columns[i])) {
               return false;
             }
             break;
@@ -1195,10 +1168,15 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
         value = new TsPrimitiveType.TsBinary(binaryValues[lastIdx]);
         break;
       case STRING:
-        String[] stringValues = (String[]) columns[measurementIndex];
-        value =
-            new TsPrimitiveType.TsBinary(
-                new 
Binary(stringValues[lastIdx].getBytes(StandardCharsets.UTF_8)));
+        if (columnCategories[measurementIndex] == TsTableColumnCategory.ID) {
+          String[] stringValues = (String[]) columns[measurementIndex];
+          value =
+              new TsPrimitiveType.TsBinary(
+                  new 
Binary(stringValues[lastIdx].getBytes(StandardCharsets.UTF_8)));
+        } else {
+          Binary[] binValues = (Binary[]) columns[measurementIndex];
+          value = new TsPrimitiveType.TsBinary(binValues[lastIdx]);
+        }
         break;
       default:
         throw new UnSupportedDataTypeException(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
index cf23989fd54..dab50e52423 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
@@ -381,18 +381,18 @@ public class StatementAnalyzer {
       final Scope ret = Scope.create();
 
       final MPPQueryContext context = insert.getContext();
-      InsertBaseStatement insertTabletStatement = 
insert.getInnerTreeStatement();
+      InsertBaseStatement innerInsert = insert.getInnerTreeStatement();
 
-      insertTabletStatement =
+      innerInsert =
           AnalyzeUtils.analyzeInsert(
               context,
-              insertTabletStatement,
+              innerInsert,
               () -> SchemaValidator.validate(metadata, insert, context),
               metadata::getOrCreateDataPartition,
               AnalyzeUtils::computeTableDataPartitionParams,
               analysis,
               false);
-      insert.setInnerTreeStatement(insertTabletStatement);
+      insert.setInnerTreeStatement(innerInsert);
       analysis.setScope(insert, ret);
 
       return ret;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
index 5a30fda1f45..32f63c97033 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.queryengine.plan.relational.sql.ast;
 
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
@@ -77,10 +78,13 @@ public abstract class WrappedInsertStatement extends WrappedStatement
     Map<String, ColumnSchema> realSchemaMap = new HashMap<>();
     realSchema.getColumns().forEach(c -> realSchemaMap.put(c.getName(), c));
 
+    InsertBaseStatement innerTreeStatement = getInnerTreeStatement();
     // incoming schema should be consistent with real schema
-    for (ColumnSchema incomingSchemaColumn : incomingSchemaColumns) {
+    for (int i = 0, incomingSchemaColumnsSize = incomingSchemaColumns.size();
+        i < incomingSchemaColumnsSize; i++) {
+      ColumnSchema incomingSchemaColumn = incomingSchemaColumns.get(i);
       final ColumnSchema realSchemaColumn = 
realSchemaMap.get(incomingSchemaColumn.getName());
-      validate(incomingSchemaColumn, realSchemaColumn);
+      validate(incomingSchemaColumn, realSchemaColumn, i, innerTreeStatement);
     }
     // incoming schema should contain all id columns in real schema and have 
consistent order
     final List<ColumnSchema> realIdColumns = realSchema.getIdColumns();
@@ -105,16 +109,28 @@ public abstract class WrappedInsertStatement extends WrappedStatement
     tableSchema = null;
   }
 
-  public static void validate(ColumnSchema incoming, ColumnSchema real) {
+  public static void validate(ColumnSchema incoming, ColumnSchema real, int i,
+      InsertBaseStatement innerTreeStatement) {
     if (real == null) {
-      throw new SemanticException(
+      SemanticException semanticException = new SemanticException(
           "Column " + incoming.getName() + " does not exists or fails to be " 
+ "created");
+      if (incoming.getColumnCategory() != TsTableColumnCategory.MEASUREMENT) {
+        throw semanticException;
+      } else {
+        innerTreeStatement.markFailedMeasurement(i, semanticException);
+        return;
+      }
     }
     if (!incoming.getType().equals(real.getType())) {
-      throw new SemanticException(
+      SemanticException semanticException = new SemanticException(
           String.format(
               "Inconsistent data type of column %s: %s/%s",
               incoming.getName(), incoming.getType(), real.getType()));
+      if (incoming.getColumnCategory() != TsTableColumnCategory.MEASUREMENT) {
+        throw semanticException;
+      } else {
+        innerTreeStatement.markFailedMeasurement(i, semanticException);
+      }
     }
     if (!incoming.getColumnCategory().equals(real.getColumnCategory())) {
       throw new SemanticException(

Reply via email to