This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch TableModelIngestion
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/TableModelIngestion by this
push:
new e03efd10277 add adjust ID column
e03efd10277 is described below
commit e03efd10277cc33405c243fcbc5cdee14e76cb13
Author: jt2594838 <[email protected]>
AuthorDate: Wed Jul 3 12:04:01 2024 +0800
add adjust ID column
---
.../relational/sql/ast/WrappedInsertStatement.java | 32 +++++++-------
.../plan/statement/crud/InsertBaseStatement.java | 44 +++++++++++++++++++
.../plan/statement/crud/InsertTabletStatement.java | 40 +++++++++++++++++
.../org/apache/iotdb/db/utils/CommonUtils.java | 50 ++++++++++++++++++++++
.../apache/iotdb/commons/schema/table/TsTable.java | 13 ++++++
5 files changed, 164 insertions(+), 15 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
index 7b5d2306fbe..5a30fda1f45 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
@@ -84,23 +84,25 @@ public abstract class WrappedInsertStatement extends
WrappedStatement
}
// incoming schema should contain all id columns in real schema and have
consistent order
final List<ColumnSchema> realIdColumns = realSchema.getIdColumns();
- final List<ColumnSchema> incomingIdColumns =
incomingTableSchema.getIdColumns();
- if (realIdColumns.size() > incomingIdColumns.size()) {
- throw new QueryProcessException(
- new SemanticException(
- String.format(
- "The incoming id columns " + "conflicts " + "with existing
ones: %s v.s. %s",
- incomingIdColumns, realIdColumns)));
- }
- for (int i = 0; i < realIdColumns.size(); i++) {
- if (!realIdColumns.get(i).equals(incomingIdColumns.get(i))) {
- throw new QueryProcessException(
- new SemanticException(
- String.format(
- "The incoming id columns " + "conflicts " + "with existing
ones: %s v.s. %s",
- incomingIdColumns, realIdColumns)));
+ adjustIdColumns(realIdColumns);
+ }
+
+ public void adjustIdColumns(List<ColumnSchema> realColumnSchemas) {
+ List<ColumnSchema> incomingColumnSchemas = getTableSchema().getIdColumns();
+ final InsertBaseStatement baseStatement = getInnerTreeStatement();
+ for (int realIdColPos = 0; realIdColPos < realColumnSchemas.size();
realIdColPos++) {
+ ColumnSchema realColumn = realColumnSchemas.get(realIdColPos);
+ int incomingIdColPos = incomingColumnSchemas.indexOf(realColumn);
+ if (incomingIdColPos == -1) {
+ // if the realIdColPos-th id column in the table is missing, insert an
empty column in the
+ // tablet
+ baseStatement.insertColumn(realIdColPos, realColumn);
+ } else {
+ // move the id column in the tablet to the proper position
+ baseStatement.swapColumn(incomingIdColPos, realIdColPos);
}
}
+ tableSchema = null;
}
public static void validate(ColumnSchema incoming, ColumnSchema real) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
index b7964aaf333..0df33592d88 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
@@ -33,7 +33,10 @@ import
org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaValidation;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
+import
org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import org.apache.iotdb.db.utils.CommonUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.tsfile.enums.TSDataType;
@@ -407,5 +410,46 @@ public abstract class InsertBaseStatement extends
Statement {
}
}
}
+
+ public void insertColumn(int pos, ColumnSchema columnSchema) {
+ if (measurementSchemas != null) {
+ MeasurementSchema[] tmp = new
MeasurementSchema[measurementSchemas.length + 1];
+ System.arraycopy(measurementSchemas, 0, tmp, 0, pos);
+ tmp[pos] =
+ new MeasurementSchema(
+ columnSchema.getName(),
InternalTypeManager.getTSDataType(columnSchema.getType()));
+ System.arraycopy(measurementSchemas, pos, tmp, pos + 1,
measurementSchemas.length - pos);
+ measurementSchemas = tmp;
+ }
+
+ String[] tmpMeasurements = new String[measurements.length + 1];
+ System.arraycopy(measurements, 0, tmpMeasurements, 0, pos);
+ tmpMeasurements[pos] = columnSchema.getName();
+ System.arraycopy(measurements, pos, tmpMeasurements, pos + 1,
measurements.length - pos);
+ measurements = tmpMeasurements;
+
+ TSDataType[] tmpTypes = new TSDataType[dataTypes.length + 1];
+ System.arraycopy(dataTypes, 0, tmpTypes, 0, pos);
+ tmpTypes[pos] = InternalTypeManager.getTSDataType(columnSchema.getType());
+ System.arraycopy(dataTypes, pos, tmpTypes, pos + 1, dataTypes.length -
pos);
+ dataTypes = tmpTypes;
+
+ TsTableColumnCategory[] tmpCategories = new
TsTableColumnCategory[columnCategories.length + 1];
+ System.arraycopy(columnCategories, 0, tmpCategories, 0, pos);
+ tmpCategories[pos] = columnSchema.getColumnCategory();
+ System.arraycopy(columnCategories, pos, tmpCategories, pos + 1,
columnCategories.length - pos);
+ columnCategories = tmpCategories;
+ idColumnIndices = null;
+ }
+
+ public void swapColumn(int src, int target) {
+ if (measurementSchemas != null) {
+ CommonUtils.swapArray(measurementSchemas, src, target);
+ }
+ CommonUtils.swapArray(measurements, src, target);
+ CommonUtils.swapArray(dataTypes, src, target);
+ CommonUtils.swapArray(columnCategories, src, target);
+ idColumnIndices = null;
+ }
// endregion
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
index ea35b3624aa..c41cd0775bb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
@@ -31,8 +31,10 @@ import
org.apache.iotdb.db.queryengine.common.schematree.IMeasurementSchemaInfo;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaValidation;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.InsertTablet;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement;
+import
org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager;
import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
import org.apache.iotdb.db.utils.CommonUtils;
@@ -468,4 +470,42 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
return deviceIDs[rowIdx];
}
+
+ @Override
+ public void insertColumn(int pos, ColumnSchema columnSchema) {
+ super.insertColumn(pos, columnSchema);
+
+ if (bitMaps != null) {
+ BitMap[] tmpBitmaps = new BitMap[bitMaps.length + 1];
+ System.arraycopy(bitMaps, 0, tmpBitmaps, 0, pos);
+ tmpBitmaps[pos] = new BitMap(rowCount);
+ for (int i = 0; i < rowCount; i++) {
+ tmpBitmaps[pos].mark(i);
+ }
+ System.arraycopy(bitMaps, pos, tmpBitmaps, pos + 1, bitMaps.length -
pos);
+ bitMaps = tmpBitmaps;
+ }
+
+ Object[] tmpColumns = new Object[columns.length + 1];
+ System.arraycopy(columns, 0, tmpColumns, 0, pos);
+ tmpColumns[pos] =
+ CommonUtils.createValueColumnOfDataType(
+ InternalTypeManager.getTSDataType(columnSchema.getType()),
+ columnSchema.getColumnCategory(),
+ rowCount);
+ System.arraycopy(columns, pos, tmpColumns, pos + 1, columns.length - pos);
+ columns = tmpColumns;
+
+ deviceIDs = null;
+ }
+
+ @Override
+ public void swapColumn(int src, int target) {
+ super.swapColumn(src, target);
+ if (bitMaps != null) {
+ CommonUtils.swapArray(bitMaps, src, target);
+ }
+ CommonUtils.swapArray(columns, src, target);
+ deviceIDs = null;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
index e5c40c1b8c6..cbadd577eea 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.utils;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.sql.SemanticException;
@@ -46,7 +47,9 @@ import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
+import java.time.LocalDate;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.List;
@@ -424,4 +427,51 @@ public class CommonUtils {
public static boolean isAlive(long time, long dataTTL) {
return dataTTL == Long.MAX_VALUE || (CommonDateTimeUtils.currentTime() -
time) <= dataTTL;
}
+
+ public static Object createValueColumnOfDataType(
+ TSDataType dataType, TsTableColumnCategory columnCategory, int rowNum) {
+ Object valueColumn;
+ switch (dataType) {
+ case INT32:
+ valueColumn = new int[rowNum];
+ break;
+ case INT64:
+ case TIMESTAMP:
+ valueColumn = new long[rowNum];
+ break;
+ case FLOAT:
+ valueColumn = new float[rowNum];
+ break;
+ case DOUBLE:
+ valueColumn = new double[rowNum];
+ break;
+ case BOOLEAN:
+ valueColumn = new boolean[rowNum];
+ break;
+ case TEXT:
+ case STRING:
+ if (columnCategory.equals(TsTableColumnCategory.MEASUREMENT)) {
+ valueColumn = new Binary[rowNum];
+ } else {
+ valueColumn = new String[rowNum];
+ }
+ break;
+ case BLOB:
+ valueColumn = new Binary[rowNum];
+ break;
+ case DATE:
+ valueColumn = new LocalDate[rowNum];
+ break;
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Data type %s is not supported.", dataType));
+ }
+ return valueColumn;
+ }
+
+ public static void swapArray(Object[] array, int i, int j) {
+ Object tmp = array[i];
+ array[i] = array[j];
+ array[j] = tmp;
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java
index 919f96e1708..af4b92b7b25 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java
@@ -160,4 +160,17 @@ public class TsTable {
public int hashCode() {
return Objects.hash(tableName);
}
+
+ @Override
+ public String toString() {
+ return "TsTable{"
+ + "tableName='"
+ + tableName
+ + '\''
+ + ", columnSchemaMap="
+ + columnSchemaMap
+ + ", props="
+ + props
+ + '}';
+ }
}