This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch TableModelIngestion
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/TableModelIngestion by this
push:
new 191de154738 fix non-measurement type conversion
191de154738 is described below
commit 191de15473865d54b6208bd5eecb44920502e440
Author: jt2594838 <[email protected]>
AuthorDate: Wed Jul 3 10:53:15 2024 +0800
fix non-measurement type conversion
---
.../iotdb/session/it/IoTDBSessionRelationalIT.java | 10 +++-
.../apache/iotdb/session/util/SessionUtils.java | 14 +++++
.../db/exception/query/QueryProcessException.java | 5 ++
.../iotdb/db/exception/sql/SemanticException.java | 9 ++-
.../plan/relational/metadata/TableSchema.java | 7 +++
.../fetcher/TableHeaderSchemaValidator.java | 2 +
.../relational/sql/ast/WrappedInsertStatement.java | 23 +++++++-
.../commons/exception/IoTDBRuntimeException.java | 66 ++++++++++++++++++++++
8 files changed, 131 insertions(+), 5 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java
b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java
index 2e49e93bb00..9fcd9777943 100644
---
a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.RowRecord;
import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.record.Tablet.ColumnType;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
@@ -41,6 +42,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import static org.apache.iotdb.itbase.env.BaseEnv.TABLE_SQL_DIALECT;
@@ -73,13 +75,19 @@ public class IoTDBSessionRelationalIT {
throws IoTDBConnectionException, StatementExecutionException {
try (ISession session =
EnvFactory.getEnv().getSessionConnection(TABLE_SQL_DIALECT)) {
session.executeNonQueryStatement("USE db1");
+ session.executeNonQueryStatement(
+ "CREATE TABLE table1 (id1 string id, attr1 string attribute, "
+ + "m1 double "
+ + "measurement)");
List<IMeasurementSchema> schemaList = new ArrayList<>();
schemaList.add(new MeasurementSchema("id1", TSDataType.STRING));
schemaList.add(new MeasurementSchema("attr1", TSDataType.STRING));
schemaList.add(new MeasurementSchema("m1", TSDataType.DOUBLE));
+ final List<ColumnType> columnTypes =
+ Arrays.asList(ColumnType.ID, ColumnType.ATTRIBUTE, ColumnType.MEASUREMENT);
- Tablet tablet = new Tablet("table1", schemaList, 10);
+ Tablet tablet = new Tablet("table1", schemaList, columnTypes, 10);
long timestamp = System.currentTimeMillis();
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
index 81642b3734c..059d7463fac 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
@@ -32,6 +32,7 @@ import org.apache.tsfile.utils.DateUtils;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.apache.tsfile.write.UnSupportedDataTypeException;
import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.record.Tablet.ColumnType;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import java.nio.ByteBuffer;
@@ -260,6 +261,19 @@ public class SessionUtils {
break;
case TEXT:
case STRING:
+ if (tablet.getColumnTypes().get(i) == ColumnType.MEASUREMENT) {
+ Binary[] binaryValues = (Binary[]) tablet.values[i];
+ for (int index = 0; index < tablet.rowSize; index++) {
+ valueBuffer.putInt(binaryValues[index].getLength());
+ valueBuffer.put(binaryValues[index].getValues());
+ }
+ } else {
+ String[] stringValues = (String[]) tablet.values[i];
+ for (int index = 0; index < tablet.rowSize; index++) {
+ ReadWriteIOUtils.write(stringValues[index], valueBuffer);
+ }
+ }
+ break;
case BLOB:
Binary[] binaryValues = (Binary[]) tablet.values[i];
for (int index = 0; index < tablet.rowSize; index++) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/query/QueryProcessException.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/query/QueryProcessException.java
index c7460f381d0..08b6a61d06a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/query/QueryProcessException.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/query/QueryProcessException.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.exception.query;
import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
import org.apache.iotdb.rpc.TSStatusCode;
public class QueryProcessException extends IoTDBException {
@@ -37,4 +38,8 @@ public class QueryProcessException extends IoTDBException {
public QueryProcessException(IoTDBException e) {
super(e, e.getErrorCode(), e.isUserException());
}
+
+ public QueryProcessException(IoTDBRuntimeException e) {
+ super(e, e.getErrorCode(), e.isUserException());
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/sql/SemanticException.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/sql/SemanticException.java
index 2e17d0775d9..66f3af08e91 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/sql/SemanticException.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/sql/SemanticException.java
@@ -19,13 +19,16 @@
package org.apache.iotdb.db.exception.sql;
-public class SemanticException extends RuntimeException {
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+public class SemanticException extends IoTDBRuntimeException {
public SemanticException(String message) {
- super(message);
+ super(message, TSStatusCode.SEMANTIC_ERROR.getStatusCode());
}
public SemanticException(Throwable cause) {
- super(cause);
+ super(cause, TSStatusCode.SEMANTIC_ERROR.getStatusCode());
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableSchema.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableSchema.java
index f60600b976c..e2ff268ef55 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableSchema.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableSchema.java
@@ -31,6 +31,7 @@ import org.apache.tsfile.write.schema.MeasurementSchema;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
+import java.util.stream.Collectors;
public class TableSchema {
@@ -99,4 +100,10 @@ public class TableSchema {
public String toString() {
return "TableSchema{" + "tableName='" + tableName + '\'' + ", columns=" +
columns + '}';
}
+
+ public List<ColumnSchema> getIdColumns() {
+ return columns.stream()
+ .filter(c -> c.getColumnCategory() == TsTableColumnCategory.ID)
+ .collect(Collectors.toList());
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableHeaderSchemaValidator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableHeaderSchemaValidator.java
index 4a2e93384a6..52b0f17203f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableHeaderSchemaValidator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableHeaderSchemaValidator.java
@@ -71,6 +71,7 @@ public class TableHeaderSchemaValidator {
String database, TableSchema tableSchema, MPPQueryContext context) {
List<ColumnSchema> inputColumnList = tableSchema.getColumns();
TsTable table = DataNodeTableCache.getInstance().getTable(database,
tableSchema.getTableName());
+ LOGGER.info("Get TsTable from cache: {}", table);
List<ColumnSchema> missingColumnList = new ArrayList<>();
List<ColumnSchema> resultColumnList = new ArrayList<>();
@@ -125,6 +126,7 @@ public class TableHeaderSchemaValidator {
// auto create missing table or columns
if (table == null) {
+ LOGGER.info("Trying to auto-create table: {}", tableSchema);
autoCreateTable(database, tableSchema, context);
table = DataNodeTableCache.getInstance().getTable(database,
tableSchema.getTableName());
} else if (inputColumnList == null) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
index 33b6ea24082..7b5d2306fbe 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
@@ -72,14 +72,35 @@ public abstract class WrappedInsertStatement extends
WrappedStatement
}
public void validate(TableSchema realSchema) throws QueryProcessException {
- final List<ColumnSchema> incomingSchemaColumns = getTableSchema().getColumns();
+ final TableSchema incomingTableSchema = getTableSchema();
+ final List<ColumnSchema> incomingSchemaColumns = incomingTableSchema.getColumns();
Map<String, ColumnSchema> realSchemaMap = new HashMap<>();
realSchema.getColumns().forEach(c -> realSchemaMap.put(c.getName(), c));
+ // incoming schema should be consistent with real schema
for (ColumnSchema incomingSchemaColumn : incomingSchemaColumns) {
final ColumnSchema realSchemaColumn =
realSchemaMap.get(incomingSchemaColumn.getName());
validate(incomingSchemaColumn, realSchemaColumn);
}
+ // incoming schema should contain all id columns in real schema and have consistent order
+ final List<ColumnSchema> realIdColumns = realSchema.getIdColumns();
+ final List<ColumnSchema> incomingIdColumns = incomingTableSchema.getIdColumns();
+ if (realIdColumns.size() > incomingIdColumns.size()) {
+ throw new QueryProcessException(
+ new SemanticException(
+ String.format(
+ "The incoming id columns " + "conflicts " + "with existing ones: %s v.s. %s",
+ incomingIdColumns, realIdColumns)));
+ }
+ for (int i = 0; i < realIdColumns.size(); i++) {
+ if (!realIdColumns.get(i).equals(incomingIdColumns.get(i))) {
+ throw new QueryProcessException(
+ new SemanticException(
+ String.format(
+ "The incoming id columns " + "conflicts " + "with existing ones: %s v.s. %s",
+ incomingIdColumns, realIdColumns)));
+ }
+ }
}
public static void validate(ColumnSchema incoming, ColumnSchema real) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/IoTDBRuntimeException.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/IoTDBRuntimeException.java
new file mode 100644
index 00000000000..3d58f7d606b
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/IoTDBRuntimeException.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.iotdb.commons.exception;
+
+public class IoTDBRuntimeException extends RuntimeException {
+ protected int errorCode;
+
+ /**
+ * This kind of exception is caused by users' wrong sql, and there is no need for server to print
+ * the full stack of the exception
+ */
+ protected boolean isUserException = false;
+
+ public IoTDBRuntimeException(String message, int errorCode) {
+ super(message);
+ this.errorCode = errorCode;
+ }
+
+ public IoTDBRuntimeException(String message, int errorCode, boolean isUserException) {
+ super(message);
+ this.errorCode = errorCode;
+ this.isUserException = isUserException;
+ }
+
+ public IoTDBRuntimeException(String message, Throwable cause, int errorCode) {
+ super(message, cause);
+ this.errorCode = errorCode;
+ }
+
+ public IoTDBRuntimeException(Throwable cause, int errorCode) {
+ super(cause);
+ this.errorCode = errorCode;
+ }
+
+ public IoTDBRuntimeException(Throwable cause, int errorCode, boolean isUserException) {
+ super(cause);
+ this.errorCode = errorCode;
+ this.isUserException = isUserException;
+ }
+
+ public boolean isUserException() {
+ return isUserException;
+ }
+
+ public int getErrorCode() {
+ return errorCode;
+ }
+}