This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/intoOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4646bf1285d968ae42cae18010ba64e6ef32d48b
Author: Minghui Liu <[email protected]>
AuthorDate: Tue Oct 18 10:22:21 2022 +0800

    fix bugs
---
 .../execution/operator/process/IntoOperator.java   | 39 +++++++++++++++++++---
 .../db/mpp/plan/planner/LogicalPlanBuilder.java    | 14 ++++++++
 .../db/mpp/plan/planner/OperatorTreeGenerator.java |  1 +
 .../planner/plan/node/process/ExchangeNode.java    |  5 +++
 4 files changed, 54 insertions(+), 5 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
index 7dc26ddc28..1491623e77 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
@@ -106,12 +106,14 @@ public class IntoOperator implements ProcessOperator {
   @Override
   public TsBlock next() {
     TsBlock inputTsBlock = child.next();
-    int lastReadIndex = 0;
-    while (lastReadIndex < inputTsBlock.getPositionCount()) {
-      for (InsertTabletStatementGenerator generator : 
insertTabletStatementGenerators) {
-        lastReadIndex = generator.processTsBlock(inputTsBlock, lastReadIndex);
+    if (inputTsBlock != null) {
+      int lastReadIndex = 0;
+      while (lastReadIndex < inputTsBlock.getPositionCount()) {
+        for (InsertTabletStatementGenerator generator : 
insertTabletStatementGenerators) {
+          lastReadIndex = generator.processTsBlock(inputTsBlock, 
lastReadIndex);
+        }
+        insertMultiTabletsInternally(true);
       }
-      insertMultiTabletsInternally(true);
     }
 
     if (child.hasNext()) {
@@ -234,12 +236,39 @@ public class IntoOperator implements ProcessOperator {
       for (InputLocation inputLocation : inputLocations) {
         writtenCounter.put(inputLocation, new AtomicInteger(0));
       }
+      this.reset();
     }
 
     public void reset() {
       this.rowCount = 0;
       this.times = new long[TABLET_ROW_LIMIT];
       this.columns = new Object[this.measurements.length];
+      for (int i = 0; i < this.measurements.length; i++) {
+        switch (dataTypes[i]) {
+          case BOOLEAN:
+            columns[i] = new boolean[TABLET_ROW_LIMIT];
+            break;
+          case INT32:
+            columns[i] = new int[TABLET_ROW_LIMIT];
+            break;
+          case INT64:
+            columns[i] = new long[TABLET_ROW_LIMIT];
+            break;
+          case FLOAT:
+            columns[i] = new float[TABLET_ROW_LIMIT];
+            break;
+          case DOUBLE:
+            columns[i] = new double[TABLET_ROW_LIMIT];
+            break;
+          case TEXT:
+            columns[i] = new Binary[TABLET_ROW_LIMIT];
+            Arrays.fill((Binary[]) columns[i], Binary.EMPTY_VALUE);
+            break;
+          default:
+            throw new UnSupportedDataTypeException(
+                String.format("Data type %s is not supported.", dataTypes[i]));
+        }
+      }
       this.bitMaps = new BitMap[this.measurements.length];
       for (int i = 0; i < this.bitMaps.length; ++i) {
         this.bitMaps[i] = new BitMap(TABLET_ROW_LIMIT);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
index 90f1219767..ed4806c1b4 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
@@ -151,6 +151,10 @@ public class LogicalPlanBuilder {
     keys.forEach(k -> context.getTypeProvider().setType(k, dataType));
   }
 
+  private void updateTypeProviderWithConstantType(String columnName, 
TSDataType dataType) {
+    context.getTypeProvider().setType(columnName, dataType);
+  }
+
   public LogicalPlanBuilder planRawDataSource(
       Set<Expression> sourceExpressions, Ordering scanOrder, Filter 
timeFilter) {
     List<PlanNode> sourceNodeList = new ArrayList<>();
@@ -854,6 +858,11 @@ public class LogicalPlanBuilder {
       return this;
     }
 
+    ColumnHeaderConstant.selectIntoAlignByDeviceColumnHeaders.forEach(
+        columnHeader -> {
+          updateTypeProviderWithConstantType(
+              columnHeader.getColumnName(), columnHeader.getColumnType());
+        });
     this.root =
         new DeviceViewIntoNode(
             context.getQueryId().genPlanNodeId(), this.getRoot(), 
deviceViewIntoPathDescriptor);
@@ -865,6 +874,11 @@ public class LogicalPlanBuilder {
       return this;
     }
 
+    ColumnHeaderConstant.selectIntoColumnHeaders.forEach(
+        columnHeader -> {
+          updateTypeProviderWithConstantType(
+              columnHeader.getColumnName(), columnHeader.getColumnType());
+        });
     this.root =
         new IntoNode(context.getQueryId().genPlanNodeId(), this.getRoot(), 
intoPathDescriptor);
     return this;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 1dcf871373..e91d67b973 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -1378,6 +1378,7 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
       targetPathToDataTypeMap.put(targetDevice, measurementToDataTypeMap);
     }
 
+    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
     return new IntoOperator(
         operatorContext,
         child,
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java
index 0a4b64f3de..06310be926 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java
@@ -54,6 +54,11 @@ public class ExchangeNode extends SingleChildProcessNode {
     super(id);
   }
 
+  @Override
+  public int allowedChildCount() {
+    return CHILD_COUNT_NO_LIMIT;
+  }
+
   @Override
   public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
     return visitor.visitExchange(this, context);

Reply via email to