This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch TableModelIngestion
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/TableModelIngestion by this 
push:
     new e03efd10277 add adjust ID column
e03efd10277 is described below

commit e03efd10277cc33405c243fcbc5cdee14e76cb13
Author: jt2594838 <[email protected]>
AuthorDate: Wed Jul 3 12:04:01 2024 +0800

    add adjust ID column
---
 .../relational/sql/ast/WrappedInsertStatement.java | 32 +++++++-------
 .../plan/statement/crud/InsertBaseStatement.java   | 44 +++++++++++++++++++
 .../plan/statement/crud/InsertTabletStatement.java | 40 +++++++++++++++++
 .../org/apache/iotdb/db/utils/CommonUtils.java     | 50 ++++++++++++++++++++++
 .../apache/iotdb/commons/schema/table/TsTable.java | 13 ++++++
 5 files changed, 164 insertions(+), 15 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
index 7b5d2306fbe..5a30fda1f45 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
@@ -84,23 +84,25 @@ public abstract class WrappedInsertStatement extends 
WrappedStatement
     }
     // incoming schema should contain all id columns in real schema and have 
consistent order
     final List<ColumnSchema> realIdColumns = realSchema.getIdColumns();
-    final List<ColumnSchema> incomingIdColumns = 
incomingTableSchema.getIdColumns();
-    if (realIdColumns.size() > incomingIdColumns.size()) {
-      throw new QueryProcessException(
-          new SemanticException(
-              String.format(
-                  "The incoming id columns " + "conflicts " + "with existing 
ones: %s v.s. %s",
-                  incomingIdColumns, realIdColumns)));
-    }
-    for (int i = 0; i < realIdColumns.size(); i++) {
-      if (!realIdColumns.get(i).equals(incomingIdColumns.get(i))) {
-        throw new QueryProcessException(
-            new SemanticException(
-                String.format(
-                    "The incoming id columns " + "conflicts " + "with existing 
ones: %s v.s. %s",
-                    incomingIdColumns, realIdColumns)));
+    adjustIdColumns(realIdColumns);
+  }
+
+  /**
+   * Rearranges the ID columns of the wrapped insert statement so that they follow the order of
+   * the given real ID columns, inserting a column for every real ID column the statement lacks.
+   *
+   * <p>A private copy of the incoming ID column list is updated alongside every insert and swap
+   * applied to the statement. Without that, positions looked up in later iterations would refer
+   * to the layout before the earlier adjustments (e.g. incoming [B, A] against real [A, B] would
+   * be swapped twice and end up unchanged).
+   *
+   * <p>NOTE(review): positions in the ID column list are used directly as column positions of
+   * the statement, which presumably requires the ID columns to be the leading columns - confirm.
+   *
+   * @param realColumnSchemas the ID columns of the existing table, in their real order
+   */
+  public void adjustIdColumns(List<ColumnSchema> realColumnSchemas) {
+    // copy first: the list is mutated below and may be owned (or be immutable) elsewhere
+    final List<ColumnSchema> incomingColumnSchemas =
+        new java.util.ArrayList<>(getTableSchema().getIdColumns());
+    final InsertBaseStatement baseStatement = getInnerTreeStatement();
+    for (int realIdColPos = 0; realIdColPos < realColumnSchemas.size(); realIdColPos++) {
+      ColumnSchema realColumn = realColumnSchemas.get(realIdColPos);
+      int incomingIdColPos = incomingColumnSchemas.indexOf(realColumn);
+      if (incomingIdColPos == -1) {
+        // if the realIdColPos-th id column in the table is missing, insert an empty column in the
+        // tablet
+        baseStatement.insertColumn(realIdColPos, realColumn);
+        // keep the local view aligned with the statement's new layout
+        incomingColumnSchemas.add(realIdColPos, realColumn);
+      } else {
+        // move the id column in the tablet to the proper position
+        baseStatement.swapColumn(incomingIdColPos, realIdColPos);
+        // keep the local view aligned with the statement's new layout
+        java.util.Collections.swap(incomingColumnSchemas, incomingIdColPos, realIdColPos);
       }
     }
+    // the statement's columns changed, so drop the previously held table schema
+    tableSchema = null;
   }
 
   public static void validate(ColumnSchema incoming, ColumnSchema real) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
index b7964aaf333..0df33592d88 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
@@ -33,7 +33,10 @@ import 
org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaValidation;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
+import 
org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager;
 import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import org.apache.iotdb.db.utils.CommonUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.tsfile.enums.TSDataType;
@@ -407,5 +410,46 @@ public abstract class InsertBaseStatement extends 
Statement {
       }
     }
   }
+
+  public void insertColumn(int pos, ColumnSchema columnSchema) { // grow each per-column array by one slot and put columnSchema's data at index pos
+    if (measurementSchemas != null) { // measurementSchemas may be absent; it is only extended when present
+      MeasurementSchema[] tmp = new 
MeasurementSchema[measurementSchemas.length + 1];
+      System.arraycopy(measurementSchemas, 0, tmp, 0, pos);
+      tmp[pos] =
+          new MeasurementSchema(
+              columnSchema.getName(), 
InternalTypeManager.getTSDataType(columnSchema.getType()));
+      System.arraycopy(measurementSchemas, pos, tmp, pos + 1, 
measurementSchemas.length - pos);
+      measurementSchemas = tmp;
+    }
+
+    String[] tmpMeasurements = new String[measurements.length + 1];
+    System.arraycopy(measurements, 0, tmpMeasurements, 0, pos);
+    tmpMeasurements[pos] = columnSchema.getName(); // the new column is named after the column schema
+    System.arraycopy(measurements, pos, tmpMeasurements, pos + 1, 
measurements.length - pos);
+    measurements = tmpMeasurements;
+
+    TSDataType[] tmpTypes = new TSDataType[dataTypes.length + 1];
+    System.arraycopy(dataTypes, 0, tmpTypes, 0, pos);
+    tmpTypes[pos] = InternalTypeManager.getTSDataType(columnSchema.getType());
+    System.arraycopy(dataTypes, pos, tmpTypes, pos + 1, dataTypes.length - 
pos);
+    dataTypes = tmpTypes;
+
+    TsTableColumnCategory[] tmpCategories = new 
TsTableColumnCategory[columnCategories.length + 1];
+    System.arraycopy(columnCategories, 0, tmpCategories, 0, pos);
+    tmpCategories[pos] = columnSchema.getColumnCategory();
+    System.arraycopy(columnCategories, pos, tmpCategories, pos + 1, 
columnCategories.length - pos);
+    columnCategories = tmpCategories;
+    idColumnIndices = null; // presumably derived from columnCategories and rebuilt on demand - TODO confirm
+  }
+
+  public void swapColumn(int src, int target) { // exchange the entries at indices src and target in each per-column array
+    if (measurementSchemas != null) { // measurementSchemas may be absent; it is only swapped when present
+      CommonUtils.swapArray(measurementSchemas, src, target);
+    }
+    CommonUtils.swapArray(measurements, src, target);
+    CommonUtils.swapArray(dataTypes, src, target);
+    CommonUtils.swapArray(columnCategories, src, target);
+    idColumnIndices = null; // presumably derived from columnCategories and rebuilt on demand - TODO confirm
+  }
   // endregion
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
index ea35b3624aa..c41cd0775bb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
@@ -31,8 +31,10 @@ import 
org.apache.iotdb.db.queryengine.common.schematree.IMeasurementSchemaInfo;
 import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaValidation;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.InsertTablet;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement;
+import 
org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager;
 import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
 import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
 import org.apache.iotdb.db.utils.CommonUtils;
@@ -468,4 +470,42 @@ public class InsertTabletStatement extends 
InsertBaseStatement implements ISchem
 
     return deviceIDs[rowIdx];
   }
+
+  @Override
+  public void insertColumn(int pos, ColumnSchema columnSchema) { // after the base arrays, also extend bitMaps and columns at index pos
+    super.insertColumn(pos, columnSchema);
+
+    if (bitMaps != null) { // NOTE(review): when bitMaps is null the new column gets no bitmap at all - confirm that is intended
+      BitMap[] tmpBitmaps = new BitMap[bitMaps.length + 1];
+      System.arraycopy(bitMaps, 0, tmpBitmaps, 0, pos);
+      tmpBitmaps[pos] = new BitMap(rowCount);
+      for (int i = 0; i < rowCount; i++) {
+        tmpBitmaps[pos].mark(i); // marks every row of the new column; presumably a marked bit means "null" - TODO confirm
+      }
+      System.arraycopy(bitMaps, pos, tmpBitmaps, pos + 1, bitMaps.length - 
pos);
+      bitMaps = tmpBitmaps;
+    }
+
+    Object[] tmpColumns = new Object[columns.length + 1];
+    System.arraycopy(columns, 0, tmpColumns, 0, pos);
+    tmpColumns[pos] = // freshly allocated value array of rowCount elements for the new column
+        CommonUtils.createValueColumnOfDataType(
+            InternalTypeManager.getTSDataType(columnSchema.getType()),
+            columnSchema.getColumnCategory(),
+            rowCount);
+    System.arraycopy(columns, pos, tmpColumns, pos + 1, columns.length - pos);
+    columns = tmpColumns;
+
+    deviceIDs = null; // NOTE(review): looks like a per-row cache computed from the columns - confirm
+  }
+
+  @Override
+  public void swapColumn(int src, int target) { // after the base arrays, also swap the bitMaps and columns entries
+    super.swapColumn(src, target);
+    if (bitMaps != null) { // bitMaps may be absent; it is only swapped when present
+      CommonUtils.swapArray(bitMaps, src, target);
+    }
+    CommonUtils.swapArray(columns, src, target);
+    deviceIDs = null; // NOTE(review): looks like a per-row cache computed from the columns - confirm
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
index e5c40c1b8c6..cbadd577eea 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.utils;
 
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
 import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.sql.SemanticException;
@@ -46,7 +47,9 @@ import org.apache.tsfile.common.conf.TSFileConfig;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
 
+import java.time.LocalDate;
 import java.time.ZoneId;
 import java.util.Arrays;
 import java.util.List;
@@ -424,4 +427,51 @@ public class CommonUtils {
   public static boolean isAlive(long time, long dataTTL) {
     return dataTTL == Long.MAX_VALUE || (CommonDateTimeUtils.currentTime() - 
time) <= dataTTL;
   }
+
+  public static Object createValueColumnOfDataType( // allocate a value array of length rowNum whose element type is chosen by dataType
+      TSDataType dataType, TsTableColumnCategory columnCategory, int rowNum) {
+    Object valueColumn;
+    switch (dataType) {
+      case INT32:
+        valueColumn = new int[rowNum];
+        break;
+      case INT64:
+      case TIMESTAMP: // shares the long[] representation with INT64
+        valueColumn = new long[rowNum];
+        break;
+      case FLOAT:
+        valueColumn = new float[rowNum];
+        break;
+      case DOUBLE:
+        valueColumn = new double[rowNum];
+        break;
+      case BOOLEAN:
+        valueColumn = new boolean[rowNum];
+        break;
+      case TEXT:
+      case STRING:
+        if (columnCategory.equals(TsTableColumnCategory.MEASUREMENT)) { // textual MEASUREMENT columns use Binary[], every other category String[]
+          valueColumn = new Binary[rowNum];
+        } else {
+          valueColumn = new String[rowNum];
+        }
+        break;
+      case BLOB:
+        valueColumn = new Binary[rowNum];
+        break;
+      case DATE:
+        valueColumn = new LocalDate[rowNum];
+        break;
+      default: // any data type not listed above is rejected
+        throw new UnSupportedDataTypeException(
+            String.format("Data type %s is not supported.", dataType));
+    }
+    return valueColumn;
+  }
+
+  public static void swapArray(Object[] array, int i, int j) { // exchange array[i] and array[j] in place
+    Object tmp = array[i];
+    array[i] = array[j];
+    array[j] = tmp;
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java
index 919f96e1708..af4b92b7b25 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java
@@ -160,4 +160,17 @@ public class TsTable {
   public int hashCode() {
     return Objects.hash(tableName);
   }
+
+  @Override
+  public String toString() { // renders tableName, columnSchemaMap and props in "TsTable{...}" form
+    return "TsTable{"
+        + "tableName='"
+        + tableName
+        + '\''
+        + ", columnSchemaMap="
+        + columnSchemaMap
+        + ", props="
+        + props
+        + '}';
+  }
 }

Reply via email to