This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/InnerTimeJoin
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/ty/InnerTimeJoin by this push:
     new bcb725cbad0 add ExchangeAdder
bcb725cbad0 is described below

commit bcb725cbad0c2f48d76767676de8e8aa566ae7fe
Author: JackieTien97 <[email protected]>
AuthorDate: Mon Jan 8 10:48:09 2024 +0800

    add ExchangeAdder
---
 .../planner/distribution/ExchangeNodeAdder.java    | 78 ++++++++++++++++++++++
 1 file changed, 78 insertions(+)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
index 91092cd179d..d6111a947f7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
@@ -49,6 +49,8 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TopKNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.FullOuterTimeJoinNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.InnerTimeJoinNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.LeftOuterTimeJoinNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode;
@@ -61,6 +63,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggre
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SourceNode;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -242,6 +245,81 @@ public class ExchangeNodeAdder extends 
PlanVisitor<PlanNode, NodeGroupContext> {
     return processMultiChildNode(node, context);
   }
 
+  /**
+   * Visits a {@link LeftOuterTimeJoinNode}: clones it, visits its left and right children, records
+   * the distribution chosen for the clone in {@code context}, and wraps every child whose
+   * DataRegion differs from the chosen one in an {@link ExchangeNode}.
+   *
+   * @param node the left outer time join node to process
+   * @param context the context holding node distributions and the exchange-node flag
+   * @return the cloned node with its (possibly ExchangeNode-wrapped) children attached
+   */
+  @Override
+  public PlanNode visitLeftOuterTimeJoin(LeftOuterTimeJoinNode node, 
NodeGroupContext context) {
+    LeftOuterTimeJoinNode newNode = (LeftOuterTimeJoinNode) node.clone();
+    PlanNode leftChild = visit(node.getLeftChild(), context);
+    PlanNode rightChild = visit(node.getRightChild(), context);
+    // DataRegion in which this node is located
+    TRegionReplicaSet dataRegion;
+    boolean isChildrenDistributionSame =
+        nodeDistributionIsSame(Arrays.asList(leftChild, rightChild), context);
+    NodeDistributionType distributionType =
+        isChildrenDistributionSame
+            ? NodeDistributionType.SAME_WITH_ALL_CHILDREN
+            : NodeDistributionType.SAME_WITH_SOME_CHILD;
+
+    if (context.isAlignByDevice()) {
+      // For align by device: if both children share a dataRegion, this node uses the
+      // left child's dataRegion; otherwise it uses the context's mostlyUsedDataRegion.
+      dataRegion =
+          isChildrenDistributionSame
+              ? context.getNodeDistribution(leftChild.getPlanNodeId()).region
+              : context.getMostlyUsedDataRegion();
+      context.putNodeDistribution(
+          newNode.getPlanNodeId(), new NodeDistribution(distributionType, 
dataRegion));
+    } else {
+      dataRegion = calculateDataRegionByChildren(Arrays.asList(leftChild, 
rightChild), context);
+      context.putNodeDistribution(
+          newNode.getPlanNodeId(), new NodeDistribution(distributionType, 
dataRegion));
+    }
+
+    // If all children have the same distribution, no ExchangeNode needs to be added.
+    if (distributionType == NodeDistributionType.SAME_WITH_ALL_CHILDREN) {
+      newNode.setLeftChild(leftChild);
+      newNode.setRightChild(rightChild);
+      return newNode;
+    }
+
+    // Otherwise, add an ExchangeNode above each child whose DataRegion differs from the
+    // DataRegion chosen for this node.
+    if 
(!dataRegion.equals(context.getNodeDistribution(leftChild.getPlanNodeId()).region))
 {
+      // NOTE(review): presumably the column names are cached because the child ends up
+      // behind an ExchangeNode - confirm against SingleDeviceViewNode.
+      if (leftChild instanceof SingleDeviceViewNode) {
+        ((SingleDeviceViewNode) leftChild).setCacheOutputColumnNames(true);
+      }
+      ExchangeNode exchangeNode =
+          new ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
+      exchangeNode.setChild(leftChild);
+      exchangeNode.setOutputColumnNames(leftChild.getOutputColumnNames());
+      context.hasExchangeNode = true;
+      newNode.setLeftChild(exchangeNode);
+    } else {
+      newNode.setLeftChild(leftChild);
+    }
+
+    // Same handling for the right child.
+    if 
(!dataRegion.equals(context.getNodeDistribution(rightChild.getPlanNodeId()).region))
 {
+      if (rightChild instanceof SingleDeviceViewNode) {
+        ((SingleDeviceViewNode) rightChild).setCacheOutputColumnNames(true);
+      }
+      ExchangeNode exchangeNode =
+          new ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
+      exchangeNode.setChild(rightChild);
+      exchangeNode.setOutputColumnNames(rightChild.getOutputColumnNames());
+      context.hasExchangeNode = true;
+      newNode.setRightChild(exchangeNode);
+    } else {
+      newNode.setRightChild(rightChild);
+    }
+
+    return newNode;
+  }
+
+  /**
+   * Visits an {@link InnerTimeJoinNode} by delegating to {@code processMultiChildNode}.
+   *
+   * @param node the inner time join node to process
+   * @param context the node group context passed through to {@code processMultiChildNode}
+   * @return the plan node returned by {@code processMultiChildNode}
+   */
+  @Override
+  public PlanNode visitInnerTimeJoin(InnerTimeJoinNode node, NodeGroupContext 
context) {
+    return processMultiChildNode(node, context);
+  }
+
   @Override
   public PlanNode visitAggregation(AggregationNode node, NodeGroupContext 
context) {
     return processMultiChildNode(node, context);

Reply via email to