This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch ty/InnerTimeJoin
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ty/InnerTimeJoin by this push:
new bcb725cbad0 add ExchangeAdder
bcb725cbad0 is described below
commit bcb725cbad0c2f48d76767676de8e8aa566ae7fe
Author: JackieTien97 <[email protected]>
AuthorDate: Mon Jan 8 10:48:09 2024 +0800
add ExchangeAdder
---
.../planner/distribution/ExchangeNodeAdder.java | 78 ++++++++++++++++++++++
1 file changed, 78 insertions(+)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
index 91092cd179d..d6111a947f7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
@@ -49,6 +49,8 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TopKNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.FullOuterTimeJoinNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.InnerTimeJoinNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.LeftOuterTimeJoinNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode;
@@ -61,6 +63,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggre
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SourceNode;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -242,6 +245,81 @@ public class ExchangeNodeAdder extends
PlanVisitor<PlanNode, NodeGroupContext> {
return processMultiChildNode(node, context);
}
+ @Override
+ public PlanNode visitLeftOuterTimeJoin(LeftOuterTimeJoinNode node,
NodeGroupContext context) {
+ LeftOuterTimeJoinNode newNode = (LeftOuterTimeJoinNode) node.clone();
+ PlanNode leftChild = visit(node.getLeftChild(), context);
+ PlanNode rightChild = visit(node.getRightChild(), context);
+ // DataRegion which node locates
+ TRegionReplicaSet dataRegion;
+ boolean isChildrenDistributionSame =
+ nodeDistributionIsSame(Arrays.asList(leftChild, rightChild), context);
+ NodeDistributionType distributionType =
+ isChildrenDistributionSame
+ ? NodeDistributionType.SAME_WITH_ALL_CHILDREN
+ : NodeDistributionType.SAME_WITH_SOME_CHILD;
+
+ if (context.isAlignByDevice()) {
+ // For align by device,
+ // if dataRegions of children are the same, we set child's dataRegion to
this node,
+ // else we set the selected mostlyUsedDataRegion to this node
+ dataRegion =
+ isChildrenDistributionSame
+ ? context.getNodeDistribution(leftChild.getPlanNodeId()).region
+ : context.getMostlyUsedDataRegion();
+ context.putNodeDistribution(
+ newNode.getPlanNodeId(), new NodeDistribution(distributionType,
dataRegion));
+ } else {
+ dataRegion = calculateDataRegionByChildren(Arrays.asList(leftChild,
rightChild), context);
+ context.putNodeDistribution(
+ newNode.getPlanNodeId(), new NodeDistribution(distributionType,
dataRegion));
+ }
+
+ // If the distributionType of all the children are same, no ExchangeNode
need to be added.
+ if (distributionType == NodeDistributionType.SAME_WITH_ALL_CHILDREN) {
+ newNode.setLeftChild(leftChild);
+ newNode.setRightChild(rightChild);
+ return newNode;
+ }
+
+ // Otherwise, we need to add ExchangeNode for the child whose DataRegion
is different from the
+ // parent.
+ if
(!dataRegion.equals(context.getNodeDistribution(leftChild.getPlanNodeId()).region))
{
+ if (leftChild instanceof SingleDeviceViewNode) {
+ ((SingleDeviceViewNode) leftChild).setCacheOutputColumnNames(true);
+ }
+ ExchangeNode exchangeNode =
+ new ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
+ exchangeNode.setChild(leftChild);
+ exchangeNode.setOutputColumnNames(leftChild.getOutputColumnNames());
+ context.hasExchangeNode = true;
+ newNode.setLeftChild(exchangeNode);
+ } else {
+ newNode.setLeftChild(leftChild);
+ }
+
+ if
(!dataRegion.equals(context.getNodeDistribution(rightChild.getPlanNodeId()).region))
{
+ if (rightChild instanceof SingleDeviceViewNode) {
+ ((SingleDeviceViewNode) rightChild).setCacheOutputColumnNames(true);
+ }
+ ExchangeNode exchangeNode =
+ new ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
+ exchangeNode.setChild(rightChild);
+ exchangeNode.setOutputColumnNames(rightChild.getOutputColumnNames());
+ context.hasExchangeNode = true;
+ newNode.setRightChild(exchangeNode);
+ } else {
+ newNode.setRightChild(rightChild);
+ }
+
+ return newNode;
+ }
+
+ @Override
+ public PlanNode visitInnerTimeJoin(InnerTimeJoinNode node, NodeGroupContext
context) {
+ return processMultiChildNode(node, context);
+ }
+
@Override
public PlanNode visitAggregation(AggregationNode node, NodeGroupContext
context) {
return processMultiChildNode(node, context);