This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/SupportQueryWithView in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 37ad63bcb99d7cc45aad62b2183ac29c7d331da6 Author: liuminghui233 <[email protected]> AuthorDate: Sat May 27 23:30:23 2023 +0800 finish distribution planner --- .../plan/planner/distribution/SourceRewriter.java | 55 +++++++++++++++------- 1 file changed, 39 insertions(+), 16 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java index 03cc0fba950..d5815d14aeb 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java @@ -72,6 +72,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -165,31 +166,53 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte List<DeviceViewSplit> deviceViewSplits = new ArrayList<>(); // Step 1: constructs DeviceViewSplit + Map<String, List<String>> outputDeviceToQueriedDeviceMap = + analysis.getOutputDeviceToQueriedDeviceMap(); for (int i = 0; i < node.getDevices().size(); i++) { - String device = node.getDevices().get(i); + String outputDevice = node.getDevices().get(i); PlanNode child = node.getChildren().get(i); - List<TRegionReplicaSet> regionReplicaSets = - analysis.getPartitionInfo(device, analysis.getGlobalTimeFilter()); - deviceViewSplits.add(new DeviceViewSplit(device, child, regionReplicaSets)); + List<TRegionReplicaSet> regionReplicaSets = new ArrayList<>(); + for (String queriedDevice : outputDeviceToQueriedDeviceMap.get(outputDevice)) { + regionReplicaSets.addAll( + analysis.getPartitionInfo(queriedDevice, analysis.getGlobalTimeFilter())); + } + deviceViewSplits.add(new DeviceViewSplit(outputDevice, child, regionReplicaSets)); 
relatedDataRegions.addAll(regionReplicaSets); } // Step 2: Iterate all partition and create DeviceViewNode for each region List<PlanNode> deviceViewNodeList = new ArrayList<>(); - for (TRegionReplicaSet regionReplicaSet : relatedDataRegions) { - List<String> devices = new ArrayList<>(); - List<PlanNode> children = new ArrayList<>(); - for (DeviceViewSplit split : deviceViewSplits) { - if (split.needDistributeTo(regionReplicaSet)) { - devices.add(split.device); - children.add(split.buildPlanNodeInRegion(regionReplicaSet, context.queryContext)); - } + Iterator<DeviceViewSplit> deviceViewSplitIterator = deviceViewSplits.listIterator(); + while (deviceViewSplitIterator.hasNext()) { + DeviceViewSplit deviceViewSplit = deviceViewSplitIterator.next(); + String outputDevice = deviceViewSplit.device; + + if (outputDeviceToQueriedDeviceMap.get(outputDevice).size() > 1) { + DeviceViewNode deviceViewNode = cloneDeviceViewNodeWithoutChild(node, context); + deviceViewNode.addChildDeviceNode( + outputDevice, process(deviceViewSplit.root, context).get(0)); + deviceViewNodeList.add(deviceViewNode); + deviceViewSplitIterator.remove(); } - DeviceViewNode regionDeviceViewNode = cloneDeviceViewNodeWithoutChild(node, context); - for (int i = 0; i < devices.size(); i++) { - regionDeviceViewNode.addChildDeviceNode(devices.get(i), children.get(i)); + } + if (deviceViewSplits.size() > 0) { + for (TRegionReplicaSet regionReplicaSet : relatedDataRegions) { + List<String> devices = new ArrayList<>(); + List<PlanNode> children = new ArrayList<>(); + for (DeviceViewSplit split : deviceViewSplits) { + if (split.needDistributeTo(regionReplicaSet)) { + devices.add(split.device); + children.add(split.buildPlanNodeInRegion(regionReplicaSet, context.queryContext)); + } + } + DeviceViewNode regionDeviceViewNode = cloneDeviceViewNodeWithoutChild(node, context); + for (int i = 0; i < devices.size(); i++) { + regionDeviceViewNode.addChildDeviceNode(devices.get(i), children.get(i)); + } + if 
(regionDeviceViewNode.getChildren().size() > 0) { + deviceViewNodeList.add(regionDeviceViewNode); + } } - deviceViewNodeList.add(regionDeviceViewNode); } if (deviceViewNodeList.size() == 1) {
