This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch TableModelIngestion
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/TableModelIngestion by this
push:
new cb0914d6644 finish analyze insertRow
cb0914d6644 is described below
commit cb0914d664482e5316906d46c2d09448d7c2146d
Author: Tian Jiang <[email protected]>
AuthorDate: Mon Jul 8 10:00:52 2024 +0800
finish analyze insertRow
---
.../db/queryengine/plan/analyze/AnalyzeUtils.java | 30 ++++++++++++++++------
.../relational/sql/ast/WrappedInsertStatement.java | 20 +++++++++++++--
.../plan/statement/crud/InsertBaseStatement.java | 17 ++++++------
3 files changed, 49 insertions(+), 18 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java
index 2634a21951d..13c6ce9bd42 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java
@@ -109,6 +109,11 @@ public class AnalyzeUtils {
}
return computeDataPartitionParams(
timePartitionSlotMap,
context.getSession().getDatabaseName().orElse(null));
+ } else if (statement instanceof InsertRowStatement) {
+ InsertRowStatement insertRowStatement = (InsertRowStatement) statement;
+ return computeDataPartitionParams(
+ Collections.singletonMap(insertRowStatement.getTableDeviceID(),
Collections.singleton(insertRowStatement.getTimePartitionSlot())),
+ context.getSession().getDatabaseName().orElse(null));
}
throw new UnsupportedOperationException("computeDataPartitionParams for "
+ statement);
}
@@ -116,13 +121,8 @@ public class AnalyzeUtils {
public static List<DataPartitionQueryParam> computeTreeDataPartitionParams(
InsertBaseStatement statement, MPPQueryContext context) {
if (statement instanceof InsertTabletStatement) {
- InsertTabletStatement insertTabletStatement = (InsertTabletStatement)
statement;
- DataPartitionQueryParam dataPartitionQueryParam = new
DataPartitionQueryParam();
- dataPartitionQueryParam.setDeviceID(
- insertTabletStatement.getDevicePath().getIDeviceIDAsFullDevice());
- dataPartitionQueryParam.setTimePartitionSlotList(
- insertTabletStatement.getTimePartitionSlots());
-
dataPartitionQueryParam.setDatabaseName(context.getSession().getDatabaseName().orElse(null));
+ DataPartitionQueryParam dataPartitionQueryParam =
getTreeDataPartitionQueryParam(
+ (InsertTabletStatement) statement, context);
return Collections.singletonList(dataPartitionQueryParam);
} else if (statement instanceof InsertMultiTabletsStatement) {
InsertMultiTabletsStatement insertMultiTabletsStatement =
@@ -155,6 +155,17 @@ public class AnalyzeUtils {
throw new UnsupportedOperationException("computeDataPartitionParams for "
+ statement);
}
+ private static DataPartitionQueryParam
getTreeDataPartitionQueryParam(InsertTabletStatement statement,
+ MPPQueryContext context) {
+ DataPartitionQueryParam dataPartitionQueryParam = new
DataPartitionQueryParam();
+ dataPartitionQueryParam.setDeviceID(
+ statement.getDevicePath().getIDeviceIDAsFullDevice());
+ dataPartitionQueryParam.setTimePartitionSlotList(
+ statement.getTimePartitionSlots());
+
dataPartitionQueryParam.setDatabaseName(context.getSession().getDatabaseName().orElse(null));
+ return dataPartitionQueryParam;
+ }
+
public static List<DataPartitionQueryParam> computeDataPartitionParams(
Map<IDeviceID, Set<TTimePartitionSlot>> dataPartitionQueryParamMap,
String databaseName) {
List<DataPartitionQueryParam> dataPartitionQueryParams = new ArrayList<>();
@@ -216,7 +227,9 @@ public class AnalyzeUtils {
}
}
- /** get analysis according to statement and params */
+ /**
+ * get analysis according to statement and params
+ */
public static void analyzeDataPartition(
IAnalysis analysis,
List<DataPartitionQueryParam> dataPartitionQueryParams,
@@ -243,6 +256,7 @@ public class AnalyzeUtils {
}
public interface DataPartitionQueryParamComputation {
+
List<DataPartitionQueryParam> compute(InsertBaseStatement statement,
MPPQueryContext context);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
index 32f63c97033..f7e7b1e0c6a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
import
org.apache.iotdb.db.queryengine.plan.relational.metadata.ITableDeviceSchemaValidation;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema;
+import
org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
import org.apache.tsfile.read.common.type.TypeFactory;
@@ -91,6 +92,10 @@ public abstract class WrappedInsertStatement extends
WrappedStatement
adjustIdColumns(realIdColumns);
}
+ /**
+ * Adjust the order of ID columns in this insertion to be consistent with
that from the schema region.
+ * @param realColumnSchemas column order from the schema region
+ */
public void adjustIdColumns(List<ColumnSchema> realColumnSchemas) {
List<ColumnSchema> incomingColumnSchemas = getTableSchema().getIdColumns();
final InsertBaseStatement baseStatement = getInnerTreeStatement();
@@ -112,27 +117,38 @@ public abstract class WrappedInsertStatement extends
WrappedStatement
public static void validate(ColumnSchema incoming, ColumnSchema real, int i,
InsertBaseStatement innerTreeStatement) {
if (real == null) {
+ // the column does not exist and auto-creation is disabled
SemanticException semanticException = new SemanticException(
"Column " + incoming.getName() + " does not exists or fails to be "
+ "created");
if (incoming.getColumnCategory() != TsTableColumnCategory.MEASUREMENT) {
+ // non-measurement columns cannot be partially inserted
throw semanticException;
} else {
+ // partial insertion
innerTreeStatement.markFailedMeasurement(i, semanticException);
return;
}
}
- if (!incoming.getType().equals(real.getType())) {
+ if (incoming.getType() == null) {
+ // sql insertion does not provide type
+
innerTreeStatement.setDataType(InternalTypeManager.getTSDataType(real.getType()),
i);
+ } else if (!incoming.getType().equals(real.getType())) {
SemanticException semanticException = new SemanticException(
String.format(
"Inconsistent data type of column %s: %s/%s",
incoming.getName(), incoming.getType(), real.getType()));
if (incoming.getColumnCategory() != TsTableColumnCategory.MEASUREMENT) {
+ // non-measurement columns cannot be partially inserted
throw semanticException;
} else {
+ // partial insertion
innerTreeStatement.markFailedMeasurement(i, semanticException);
}
}
- if (!incoming.getColumnCategory().equals(real.getColumnCategory())) {
+ if (incoming.getColumnCategory() == null) {
+ // sql insertion does not provide category
+ innerTreeStatement.setColumnCategory(real.getColumnCategory(), i);
+ } else if (!incoming.getColumnCategory().equals(real.getColumnCategory()))
{
throw new SemanticException(
String.format(
"Inconsistent column category of column %s: %s/%s",
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
index a7dc15dab53..fcccc7df97b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
@@ -137,6 +137,10 @@ public abstract class InsertBaseStatement extends
Statement {
this.dataTypes = dataTypes;
}
+ public void setDataType(TSDataType dataType, int i) {
+ this.dataTypes[i] = dataType;
+ }
+
/** Returns true when this statement is empty and no need to write into the
server */
public abstract boolean isEmpty();
@@ -252,14 +256,11 @@ public abstract class InsertBaseStatement extends
Statement {
public void setColumnCategories(TsTableColumnCategory[] columnCategories) {
this.columnCategories = columnCategories;
- if (columnCategories != null) {
- idColumnIndices = new ArrayList<>();
- for (int i = 0; i < columnCategories.length; i++) {
- if (columnCategories[i].equals(TsTableColumnCategory.ID)) {
- idColumnIndices.add(i);
- }
- }
- }
+ }
+
+ public void setColumnCategory(TsTableColumnCategory columnCategory, int i) {
+ this.columnCategories[i] = columnCategory;
+ this.idColumnIndices = null;
}
public List<Integer> getIdColumnIndices() {