This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch lmh/intoOperator in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 41f8243a0d09e66aed2e3b8496f608c0dda2de01 Author: Minghui Liu <[email protected]> AuthorDate: Wed Oct 19 14:45:15 2022 +0800 OperatorTreeGenerator visitDeviceViewInto --- .../operator/process/AbstractIntoOperator.java | 3 +- .../operator/process/DeviceViewIntoOperator.java | 4 + .../db/mpp/plan/planner/OperatorTreeGenerator.java | 106 +++++++++++++++++---- .../plan/node/process/DeviceViewIntoNode.java | 4 + 4 files changed, 98 insertions(+), 19 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java index d038874d0b..28b90daae7 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java @@ -82,7 +82,8 @@ public abstract class AbstractIntoOperator implements ProcessOperator { } protected void insertMultiTabletsInternally(boolean needCheck) { - if ((needCheck && !insertTabletStatementGenerators.get(0).isFull()) + if (insertTabletStatementGenerators == null + || (needCheck && !insertTabletStatementGenerators.get(0).isFull()) || insertTabletStatementGenerators.get(0).isEmpty()) { return; } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java index bf1a0920fa..f0eab0b71a 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java @@ -105,6 +105,10 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator { } private void updateResultTsBlock() { + if (currentDevice == null) { + return; + } + TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder(); ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders(); for (Pair<String, PartialPath> sourceTargetPathPair : diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java index e91d67b973..8fb6a0387e 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java @@ -40,6 +40,7 @@ import org.apache.iotdb.db.mpp.execution.operator.Operator; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; import org.apache.iotdb.db.mpp.execution.operator.process.AggregationOperator; import org.apache.iotdb.db.mpp.execution.operator.process.DeviceMergeOperator; +import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewIntoOperator; import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator; import org.apache.iotdb.db.mpp.execution.operator.process.FillOperator; import org.apache.iotdb.db.mpp.execution.operator.process.FilterAndProjectOperator; @@ -132,6 +133,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesCo import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceMergeNode; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewIntoNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FillNode; @@ -157,6 +159,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationSc import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor; +import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.DeviceViewIntoPathDescriptor; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation; @@ -1351,17 +1354,95 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP IntoOperator.class.getSimpleName()); IntoPathDescriptor intoPathDescriptor = node.getIntoPathDescriptor(); - Map<String, List<InputLocation>> layout = makeLayout(node); + Map<String, InputLocation> sourceColumnToInputLocationMap = + constructSourceColumnToInputLocationMap(node); + + Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap = + new HashMap<>(); + Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap = new HashMap<>(); + processTargetPathToSourceMap( + intoPathDescriptor.getTargetPathToSourceMap(), + targetPathToSourceInputLocationMap, + targetPathToDataTypeMap, + sourceColumnToInputLocationMap, + context.getTypeProvider()); + + context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); + return new IntoOperator( + operatorContext, + child, + targetPathToSourceInputLocationMap, + targetPathToDataTypeMap, + intoPathDescriptor.getTargetDeviceToAlignedMap(), + intoPathDescriptor.getSourceTargetPathPairList(), + sourceColumnToInputLocationMap); + } + + @Override + public Operator visitDeviceViewInto(DeviceViewIntoNode node, LocalExecutionPlanContext context) { + Operator child = node.getChild().accept(this, context); + OperatorContext operatorContext = + context + .getInstanceContext() + .addOperatorContext( + context.getNextOperatorId(), + node.getPlanNodeId(), + DeviceViewIntoOperator.class.getSimpleName()); + + DeviceViewIntoPathDescriptor deviceViewIntoPathDescriptor = + node.getDeviceViewIntoPathDescriptor(); + Map<String, InputLocation> sourceColumnToInputLocationMap = + constructSourceColumnToInputLocationMap(node); + + Map<String, Map<PartialPath, Map<String, InputLocation>>> + deviceToTargetPathSourceInputLocationMap = new HashMap<>(); + Map<String, Map<PartialPath, Map<String, TSDataType>>> deviceToTargetPathDataTypeMap = + new HashMap<>(); + Map<String, Map<PartialPath, Map<String, String>>> sourceDeviceToTargetPathMap = + deviceViewIntoPathDescriptor.getSourceDeviceToTargetPathMap(); + for (Map.Entry<String, Map<PartialPath, Map<String, String>>> deviceEntry : + sourceDeviceToTargetPathMap.entrySet()) { + String sourceDevice = deviceEntry.getKey(); + Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap = + new HashMap<>(); + Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap = new HashMap<>(); + processTargetPathToSourceMap( + deviceEntry.getValue(), + targetPathToSourceInputLocationMap, + targetPathToDataTypeMap, + sourceColumnToInputLocationMap, + context.getTypeProvider()); + deviceToTargetPathSourceInputLocationMap.put( + sourceDevice, targetPathToSourceInputLocationMap); + deviceToTargetPathDataTypeMap.put(sourceDevice, targetPathToDataTypeMap); + } + + context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); + return new DeviceViewIntoOperator( + operatorContext, + child, + deviceToTargetPathSourceInputLocationMap, + deviceToTargetPathDataTypeMap, + deviceViewIntoPathDescriptor.getTargetDeviceToAlignedMap(), + deviceViewIntoPathDescriptor.getDeviceToSourceTargetPathPairListMap(), + sourceColumnToInputLocationMap); + } + + private Map<String, InputLocation> constructSourceColumnToInputLocationMap(PlanNode node) { Map<String, InputLocation> sourceColumnToInputLocationMap = new HashMap<>(); + Map<String, List<InputLocation>> layout = makeLayout(node); for (Map.Entry<String, List<InputLocation>> layoutEntry : layout.entrySet()) { sourceColumnToInputLocationMap.put(layoutEntry.getKey(), layoutEntry.getValue().get(0)); } + return sourceColumnToInputLocationMap; + } - Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap = - new HashMap<>(); - Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap = new HashMap<>(); - Map<PartialPath, Map<String, String>> targetPathToSourceMap = - intoPathDescriptor.getTargetPathToSourceMap(); + private void processTargetPathToSourceMap( + Map<PartialPath, Map<String, String>> targetPathToSourceMap, + Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap, + Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap, + Map<String, InputLocation> sourceColumnToInputLocationMap, + TypeProvider typeProvider) { for (Map.Entry<PartialPath, Map<String, String>> entry : targetPathToSourceMap.entrySet()) { PartialPath targetDevice = entry.getKey(); Map<String, InputLocation> measurementToInputLocationMap = new HashMap<>(); @@ -1371,22 +1452,11 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP String sourceColumn = measurementEntry.getValue(); measurementToInputLocationMap.put( targetMeasurement, sourceColumnToInputLocationMap.get(sourceColumn)); - measurementToDataTypeMap.put( - targetMeasurement, context.getTypeProvider().getType(sourceColumn)); + measurementToDataTypeMap.put(targetMeasurement, typeProvider.getType(sourceColumn)); } targetPathToSourceInputLocationMap.put(targetDevice, measurementToInputLocationMap); targetPathToDataTypeMap.put(targetDevice, measurementToDataTypeMap); } - - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); - return new IntoOperator( - operatorContext, - child, - targetPathToSourceInputLocationMap, - targetPathToDataTypeMap, - intoPathDescriptor.getTargetDeviceToAlignedMap(), - intoPathDescriptor.getSourceTargetPathPairList(), - sourceColumnToInputLocationMap); } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewIntoNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewIntoNode.java index 987cbc18d2..8c5055982c 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewIntoNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewIntoNode.java @@ -50,6 +50,10 @@ public class DeviceViewIntoNode extends SingleChildProcessNode { this.deviceViewIntoPathDescriptor = deviceViewIntoPathDescriptor; } + public DeviceViewIntoPathDescriptor getDeviceViewIntoPathDescriptor() { + return deviceViewIntoPathDescriptor; + } + @Override public PlanNode clone() { return new DeviceViewIntoNode(getPlanNodeId(), this.deviceViewIntoPathDescriptor);
