This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 6c62784c53 [multistage][refactor] Refactor RelNode to StageNode
conversion (#10730)
6c62784c53 is described below
commit 6c62784c5382d689099617d4020832d603761c46
Author: Xiang Fu <[email protected]>
AuthorDate: Sat May 6 16:58:04 2023 -0700
[multistage][refactor] Refactor RelNode to StageNode conversion (#10730)
* Convert RelNode to StageNode in multiple steps
* Merge the exchange splitter and stage ID assigner into a StageFragmenter
---
.../query/planner/ExplainPlanStageVisitor.java | 6 +
.../query/planner/logical/RelToStageConverter.java | 25 ++++
.../planner/logical/ShuffleRewriteVisitor.java | 6 +
.../query/planner/logical/StageFragmenter.java | 141 +++++++++++++++++++++
.../pinot/query/planner/logical/StagePlanner.java | 80 ++----------
.../planner/physical/DispatchablePlanVisitor.java | 6 +
.../colocated/GreedyShuffleRewriteVisitor.java | 6 +
.../query/planner/stage/AbstractStageNode.java | 7 +-
.../stage/DefaultPostOrderTraversalVisitor.java | 6 +
.../pinot/query/planner/stage/ExchangeNode.java | 93 ++++++++++++++
.../query/planner/stage/MailboxReceiveNode.java | 4 +
.../pinot/query/planner/stage/MailboxSendNode.java | 4 +
.../pinot/query/planner/stage/StageNode.java | 2 +
.../query/planner/stage/StageNodeSerDeUtils.java | 3 +
.../query/planner/stage/StageNodeVisitor.java | 2 +
.../query/runtime/plan/PhysicalPlanVisitor.java | 6 +
.../runtime/plan/ServerRequestPlanVisitor.java | 6 +
17 files changed, 333 insertions(+), 70 deletions(-)
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/ExplainPlanStageVisitor.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/ExplainPlanStageVisitor.java
index 325c1cd7ad..e639701256 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/ExplainPlanStageVisitor.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/ExplainPlanStageVisitor.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.stream.Collectors;
import org.apache.pinot.query.planner.physical.DispatchablePlanMetadata;
import org.apache.pinot.query.planner.stage.AggregateNode;
+import org.apache.pinot.query.planner.stage.ExchangeNode;
import org.apache.pinot.query.planner.stage.FilterNode;
import org.apache.pinot.query.planner.stage.JoinNode;
import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
@@ -129,6 +130,11 @@ public class ExplainPlanStageVisitor implements
StageNodeVisitor<StringBuilder,
return context._builder;
}
+ /**
+ * Always throws. ExchangeNode is a planning-time placeholder; presumably the plan is fragmented
+ * (ExchangeNodes replaced by mailbox send/receive pairs) before it is explained. TODO confirm at call site.
+ */
+ @Override
+ public StringBuilder visitExchange(ExchangeNode exchangeNode, Context
context) {
+ throw new UnsupportedOperationException("ExchangeNode should not be
visited");
+ }
+ }
+
@Override
public StringBuilder visitFilter(FilterNode node, Context context) {
return visitSimpleNode(node, context);
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
index 576f209e8c..ea0c46b390 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
@@ -20,10 +20,14 @@ package org.apache.pinot.query.planner.logical;
import java.util.List;
import java.util.stream.Collectors;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Exchange;
import org.apache.calcite.rel.core.JoinInfo;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.core.SortExchange;
import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalJoin;
@@ -32,6 +36,7 @@ import org.apache.calcite.rel.logical.LogicalSort;
import org.apache.calcite.rel.logical.LogicalTableScan;
import org.apache.calcite.rel.logical.LogicalValues;
import org.apache.calcite.rel.logical.LogicalWindow;
+import org.apache.calcite.rel.logical.PinotLogicalSortExchange;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rel.type.RelRecordType;
@@ -40,6 +45,7 @@ import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.PinotDataType;
import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
import org.apache.pinot.query.planner.stage.AggregateNode;
+import org.apache.pinot.query.planner.stage.ExchangeNode;
import org.apache.pinot.query.planner.stage.FilterNode;
import org.apache.pinot.query.planner.stage.JoinNode;
import org.apache.pinot.query.planner.stage.ProjectNode;
@@ -88,11 +94,30 @@ public final class RelToStageConverter {
return convertLogicalWindow((LogicalWindow) node, currentStageId);
} else if (node instanceof SetOp) {
return convertLogicalSetOp((SetOp) node, currentStageId);
+ } else if (node instanceof Exchange) {
+ return convertLogicalExchange((Exchange) node, currentStageId);
} else {
throw new UnsupportedOperationException("Unsupported logical plan node:
" + node);
}
}
+ /**
+ * Converts a Calcite {@link Exchange} into an {@link ExchangeNode}. Collations are copied only from a
+ * {@link SortExchange}, and the sender/receiver sort flags only from a {@link PinotLogicalSortExchange};
+ * otherwise the collations are null and both flags are false.
+ */
+ private static StageNode convertLogicalExchange(Exchange node, int
currentStageId) {
+ RelCollation collation = null;
+ boolean isSortOnSender = false;
+ boolean isSortOnReceiver = false;
+ if (node instanceof SortExchange) {
+ collation = ((SortExchange) node).getCollation();
+ if (node instanceof PinotLogicalSortExchange) {
+ // These flags are only meaningful when the collation is non-null and non-empty.
+ isSortOnSender = ((PinotLogicalSortExchange) node).isSortOnSender();
+ isSortOnReceiver = ((PinotLogicalSortExchange)
node).isSortOnReceiver();
+ }
+ }
+ List<RelFieldCollation> fieldCollations = (collation == null) ? null :
collation.getFieldCollations();
+ return new ExchangeNode(currentStageId, toDataSchema(node.getRowType()),
node.getDistribution(), fieldCollations,
+ isSortOnSender, isSortOnReceiver);
+ }
+
private static StageNode convertLogicalSetOp(SetOp node, int currentStageId)
{
return new SetOpNode(SetOpNode.SetOpType.fromObject(node), currentStageId,
toDataSchema(node.getRowType()),
node.all);
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java
index 6f5376c1f5..664ddd44ff 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java
@@ -27,6 +27,7 @@ import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
import org.apache.pinot.query.planner.partitioning.KeySelector;
import org.apache.pinot.query.planner.stage.AggregateNode;
+import org.apache.pinot.query.planner.stage.ExchangeNode;
import org.apache.pinot.query.planner.stage.FilterNode;
import org.apache.pinot.query.planner.stage.JoinNode;
import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
@@ -87,6 +88,11 @@ public class ShuffleRewriteVisitor implements
StageNodeVisitor<Set<Integer>, Voi
return newPartitionKeys;
}
+ /**
+ * NOTE(review): the message says "not yet supported", but StagePlanner fragments the StageNode tree
+ * (replacing every ExchangeNode with mailbox send/receive nodes) right after building it. If this visitor
+ * only runs on fragmented plans, the message should match the other visitors' "should not be visited"
+ * wording. Confirm the call site.
+ */
+ @Override
+ public Set<Integer> visitExchange(ExchangeNode exchangeNode, Void context) {
+ throw new UnsupportedOperationException("Exchange not yet supported!");
+ }
+
@Override
public Set<Integer> visitFilter(FilterNode node, Void context) {
// filters don't change partition keys
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StageFragmenter.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StageFragmenter.java
new file mode 100644
index 0000000000..e33638d824
--- /dev/null
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StageFragmenter.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.logical;
+
+import java.util.List;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
+import org.apache.pinot.query.planner.partitioning.KeySelector;
+import org.apache.pinot.query.planner.stage.AggregateNode;
+import org.apache.pinot.query.planner.stage.ExchangeNode;
+import org.apache.pinot.query.planner.stage.FilterNode;
+import org.apache.pinot.query.planner.stage.JoinNode;
+import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
+import org.apache.pinot.query.planner.stage.MailboxSendNode;
+import org.apache.pinot.query.planner.stage.ProjectNode;
+import org.apache.pinot.query.planner.stage.SetOpNode;
+import org.apache.pinot.query.planner.stage.SortNode;
+import org.apache.pinot.query.planner.stage.StageNode;
+import org.apache.pinot.query.planner.stage.StageNodeVisitor;
+import org.apache.pinot.query.planner.stage.TableScanNode;
+import org.apache.pinot.query.planner.stage.ValueNode;
+import org.apache.pinot.query.planner.stage.WindowNode;
+
+
+/**
+ * Splits a single StageNode tree into stages. Every {@link ExchangeNode} marks a stage boundary and is replaced
+ * by a {@link MailboxReceiveNode} in the parent's stage, whose sender is a {@link MailboxSendNode} rooting a newly
+ * allocated child stage. Every other node is tagged with the ID of the stage it belongs to.
+ *
+ * <p>Stage IDs are allocated in pre-order as exchanges are encountered, starting at 1 (0 is reserved for the
+ * ROOT stage). All traversal state lives in {@link Context}, so use a fresh Context for each plan.
+ */
+public class StageFragmenter implements StageNodeVisitor<StageNode, StageFragmenter.Context> {
+  public static final StageFragmenter INSTANCE = new StageFragmenter();
+
+  /**
+   * Tags {@code node} with the current stage and fragments its inputs in place.
+   *
+   * <p>Every visit leaves {@code context._currentStageId} as it found it. Each input therefore starts in this
+   * node's stage, even when an earlier sibling crossed an exchange into a new stage. The previous
+   * implementation read a global counter here, which put later siblings into the child's stage.
+   */
+  private StageNode process(StageNode node, Context context) {
+    node.setStageId(context._currentStageId);
+    List<StageNode> inputs = node.getInputs();
+    for (int i = 0; i < inputs.size(); i++) {
+      inputs.set(i, inputs.get(i).visit(this, context));
+    }
+    return node;
+  }
+
+  @Override
+  public StageNode visitAggregate(AggregateNode node, Context context) {
+    return process(node, context);
+  }
+
+  @Override
+  public StageNode visitFilter(FilterNode node, Context context) {
+    return process(node, context);
+  }
+
+  @Override
+  public StageNode visitJoin(JoinNode node, Context context) {
+    return process(node, context);
+  }
+
+  @Override
+  public StageNode visitMailboxReceive(MailboxReceiveNode node, Context context) {
+    throw new UnsupportedOperationException("MailboxReceiveNode should not be visited by StageFragmenter");
+  }
+
+  @Override
+  public StageNode visitMailboxSend(MailboxSendNode node, Context context) {
+    throw new UnsupportedOperationException("MailboxSendNode should not be visited by StageFragmenter");
+  }
+
+  @Override
+  public StageNode visitProject(ProjectNode node, Context context) {
+    return process(node, context);
+  }
+
+  @Override
+  public StageNode visitSort(SortNode node, Context context) {
+    return process(node, context);
+  }
+
+  @Override
+  public StageNode visitTableScan(TableScanNode node, Context context) {
+    return process(node, context);
+  }
+
+  @Override
+  public StageNode visitValue(ValueNode node, Context context) {
+    return process(node, context);
+  }
+
+  @Override
+  public StageNode visitWindow(WindowNode node, Context context) {
+    return process(node, context);
+  }
+
+  @Override
+  public StageNode visitSetOp(SetOpNode node, Context context) {
+    return process(node, context);
+  }
+
+  /**
+   * Cuts the plan at this exchange. The input subtree is fragmented into a newly allocated stage whose root feeds
+   * a MailboxSendNode. The exchange itself is replaced by the matching MailboxReceiveNode in the current
+   * (receiving) stage.
+   */
+  @Override
+  public StageNode visitExchange(ExchangeNode node, Context context) {
+    int receiverStageId = context._currentStageId;
+    int senderStageId = context._nextStageId++;
+
+    // Fragment the input subtree inside the new stage, then restore the receiving stage for later siblings.
+    context._currentStageId = senderStageId;
+    StageNode nextStageRoot = node.getInputs().get(0).visit(this, context);
+    context._currentStageId = receiverStageId;
+
+    List<Integer> distributionKeys = node.getDistributionKeys();
+    RelDistribution.Type exchangeType = node.getDistributionType();
+
+    // make an exchange sender and receiver node pair
+    // only HASH_DISTRIBUTED requires a partition key selector; so all other types (SINGLETON and BROADCAST)
+    // of exchange will not carry a partition key selector.
+    KeySelector<Object[], Object[]> keySelector = exchangeType == RelDistribution.Type.HASH_DISTRIBUTED
+        ? new FieldSelectionKeySelector(distributionKeys) : null;
+
+    StageNode mailboxSender = new MailboxSendNode(senderStageId, nextStageRoot.getDataSchema(),
+        receiverStageId, exchangeType, keySelector, node.getCollations(), node.isSortOnSender());
+    StageNode mailboxReceiver = new MailboxReceiveNode(receiverStageId, nextStageRoot.getDataSchema(),
+        senderStageId, exchangeType, keySelector, node.getCollations(), node.isSortOnSender(),
+        node.isSortOnReceiver(), mailboxSender);
+    mailboxSender.addInput(nextStageRoot);
+
+    return mailboxReceiver;
+  }
+
+  /**
+   * Mutable traversal state for a single fragmentation pass.
+   */
+  public static class Context {
+    // Stage ID starts with 1, 0 will be reserved for ROOT stage.
+    // ID of the stage that the nodes currently being visited belong to.
+    int _currentStageId = 1;
+    // Next unused stage ID; one is allocated each time an exchange is crossed.
+    int _nextStageId = 2;
+  }
+}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
index ce0417ea0f..91e1ba7a89 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
@@ -20,18 +20,12 @@ package org.apache.pinot.query.planner.logical;
import java.util.List;
import java.util.Set;
-import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
-import org.apache.calcite.rel.core.Exchange;
-import org.apache.calcite.rel.core.SortExchange;
-import org.apache.calcite.rel.logical.PinotLogicalSortExchange;
import org.apache.pinot.common.config.provider.TableCache;
import org.apache.pinot.query.context.PlannerContext;
import org.apache.pinot.query.planner.QueryPlan;
-import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
-import org.apache.pinot.query.planner.partitioning.KeySelector;
import org.apache.pinot.query.planner.physical.DispatchablePlanContext;
import org.apache.pinot.query.planner.physical.DispatchablePlanVisitor;
import
org.apache.pinot.query.planner.physical.colocated.GreedyShuffleRewriteVisitor;
@@ -50,7 +44,6 @@ public class StagePlanner {
private final PlannerContext _plannerContext; // DO NOT REMOVE.
private final WorkerManager _workerManager;
private final TableCache _tableCache;
- private int _stageIdCounter;
private long _requestId;
public StagePlanner(PlannerContext plannerContext, WorkerManager
workerManager, long requestId,
@@ -69,11 +62,12 @@ public class StagePlanner {
*/
public QueryPlan makePlan(RelRoot relRoot, Set<String> tableNames) {
RelNode relRootNode = relRoot.rel;
- // Stage ID starts with 1, 0 will be reserved for ROOT stage.
- _stageIdCounter = 1;
- // walk the plan and create stages.
- StageNode globalStageRoot = walkRelPlan(relRootNode, getNewStageId());
+ // Walk through RelNode tree and construct a StageNode tree.
+ StageNode globalStageRoot = relNodeToStageNode(relRootNode);
+
+ // Fragment the stage tree into multiple stages.
+ globalStageRoot = globalStageRoot.visit(StageFragmenter.INSTANCE, new
StageFragmenter.Context());
// global root needs to send results back to the ROOT, a.k.a. the client
response node. the last stage only has one
// receiver so doesn't matter what the exchange type is. setting it to
SINGLETON by default.
@@ -99,31 +93,13 @@ public class StagePlanner {
// non-threadsafe
// TODO: add dataSchema (extracted from RelNode schema) to the StageNode.
- private StageNode walkRelPlan(RelNode node, int currentStageId) {
- if (isExchangeNode(node)) {
- StageNode nextStageRoot = walkRelPlan(node.getInput(0), getNewStageId());
- RelDistribution distribution = ((Exchange) node).getDistribution();
- RelCollation collation = null;
- boolean isSortOnSender = false;
- boolean isSortOnReceiver = false;
- if (isSortExchangeNode(node)) {
- collation = ((SortExchange) node).getCollation();
- if (node instanceof PinotLogicalSortExchange) {
- // These flags only take meaning if the collation is not null or
empty
- isSortOnSender = ((PinotLogicalSortExchange) node).isSortOnSender();
- isSortOnReceiver = ((PinotLogicalSortExchange)
node).isSortOnReceiver();
- }
- }
- return createSendReceivePair(nextStageRoot, distribution, collation,
isSortOnSender, isSortOnReceiver,
- currentStageId);
- } else {
- StageNode stageNode = RelToStageConverter.toStageNode(node,
currentStageId);
- List<RelNode> inputs = node.getInputs();
- for (RelNode input : inputs) {
- stageNode.addInput(walkRelPlan(input, currentStageId));
- }
- return stageNode;
+ /**
+ * Recursively converts a Calcite RelNode tree into a StageNode tree, one StageNode per RelNode (Exchange
+ * RelNodes become ExchangeNode placeholders). Real stage IDs are not assigned here.
+ */
+ private StageNode relNodeToStageNode(RelNode node) {
+ // -1 is a placeholder stage ID; StageFragmenter assigns the real IDs afterwards via setStageId.
+ StageNode stageNode = RelToStageConverter.toStageNode(node, -1);
+ List<RelNode> inputs = node.getInputs();
+ for (RelNode input : inputs) {
+ stageNode.addInput(relNodeToStageNode(input));
}
+ return stageNode;
}
// TODO: Switch to Worker SPI to avoid multiple-places where workers are
assigned.
@@ -132,38 +108,4 @@ public class StagePlanner {
GreedyShuffleRewriteVisitor.optimizeShuffles(queryPlan, _tableCache);
}
}
-
- private StageNode createSendReceivePair(StageNode nextStageRoot,
RelDistribution distribution, RelCollation collation,
- boolean isSortOnSender, boolean isSortOnReceiver, int currentStageId) {
- List<Integer> distributionKeys = distribution.getKeys();
- RelDistribution.Type exchangeType = distribution.getType();
-
- // make an exchange sender and receiver node pair
- // only HASH_DISTRIBUTED requires a partition key selector; so all other
types (SINGLETON and BROADCAST)
- // of exchange will not carry a partition key selector.
- KeySelector<Object[], Object[]> keySelector = exchangeType ==
RelDistribution.Type.HASH_DISTRIBUTED
- ? new FieldSelectionKeySelector(distributionKeys) : null;
-
- StageNode mailboxSender = new MailboxSendNode(nextStageRoot.getStageId(),
nextStageRoot.getDataSchema(),
- currentStageId, exchangeType, keySelector, collation == null ? null :
collation.getFieldCollations(),
- isSortOnSender);
- StageNode mailboxReceiver = new MailboxReceiveNode(currentStageId,
nextStageRoot.getDataSchema(),
- nextStageRoot.getStageId(), exchangeType, keySelector,
- collation == null ? null : collation.getFieldCollations(),
isSortOnSender, isSortOnReceiver, mailboxSender);
- mailboxSender.addInput(nextStageRoot);
-
- return mailboxReceiver;
- }
-
- private boolean isExchangeNode(RelNode node) {
- return (node instanceof Exchange);
- }
-
- private boolean isSortExchangeNode(RelNode node) {
- return (node instanceof SortExchange);
- }
-
- private int getNewStageId() {
- return _stageIdCounter++;
- }
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
index 82a1afe936..5258fc5637 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
@@ -20,6 +20,7 @@ package org.apache.pinot.query.planner.physical;
import org.apache.pinot.query.planner.QueryPlan;
import org.apache.pinot.query.planner.stage.AggregateNode;
+import org.apache.pinot.query.planner.stage.ExchangeNode;
import org.apache.pinot.query.planner.stage.FilterNode;
import org.apache.pinot.query.planner.stage.JoinNode;
import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
@@ -118,6 +119,11 @@ public class DispatchablePlanVisitor implements
StageNodeVisitor<Void, Dispatcha
return null;
}
+ /**
+ * Not expected to be reached. StagePlanner runs StageFragmenter right after building the StageNode tree, and
+ * StageFragmenter replaces every ExchangeNode with a MailboxReceiveNode/MailboxSendNode pair.
+ * NOTE(review): confirm this visitor is only applied to fragmented plans.
+ */
+ @Override
+ public Void visitExchange(ExchangeNode exchangeNode, DispatchablePlanContext
context) {
+ throw new UnsupportedOperationException("ExchangeNode should not be
visited by DispatchablePlanVisitor");
+ }
+
@Override
public Void visitFilter(FilterNode node, DispatchablePlanContext context) {
node.getInputs().get(0).visit(this, context);
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java
index 4f89435a6e..6d58bde3af 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java
@@ -35,6 +35,7 @@ import
org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
import org.apache.pinot.query.planner.partitioning.KeySelector;
import org.apache.pinot.query.planner.physical.DispatchablePlanMetadata;
import org.apache.pinot.query.planner.stage.AggregateNode;
+import org.apache.pinot.query.planner.stage.ExchangeNode;
import org.apache.pinot.query.planner.stage.FilterNode;
import org.apache.pinot.query.planner.stage.JoinNode;
import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
@@ -257,6 +258,11 @@ public class GreedyShuffleRewriteVisitor implements
StageNodeVisitor<Set<Colocat
return ImmutableSet.of();
}
+ /**
+ * Not expected to be reached. ExchangeNodes should already have been replaced by mailbox send/receive pairs
+ * during stage fragmentation. NOTE(review): confirm optimizeShuffles only runs on fragmented plans.
+ */
+ @Override
+ public Set<ColocationKey> visitExchange(ExchangeNode exchangeNode,
GreedyShuffleRewriteContext context) {
+ throw new UnsupportedOperationException("ExchangeNode should not be
visited by this visitor");
+ }
+
@Override
public Set<ColocationKey> visitTableScan(TableScanNode node,
GreedyShuffleRewriteContext context) {
TableConfig tableConfig = _tableCache.getTableConfig(node.getTableName());
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AbstractStageNode.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AbstractStageNode.java
index 46de8731b7..c2f0e4b1be 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AbstractStageNode.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AbstractStageNode.java
@@ -28,7 +28,7 @@ import
org.apache.pinot.query.planner.serde.ProtoSerializationUtils;
public abstract class AbstractStageNode implements StageNode,
ProtoSerializable {
- protected final int _stageId;
+ protected int _stageId;
protected final List<StageNode> _inputs;
protected DataSchema _dataSchema;
@@ -47,6 +47,11 @@ public abstract class AbstractStageNode implements
StageNode, ProtoSerializable
return _stageId;
}
+ /**
+ * Overwrites this node's stage ID. StageNodes are first built with a placeholder stage ID and get their real
+ * stage assigned later during fragmentation, which is why {@code _stageId} is no longer final.
+ */
+ @Override
+ public void setStageId(int stageId) {
+ _stageId = stageId;
+ }
+
@Override
public List<StageNode> getInputs() {
return _inputs;
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/DefaultPostOrderTraversalVisitor.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/DefaultPostOrderTraversalVisitor.java
index 5e8f604bf0..4c93f9d289 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/DefaultPostOrderTraversalVisitor.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/DefaultPostOrderTraversalVisitor.java
@@ -90,4 +90,10 @@ public abstract class DefaultPostOrderTraversalVisitor<T, C>
implements StageNod
node.getInputs().forEach(input -> input.visit(this, context));
return process(node, context);
}
+
+ /**
+ * Post-order visit of an exchange: visits all inputs first, then processes the exchange node itself, the same
+ * way the other node types are handled.
+ */
+ @Override
+ public T visitExchange(ExchangeNode node, C context) {
+ node.getInputs().forEach(input -> input.visit(this, context));
+ return process(node, context);
+ }
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ExchangeNode.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ExchangeNode.java
new file mode 100644
index 0000000000..328dd8568c
--- /dev/null
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ExchangeNode.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.stage;
+
+import java.util.List;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.planner.serde.ProtoProperties;
+
+
+/**
+ * ExchangeNode represents a data exchange (a stage boundary) in the StageNode tree. It is built from a Calcite
+ * Exchange and carries the distribution type, the distribution keys and, for sort exchanges, the field
+ * collations and sender/receiver sort flags. It is a planning-time placeholder: it is expected to be replaced
+ * by a MailboxSendNode/MailboxReceiveNode pair before the plan is serialized.
+ */
+public class ExchangeNode extends AbstractStageNode {
+
+ // Distribution type of the exchange (e.g. SINGLETON, BROADCAST, HASH_DISTRIBUTED).
+ @ProtoProperties
+ private RelDistribution.Type _exchangeType;
+
+ // Distribution key column ordinals, taken from the Calcite RelDistribution.
+ @ProtoProperties
+ private List<Integer> _keys;
+
+ // Whether the sending side sorts; only meaningful when _collations is non-null and non-empty.
+ @ProtoProperties
+ private boolean _isSortOnSender = false;
+
+ // Whether the receiving side sorts; only meaningful when _collations is non-null and non-empty.
+ @ProtoProperties
+ private boolean _isSortOnReceiver = false;
+
+ // Field collations of a sort exchange; null when the source exchange carried no collation.
+ @ProtoProperties
+ private List<RelFieldCollation> _collations;
+
+ // Stage-ID-only constructor, mirroring the other StageNode types.
+ // NOTE(review): StageNodeSerDeUtils rejects ExchangeNode, so this does not appear to be used for deserialization.
+ public ExchangeNode(int stageId) {
+ super(stageId);
+ }
+
+ /**
+ * @param currentStageId initial stage ID; may be a placeholder that is overwritten later via setStageId
+ * @param dataSchema output schema of the exchange
+ * @param distribution Calcite distribution supplying the exchange type and keys
+ * @param collations field collations, or null when the exchange is not sorted
+ * @param isSortOnSender whether the sending side sorts
+ * @param isSortOnReceiver whether the receiving side sorts
+ */
+ public ExchangeNode(int currentStageId, DataSchema dataSchema,
RelDistribution distribution,
+ List<RelFieldCollation> collations, boolean isSortOnSender,
+ boolean isSortOnReceiver) {
+ super(currentStageId, dataSchema);
+ _keys = distribution.getKeys();
+ _exchangeType = distribution.getType();
+ _isSortOnSender = isSortOnSender;
+ _isSortOnReceiver = isSortOnReceiver;
+ _collations = collations;
+ }
+
+ @Override
+ public String explain() {
+ return "EXCHANGE";
+ }
+
+ @Override
+ public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+ return visitor.visitExchange(this, context);
+ }
+
+ public RelDistribution.Type getDistributionType() {
+ return _exchangeType;
+ }
+
+ public List<Integer> getDistributionKeys() {
+ return _keys;
+ }
+
+ public boolean isSortOnSender() {
+ return _isSortOnSender;
+ }
+
+ public boolean isSortOnReceiver() {
+ return _isSortOnReceiver;
+ }
+
+ // May be null when the exchange carries no collation.
+ public List<RelFieldCollation> getCollations() {
+ return _collations;
+ }
+}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxReceiveNode.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxReceiveNode.java
index 97a019741f..13eb8b5296 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxReceiveNode.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxReceiveNode.java
@@ -81,6 +81,10 @@ public class MailboxReceiveNode extends AbstractStageNode {
_sender = sender;
}
+  /**
+   * Sets the ID of the stage that sends data to this receiver. Takes a primitive {@code int} to match
+   * {@link #getSenderStageId()} and MailboxSendNode#setReceiverStageId, and to rule out an unboxing
+   * NullPointerException on {@code null}.
+   */
+  public void setSenderStageId(int senderStageId) {
+    _senderStageId = senderStageId;
+  }
+
public int getSenderStageId() {
return _senderStageId;
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxSendNode.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxSendNode.java
index c98b82907a..54f80c6794 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxSendNode.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxSendNode.java
@@ -77,6 +77,10 @@ public class MailboxSendNode extends AbstractStageNode {
return _receiverStageId;
}
+ /**
+ * Overrides the receiver stage ID returned by {@link #getReceiverStageId()}.
+ */
+ public void setReceiverStageId(int receiverStageId) {
+ _receiverStageId = receiverStageId;
+ }
+
public void setExchangeType(RelDistribution.Type exchangeType) {
_exchangeType = exchangeType;
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNode.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNode.java
index 7e3278cfe8..ae851f449b 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNode.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNode.java
@@ -35,6 +35,8 @@ public interface StageNode extends Serializable {
int getStageId();
+ /**
+ * Sets the ID of the stage this node belongs to; the counterpart of {@link #getStageId()}.
+ */
+ void setStageId(int stageId);
+
List<StageNode> getInputs();
void addInput(StageNode stageNode);
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeSerDeUtils.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeSerDeUtils.java
index 45a9b8c1df..f96eadfc06 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeSerDeUtils.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeSerDeUtils.java
@@ -87,6 +87,9 @@ public final class StageNodeSerDeUtils {
return new WindowNode(stageId);
case "SetOpNode":
return new SetOpNode(stageId);
+ case "ExchangeNode":
+ throw new IllegalArgumentException(
+ "ExchangeNode should be already split into MailboxSendNode and
MailboxReceiveNode");
default:
throw new IllegalArgumentException("Unknown node name: " + nodeName);
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeVisitor.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeVisitor.java
index 78acf94c76..f72e9540ec 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeVisitor.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeVisitor.java
@@ -58,4 +58,6 @@ public interface StageNodeVisitor<T, C> {
T visitWindow(WindowNode node, C context);
T visitSetOp(SetOpNode setOpNode, C context);
+
+ /**
+ * Visits an {@link ExchangeNode}, the planning-time placeholder for a stage boundary.
+ */
+ T visitExchange(ExchangeNode exchangeNode, C context);
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
index 4ff613e3ec..27c4302197 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
@@ -21,6 +21,7 @@ package org.apache.pinot.query.runtime.plan;
import java.util.ArrayList;
import java.util.List;
import org.apache.pinot.query.planner.stage.AggregateNode;
+import org.apache.pinot.query.planner.stage.ExchangeNode;
import org.apache.pinot.query.planner.stage.FilterNode;
import org.apache.pinot.query.planner.stage.JoinNode;
import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
@@ -130,6 +131,11 @@ public class PhysicalPlanVisitor implements
StageNodeVisitor<MultiStageOperator,
}
}
+ /**
+ * Not expected to be reached: ExchangeNodes are rejected during plan deserialization (StageNodeSerDeUtils).
+ * NOTE(review): confirm this visitor only ever sees deserialized or fragmented plans.
+ */
+ @Override
+ public MultiStageOperator visitExchange(ExchangeNode exchangeNode,
PlanRequestContext context) {
+ throw new UnsupportedOperationException("ExchangeNode should not be
visited");
+ }
+
@Override
public MultiStageOperator visitFilter(FilterNode node, PlanRequestContext
context) {
MultiStageOperator nextOperator = node.getInputs().get(0).visit(this,
context);
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
index db6053a3c9..512e8717e5 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
@@ -38,6 +38,7 @@ import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.parser.CalciteRexExpressionParser;
import org.apache.pinot.query.planner.stage.AggregateNode;
+import org.apache.pinot.query.planner.stage.ExchangeNode;
import org.apache.pinot.query.planner.stage.FilterNode;
import org.apache.pinot.query.planner.stage.JoinNode;
import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
@@ -189,6 +190,11 @@ public class ServerRequestPlanVisitor implements
StageNodeVisitor<Void, ServerPl
return null;
}
+  /**
+   * ExchangeNodes are split into MailboxSendNode/MailboxReceiveNode pairs during planning, and plan
+   * deserialization rejects them, so one should never reach this visitor. The message follows the
+   * "should not be visited" wording of the other visitors instead of implying future support.
+   */
+  @Override
+  public Void visitExchange(ExchangeNode exchangeNode, ServerPlanRequestContext context) {
+    throw new UnsupportedOperationException("ExchangeNode should not be visited by ServerRequestPlanVisitor");
+  }
+
@Override
public Void visitFilter(FilterNode node, ServerPlanRequestContext context) {
visitChildren(node, context);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]