This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/intoOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit aeff0ab7a9c32d2469a1f8362543b358dd025003 Author: Minghui Liu <[email protected]> AuthorDate: Tue Oct 18 10:00:12 2022 +0800 add OperatorTreeGenerator for IntoOperator --- .../execution/operator/process/IntoOperator.java | 26 +++++------ .../db/mpp/plan/planner/OperatorTreeGenerator.java | 52 ++++++++++++++++++++++ .../plan/planner/plan/node/process/IntoNode.java | 4 ++ 3 files changed, 69 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java index 1251bb3e50..7dc26ddc28 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java @@ -55,39 +55,39 @@ public class IntoOperator implements ProcessOperator { private final Operator child; private final List<InsertTabletStatementGenerator> insertTabletStatementGenerators; - private final List<Pair<String, String>> sourceTargetPathPairList; + private final List<Pair<String, PartialPath>> sourceTargetPathPairList; private final Map<String, InputLocation> sourceColumnToInputLocationMap; public IntoOperator( OperatorContext operatorContext, Operator child, - Map<PartialPath, Map<String, InputLocation>> targetPathToSourceMap, + Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap, Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap, - Map<PartialPath, Boolean> targetDeviceToAlignedMap, - List<Pair<String, String>> sourceTargetPathPairList, + Map<String, Boolean> targetDeviceToAlignedMap, + List<Pair<String, PartialPath>> sourceTargetPathPairList, Map<String, InputLocation> sourceColumnToInputLocationMap) { this.operatorContext = operatorContext; this.child = child; this.insertTabletStatementGenerators = constructInsertTabletStatementGenerators( - targetPathToSourceMap, 
targetPathToDataTypeMap, targetDeviceToAlignedMap); + targetPathToSourceInputLocationMap, targetPathToDataTypeMap, targetDeviceToAlignedMap); this.sourceTargetPathPairList = sourceTargetPathPairList; this.sourceColumnToInputLocationMap = sourceColumnToInputLocationMap; } private List<InsertTabletStatementGenerator> constructInsertTabletStatementGenerators( - Map<PartialPath, Map<String, InputLocation>> targetPathToSourceMap, + Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap, Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap, - Map<PartialPath, Boolean> targetDeviceToAlignedMap) { + Map<String, Boolean> targetDeviceToAlignedMap) { List<InsertTabletStatementGenerator> insertTabletStatementGenerators = - new ArrayList<>(targetPathToSourceMap.size()); - for (PartialPath targetDevice : targetPathToSourceMap.keySet()) { + new ArrayList<>(targetPathToSourceInputLocationMap.size()); + for (PartialPath targetDevice : targetPathToSourceInputLocationMap.keySet()) { InsertTabletStatementGenerator generator = new InsertTabletStatementGenerator( targetDevice, - targetPathToSourceMap.get(targetDevice), + targetPathToSourceInputLocationMap.get(targetDevice), targetPathToDataTypeMap.get(targetDevice), - targetDeviceToAlignedMap.get(targetDevice)); + targetDeviceToAlignedMap.get(targetDevice.toString())); insertTabletStatementGenerators.add(generator); } return insertTabletStatementGenerators; @@ -150,10 +150,10 @@ public class IntoOperator implements ProcessOperator { TsBlockBuilder resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes); TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder(); ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders(); - for (Pair<String, String> sourceTargetPathPair : sourceTargetPathPairList) { + for (Pair<String, PartialPath> sourceTargetPathPair : sourceTargetPathPairList) { timeColumnBuilder.writeLong(0); columnBuilders[0].writeBinary(new 
Binary(sourceTargetPathPair.left)); - columnBuilders[1].writeBinary(new Binary(sourceTargetPathPair.right)); + columnBuilders[1].writeBinary(new Binary(sourceTargetPathPair.right.toString())); columnBuilders[2].writeInt(findWritten(sourceTargetPathPair.left)); resultTsBlockBuilder.declarePosition(); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java index f3de5e72d4..1dcf871373 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java @@ -43,6 +43,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.DeviceMergeOperator; import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator; import org.apache.iotdb.db.mpp.execution.operator.process.FillOperator; import org.apache.iotdb.db.mpp.execution.operator.process.FilterAndProjectOperator; +import org.apache.iotdb.db.mpp.execution.operator.process.IntoOperator; import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator; import org.apache.iotdb.db.mpp.execution.operator.process.LinearFillOperator; import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator; @@ -137,6 +138,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FillNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTagNode; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.IntoNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode; @@ -158,6 +160,7 @@ import 
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.CrossSeriesAggregatio import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation; +import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.IntoPathDescriptor; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OutputColumn; import org.apache.iotdb.db.mpp.plan.statement.component.FillPolicy; import org.apache.iotdb.db.mpp.plan.statement.component.Ordering; @@ -1336,6 +1339,55 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP return super.visitSort(node, context); } + @Override + public Operator visitInto(IntoNode node, LocalExecutionPlanContext context) { + Operator child = node.getChild().accept(this, context); + OperatorContext operatorContext = + context + .getInstanceContext() + .addOperatorContext( + context.getNextOperatorId(), + node.getPlanNodeId(), + IntoOperator.class.getSimpleName()); + + IntoPathDescriptor intoPathDescriptor = node.getIntoPathDescriptor(); + Map<String, List<InputLocation>> layout = makeLayout(node); + Map<String, InputLocation> sourceColumnToInputLocationMap = new HashMap<>(); + for (Map.Entry<String, List<InputLocation>> layoutEntry : layout.entrySet()) { + sourceColumnToInputLocationMap.put(layoutEntry.getKey(), layoutEntry.getValue().get(0)); + } + + Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap = + new HashMap<>(); + Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap = new HashMap<>(); + Map<PartialPath, Map<String, String>> targetPathToSourceMap = + intoPathDescriptor.getTargetPathToSourceMap(); + for (Map.Entry<PartialPath, Map<String, String>> entry : targetPathToSourceMap.entrySet()) { + PartialPath targetDevice = entry.getKey(); + Map<String, InputLocation> measurementToInputLocationMap = new HashMap<>(); + 
Map<String, TSDataType> measurementToDataTypeMap = new HashMap<>(); + for (Map.Entry<String, String> measurementEntry : entry.getValue().entrySet()) { + String targetMeasurement = measurementEntry.getKey(); + String sourceColumn = measurementEntry.getValue(); + measurementToInputLocationMap.put( + targetMeasurement, sourceColumnToInputLocationMap.get(sourceColumn)); + measurementToDataTypeMap.put( + targetMeasurement, context.getTypeProvider().getType(sourceColumn)); + } + targetPathToSourceInputLocationMap.put(targetDevice, measurementToInputLocationMap); + targetPathToDataTypeMap.put(targetDevice, measurementToDataTypeMap); + } + + return new IntoOperator( + operatorContext, + child, + targetPathToSourceInputLocationMap, + targetPathToDataTypeMap, + intoPathDescriptor.getTargetDeviceToAlignedMap(), + intoPathDescriptor.getSourceTargetPathPairList(), + sourceColumnToInputLocationMap); + } + @Override public Operator visitTimeJoin(TimeJoinNode node, LocalExecutionPlanContext context) { List<Operator> children = diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/IntoNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/IntoNode.java index b68e862a7a..73926bd8cc 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/IntoNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/IntoNode.java @@ -48,6 +48,10 @@ public class IntoNode extends SingleChildProcessNode { this.intoPathDescriptor = intoPathDescriptor; } + public IntoPathDescriptor getIntoPathDescriptor() { + return intoPathDescriptor; + } + @Override public PlanNode clone() { return new IntoNode(getPlanNodeId(), this.intoPathDescriptor);
