This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch lmh/intoOperator in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4646bf1285d968ae42cae18010ba64e6ef32d48b Author: Minghui Liu <[email protected]> AuthorDate: Tue Oct 18 10:22:21 2022 +0800 fix bugs --- .../execution/operator/process/IntoOperator.java | 39 +++++++++++++++++++--- .../db/mpp/plan/planner/LogicalPlanBuilder.java | 14 ++++++++ .../db/mpp/plan/planner/OperatorTreeGenerator.java | 1 + .../planner/plan/node/process/ExchangeNode.java | 5 +++ 4 files changed, 54 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java index 7dc26ddc28..1491623e77 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java @@ -106,12 +106,14 @@ public class IntoOperator implements ProcessOperator { @Override public TsBlock next() { TsBlock inputTsBlock = child.next(); - int lastReadIndex = 0; - while (lastReadIndex < inputTsBlock.getPositionCount()) { - for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) { - lastReadIndex = generator.processTsBlock(inputTsBlock, lastReadIndex); + if (inputTsBlock != null) { + int lastReadIndex = 0; + while (lastReadIndex < inputTsBlock.getPositionCount()) { + for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) { + lastReadIndex = generator.processTsBlock(inputTsBlock, lastReadIndex); + } + insertMultiTabletsInternally(true); } - insertMultiTabletsInternally(true); } if (child.hasNext()) { @@ -234,12 +236,39 @@ public class IntoOperator implements ProcessOperator { for (InputLocation inputLocation : inputLocations) { writtenCounter.put(inputLocation, new AtomicInteger(0)); } + this.reset(); } public void reset() { this.rowCount = 0; this.times = new long[TABLET_ROW_LIMIT]; this.columns = new Object[this.measurements.length]; + for (int i = 0; i < this.measurements.length; i++) { + switch (dataTypes[i]) { + case BOOLEAN: + columns[i] = new boolean[TABLET_ROW_LIMIT]; + break; + case INT32: + columns[i] = new int[TABLET_ROW_LIMIT]; + break; + case INT64: + columns[i] = new long[TABLET_ROW_LIMIT]; + break; + case FLOAT: + columns[i] = new float[TABLET_ROW_LIMIT]; + break; + case DOUBLE: + columns[i] = new double[TABLET_ROW_LIMIT]; + break; + case TEXT: + columns[i] = new Binary[TABLET_ROW_LIMIT]; + Arrays.fill((Binary[]) columns[i], Binary.EMPTY_VALUE); + break; + default: + throw new UnSupportedDataTypeException( + String.format("Data type %s is not supported.", dataTypes[i])); + } + } this.bitMaps = new BitMap[this.measurements.length]; for (int i = 0; i < this.bitMaps.length; ++i) { this.bitMaps[i] = new BitMap(TABLET_ROW_LIMIT); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java index 90f1219767..ed4806c1b4 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java @@ -151,6 +151,10 @@ public class LogicalPlanBuilder { keys.forEach(k -> context.getTypeProvider().setType(k, dataType)); } + private void updateTypeProviderWithConstantType(String columnName, TSDataType dataType) { + context.getTypeProvider().setType(columnName, dataType); + } + public LogicalPlanBuilder planRawDataSource( Set<Expression> sourceExpressions, Ordering scanOrder, Filter timeFilter) { List<PlanNode> sourceNodeList = new ArrayList<>(); @@ -854,6 +858,11 @@ public class LogicalPlanBuilder { return this; } + ColumnHeaderConstant.selectIntoAlignByDeviceColumnHeaders.forEach( + columnHeader -> { + updateTypeProviderWithConstantType( + columnHeader.getColumnName(), columnHeader.getColumnType()); + }); this.root = new DeviceViewIntoNode( context.getQueryId().genPlanNodeId(), this.getRoot(), deviceViewIntoPathDescriptor); @@ -865,6 +874,11 @@ public class LogicalPlanBuilder { return this; } + ColumnHeaderConstant.selectIntoColumnHeaders.forEach( + columnHeader -> { + updateTypeProviderWithConstantType( + columnHeader.getColumnName(), columnHeader.getColumnType()); + }); this.root = new IntoNode(context.getQueryId().genPlanNodeId(), this.getRoot(), intoPathDescriptor); return this; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java index 1dcf871373..e91d67b973 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java @@ -1378,6 +1378,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP targetPathToDataTypeMap.put(targetDevice, measurementToDataTypeMap); } + context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); return new IntoOperator( operatorContext, child, diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java index 0a4b64f3de..06310be926 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java @@ -54,6 +54,11 @@ public class ExchangeNode extends SingleChildProcessNode { super(id); } + @Override + public int allowedChildCount() { + return CHILD_COUNT_NO_LIMIT; + } + @Override public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { return visitor.visitExchange(this, context);
