This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/SupportQueryWithView
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 37ad63bcb99d7cc45aad62b2183ac29c7d331da6
Author: liuminghui233 <[email protected]>
AuthorDate: Sat May 27 23:30:23 2023 +0800

    finish distribution planner
---
 .../plan/planner/distribution/SourceRewriter.java  | 55 +++++++++++++++-------
 1 file changed, 39 insertions(+), 16 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 03cc0fba950..d5815d14aeb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -72,6 +72,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -165,31 +166,53 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
 
     List<DeviceViewSplit> deviceViewSplits = new ArrayList<>();
     // Step 1: constructs DeviceViewSplit
+    Map<String, List<String>> outputDeviceToQueriedDeviceMap =
+        analysis.getOutputDeviceToQueriedDeviceMap();
     for (int i = 0; i < node.getDevices().size(); i++) {
-      String device = node.getDevices().get(i);
+      String outputDevice = node.getDevices().get(i);
       PlanNode child = node.getChildren().get(i);
-      List<TRegionReplicaSet> regionReplicaSets =
-          analysis.getPartitionInfo(device, analysis.getGlobalTimeFilter());
-      deviceViewSplits.add(new DeviceViewSplit(device, child, 
regionReplicaSets));
+      List<TRegionReplicaSet> regionReplicaSets = new ArrayList<>();
+      for (String queriedDevice : 
outputDeviceToQueriedDeviceMap.get(outputDevice)) {
+        regionReplicaSets.addAll(
+            analysis.getPartitionInfo(queriedDevice, 
analysis.getGlobalTimeFilter()));
+      }
+      deviceViewSplits.add(new DeviceViewSplit(outputDevice, child, 
regionReplicaSets));
       relatedDataRegions.addAll(regionReplicaSets);
     }
 
     // Step 2: Iterate all partition and create DeviceViewNode for each region
     List<PlanNode> deviceViewNodeList = new ArrayList<>();
-    for (TRegionReplicaSet regionReplicaSet : relatedDataRegions) {
-      List<String> devices = new ArrayList<>();
-      List<PlanNode> children = new ArrayList<>();
-      for (DeviceViewSplit split : deviceViewSplits) {
-        if (split.needDistributeTo(regionReplicaSet)) {
-          devices.add(split.device);
-          children.add(split.buildPlanNodeInRegion(regionReplicaSet, 
context.queryContext));
-        }
+    Iterator<DeviceViewSplit> deviceViewSplitIterator = 
deviceViewSplits.listIterator();
+    while (deviceViewSplitIterator.hasNext()) {
+      DeviceViewSplit deviceViewSplit = deviceViewSplitIterator.next();
+      String outputDevice = deviceViewSplit.device;
+
+      if (outputDeviceToQueriedDeviceMap.get(outputDevice).size() > 1) {
+        DeviceViewNode deviceViewNode = cloneDeviceViewNodeWithoutChild(node, 
context);
+        deviceViewNode.addChildDeviceNode(
+            outputDevice, process(deviceViewSplit.root, context).get(0));
+        deviceViewNodeList.add(deviceViewNode);
+        deviceViewSplitIterator.remove();
       }
-      DeviceViewNode regionDeviceViewNode = 
cloneDeviceViewNodeWithoutChild(node, context);
-      for (int i = 0; i < devices.size(); i++) {
-        regionDeviceViewNode.addChildDeviceNode(devices.get(i), 
children.get(i));
+    }
+    if (deviceViewSplits.size() > 0) {
+      for (TRegionReplicaSet regionReplicaSet : relatedDataRegions) {
+        List<String> devices = new ArrayList<>();
+        List<PlanNode> children = new ArrayList<>();
+        for (DeviceViewSplit split : deviceViewSplits) {
+          if (split.needDistributeTo(regionReplicaSet)) {
+            devices.add(split.device);
+            children.add(split.buildPlanNodeInRegion(regionReplicaSet, 
context.queryContext));
+          }
+        }
+        DeviceViewNode regionDeviceViewNode = 
cloneDeviceViewNodeWithoutChild(node, context);
+        for (int i = 0; i < devices.size(); i++) {
+          regionDeviceViewNode.addChildDeviceNode(devices.get(i), 
children.get(i));
+        }
+        if (regionDeviceViewNode.getChildren().size() > 0) {
+          deviceViewNodeList.add(regionDeviceViewNode);
+        }
       }
-      deviceViewNodeList.add(regionDeviceViewNode);
     }
 
     if (deviceViewNodeList.size() == 1) {

Reply via email to