This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch TableModelIngestion
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/TableModelIngestion by this push:
new 8c39b0403b7 Add insertRow validation
8c39b0403b7 is described below
commit 8c39b0403b777b83f597bf3965bba942fbadf372
Author: Tian Jiang <[email protected]>
AuthorDate: Fri Jul 5 18:59:21 2024 +0800
Add insertRow validation
---
.../apache/iotdb/session/util/SessionUtils.java | 13 ------
.../planner/plan/node/write/InsertTabletNode.java | 52 +++++++---------------
.../relational/analyzer/StatementAnalyzer.java | 8 ++--
.../relational/sql/ast/WrappedInsertStatement.java | 26 ++++++++---
4 files changed, 40 insertions(+), 59 deletions(-)
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
index 059d7463fac..01177a1ee40 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
@@ -261,19 +261,6 @@ public class SessionUtils {
break;
case TEXT:
case STRING:
- if (tablet.getColumnTypes().get(i) == ColumnType.MEASUREMENT) {
- Binary[] binaryValues = (Binary[]) tablet.values[i];
- for (int index = 0; index < tablet.rowSize; index++) {
- valueBuffer.putInt(binaryValues[index].getLength());
- valueBuffer.put(binaryValues[index].getValues());
- }
- } else {
- String[] stringValues = (String[]) tablet.values[i];
- for (int index = 0; index < tablet.rowSize; index++) {
- ReadWriteIOUtils.write(stringValues[index], valueBuffer);
- }
- }
- break;
case BLOB:
Binary[] binaryValues = (Binary[]) tablet.values[i];
for (int index = 0; index < tablet.rowSize; index++) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index ec28d716997..2ef203a456e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
@@ -376,10 +377,8 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
switch (dataTypes[i]) {
case TEXT:
case BLOB:
- values[i] = new Binary[rowSize];
- break;
case STRING:
- values[i] = new String[rowSize];
+ values[i] = new Binary[rowSize];
break;
case FLOAT:
values[i] = new float[rowSize];
@@ -630,17 +629,12 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
break;
case TEXT:
case BLOB:
+ case STRING:
Binary[] binaryValues = (Binary[]) column;
for (int j = 0; j < rowCount; j++) {
ReadWriteIOUtils.write(binaryValues[j], buffer);
}
break;
- case STRING:
- String[] stringValues = (String[]) column;
- for (int j = 0; j < rowCount; j++) {
- ReadWriteIOUtils.write(stringValues[j], buffer);
- }
- break;
default:
throw new
UnSupportedDataTypeException(String.format(DATATYPE_UNSUPPORTED, dataType));
}
@@ -681,6 +675,7 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
ReadWriteIOUtils.write(BytesUtils.boolToByte(boolValues[j]), stream);
}
break;
+ case STRING:
case TEXT:
case BLOB:
Binary[] binaryValues = (Binary[]) column;
@@ -688,12 +683,6 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
ReadWriteIOUtils.write(binaryValues[j], stream);
}
break;
- case STRING:
- String[] stringValues = (String[]) column;
- for (int j = 0; j < rowCount; j++) {
- ReadWriteIOUtils.write(stringValues[j], stream);
- }
- break;
default:
throw new
UnSupportedDataTypeException(String.format(DATATYPE_UNSUPPORTED, dataType));
}
@@ -826,17 +815,12 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
break;
case TEXT:
case BLOB:
+ case STRING:
Binary[] binaryValues = (Binary[]) column;
for (int j = start; j < end; j++) {
size += ReadWriteIOUtils.sizeToWrite(binaryValues[j]);
}
break;
- case STRING:
- String[] stringValues = (String[]) column;
- for (int j = start; j < end; j++) {
- size += ReadWriteIOUtils.sizeToWrite(stringValues[j]);
- }
- break;
}
return size;
}
@@ -949,13 +933,6 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
}
break;
case STRING:
- String[] stringValues = (String[]) column;
- for (int j = start; j < end; j++) {
- final byte[] bytes =
stringValues[j].getBytes(StandardCharsets.UTF_8);
- buffer.putInt(bytes.length);
- buffer.put(bytes);
- }
- break;
case TEXT:
case BLOB:
Binary[] binaryValues = (Binary[]) column;
@@ -1119,12 +1096,8 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
break;
case TEXT:
case BLOB:
- if (!Arrays.equals((Binary[]) this.columns[i], (Binary[])
columns[i])) {
- return false;
- }
- break;
case STRING:
- if (!Arrays.equals((String[]) this.columns[i], (String[])
columns[i])) {
+ if (!Arrays.equals((Binary[]) this.columns[i], (Binary[])
columns[i])) {
return false;
}
break;
@@ -1195,10 +1168,15 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
value = new TsPrimitiveType.TsBinary(binaryValues[lastIdx]);
break;
case STRING:
- String[] stringValues = (String[]) columns[measurementIndex];
- value =
- new TsPrimitiveType.TsBinary(
- new
Binary(stringValues[lastIdx].getBytes(StandardCharsets.UTF_8)));
+ if (columnCategories[measurementIndex] == TsTableColumnCategory.ID) {
+ String[] stringValues = (String[]) columns[measurementIndex];
+ value =
+ new TsPrimitiveType.TsBinary(
+ new
Binary(stringValues[lastIdx].getBytes(StandardCharsets.UTF_8)));
+ } else {
+ Binary[] binValues = (Binary[]) columns[measurementIndex];
+ value = new TsPrimitiveType.TsBinary(binValues[lastIdx]);
+ }
break;
default:
throw new UnSupportedDataTypeException(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
index cf23989fd54..dab50e52423 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
@@ -381,18 +381,18 @@ public class StatementAnalyzer {
final Scope ret = Scope.create();
final MPPQueryContext context = insert.getContext();
- InsertBaseStatement insertTabletStatement =
insert.getInnerTreeStatement();
+ InsertBaseStatement innerInsert = insert.getInnerTreeStatement();
- insertTabletStatement =
+ innerInsert =
AnalyzeUtils.analyzeInsert(
context,
- insertTabletStatement,
+ innerInsert,
() -> SchemaValidator.validate(metadata, insert, context),
metadata::getOrCreateDataPartition,
AnalyzeUtils::computeTableDataPartitionParams,
analysis,
false);
- insert.setInnerTreeStatement(insertTabletStatement);
+ insert.setInnerTreeStatement(innerInsert);
analysis.setScope(insert, ret);
return ret;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
index 5a30fda1f45..32f63c97033 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.queryengine.plan.relational.sql.ast;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
@@ -77,10 +78,13 @@ public abstract class WrappedInsertStatement extends
WrappedStatement
Map<String, ColumnSchema> realSchemaMap = new HashMap<>();
realSchema.getColumns().forEach(c -> realSchemaMap.put(c.getName(), c));
+ InsertBaseStatement innerTreeStatement = getInnerTreeStatement();
// incoming schema should be consistent with real schema
- for (ColumnSchema incomingSchemaColumn : incomingSchemaColumns) {
+ for (int i = 0, incomingSchemaColumnsSize = incomingSchemaColumns.size();
+ i < incomingSchemaColumnsSize; i++) {
+ ColumnSchema incomingSchemaColumn = incomingSchemaColumns.get(i);
final ColumnSchema realSchemaColumn =
realSchemaMap.get(incomingSchemaColumn.getName());
- validate(incomingSchemaColumn, realSchemaColumn);
+ validate(incomingSchemaColumn, realSchemaColumn, i, innerTreeStatement);
}
// incoming schema should contain all id columns in real schema and have consistent order
final List<ColumnSchema> realIdColumns = realSchema.getIdColumns();
@@ -105,16 +109,28 @@ public abstract class WrappedInsertStatement extends
WrappedStatement
tableSchema = null;
}
- public static void validate(ColumnSchema incoming, ColumnSchema real) {
+ public static void validate(ColumnSchema incoming, ColumnSchema real, int i,
+ InsertBaseStatement innerTreeStatement) {
if (real == null) {
- throw new SemanticException(
+ SemanticException semanticException = new SemanticException(
"Column " + incoming.getName() + " does not exists or fails to be "
+ "created");
+ if (incoming.getColumnCategory() != TsTableColumnCategory.MEASUREMENT) {
+ throw semanticException;
+ } else {
+ innerTreeStatement.markFailedMeasurement(i, semanticException);
+ return;
+ }
}
if (!incoming.getType().equals(real.getType())) {
- throw new SemanticException(
+ SemanticException semanticException = new SemanticException(
String.format(
"Inconsistent data type of column %s: %s/%s",
incoming.getName(), incoming.getType(), real.getType()));
+ if (incoming.getColumnCategory() != TsTableColumnCategory.MEASUREMENT) {
+ throw semanticException;
+ } else {
+ innerTreeStatement.markFailedMeasurement(i, semanticException);
+ }
}
if (!incoming.getColumnCategory().equals(real.getColumnCategory())) {
throw new SemanticException(