This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4935326785 [multistage] refactor traversals of stage nodes into visitor pattern (#9560)
4935326785 is described below
commit 493532678507ce8e39f576b747b2a9eb684676b5
Author: Almog Gavra <[email protected]>
AuthorDate: Mon Oct 17 12:59:05 2022 -0700
[multistage] refactor traversals of stage nodes into visitor pattern (#9560)
This PR improves the traversal code around StageNode. It sets up a common
pattern for visiting nodes, collecting information, rewriting the tree and making
other changes. It also lays the groundwork for a follow-up PR that will help us
implement a global sort stage for LIMIT/OFFSET queries and support sort push-down.
There are five main parts to look at:
1. I added the `StageNodeVisitor` interface and implemented `visit` in all
   of the `StageNode` implementations.
2. I refactored the `partitionKey` optimization (which removes a shuffle when
   it is not necessary) into a visitor (`ShuffleRewriteVisitor`).
3. I refactored the construction of the `QueryPlan` metadata into a visitor
   (`StageMetadataVisitor`), in preparation for the next PR.
4. I refactored the construction of `Operator`s into a visitor
   (`PhysicalPlanVisitor`).
5. I refactored `QueryPlan#explain` into a visitor (`ExplainPlanStageVisitor`)
   and also improved its output (see the new plan explain below).
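The double dispatch behind item 1 can be sketched as follows. This is a minimal illustration, not Pinot code: `Visitor`, `Node`, `Scan`, `Filter` and `NameCollector` are hypothetical stand-ins for `StageNodeVisitor` and the `StageNode` implementations, assuming the same `visit(visitor, context)` shape used in this PR.

```java
import java.util.ArrayList;
import java.util.List;

public class VisitorSketch {
  // Hypothetical, heavily simplified stand-in for StageNodeVisitor<T, C>.
  interface Visitor<T, C> {
    T visitFilter(Filter node, C context);
    T visitScan(Scan node, C context);
  }

  // Hypothetical stand-in for StageNode.
  interface Node {
    // Double dispatch: each concrete node calls the visitor method for its own type.
    <T, C> T visit(Visitor<T, C> visitor, C context);
  }

  static final class Scan implements Node {
    final String table;

    Scan(String table) {
      this.table = table;
    }

    @Override
    public <T, C> T visit(Visitor<T, C> visitor, C context) {
      return visitor.visitScan(this, context);
    }
  }

  static final class Filter implements Node {
    final Node input;

    Filter(Node input) {
      this.input = input;
    }

    @Override
    public <T, C> T visit(Visitor<T, C> visitor, C context) {
      return visitor.visitFilter(this, context);
    }
  }

  // One traversal written as a visitor: collect node names in post-order.
  static final class NameCollector implements Visitor<Void, List<String>> {
    @Override
    public Void visitFilter(Filter node, List<String> names) {
      node.input.visit(this, names); // visit the child first
      names.add("FILTER");
      return null;
    }

    @Override
    public Void visitScan(Scan node, List<String> names) {
      names.add("SCAN(" + node.table + ")");
      return null;
    }
  }

  public static List<String> collect(Node root) {
    List<String> names = new ArrayList<>();
    root.visit(new NameCollector(), names);
    return names;
  }

  public static void main(String[] args) {
    System.out.println(collect(new Filter(new Scan("a")))); // [SCAN(a), FILTER]
  }
}
```

Adding a new traversal means writing one new visitor class; the node types stay untouched, which is what lets the shuffle rewrite, metadata attachment, operator construction and explain logic live in separate classes.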
Lastly, I added some quality-of-life improvements for debuggability, and I
identified a "bug" in nested loop joins, which I'll fix in a future PR (see the
`FIXME` comment).
Example of the improved explain output (it now correctly shows which server
executes each node):
```
[0]@localhost:51925 MAIL_RECEIVE(RANDOM_DISTRIBUTED)
├── [1]@localhost:51923 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost:51925} (Subtree Omitted)
└── [1]@localhost:51924 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost:51925}
    └── [1]@localhost:51924 JOIN
        ├── [1]@localhost:51924 MAIL_RECEIVE(HASH_DISTRIBUTED)
        │   ├── [2]@localhost:51924 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:51923,[1]@localhost:51924}
        │   │   └── [2]@localhost:51924 TABLE SCAN (a) {REALTIME=[a3]}
        │   └── [2]@localhost:51923 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:51923,[1]@localhost:51924}
        │       └── [2]@localhost:51923 TABLE SCAN (a) {REALTIME=[a1, a2]}
        └── [1]@localhost:51924 MAIL_RECEIVE(HASH_DISTRIBUTED)
            └── [3]@localhost:51923 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:51923,[1]@localhost:51924}
                └── [3]@localhost:51923 TABLE SCAN (b) {REALTIME=[b1]}
```
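The box-drawing prefixes come from `Context#next`: a child that has later siblings gets `├── ` and passes `│   ` down to its own children, while the last child gets `└── ` and passes blank padding. A minimal sketch of that rule on a hypothetical `Node` type (not the real `StageNode` traversal, which also tracks the executing server):

```java
import java.util.List;

public class TreePrefixSketch {
  // Hypothetical tree node; the real ExplainPlanStageVisitor walks StageNodes.
  static final class Node {
    final String label;
    final List<Node> children;

    Node(String label, Node... children) {
      this.label = label;
      this.children = List.of(children);
    }
  }

  // Same prefix rule as ExplainPlanStageVisitor.Context#next.
  static void render(Node node, String prefix, String childPrefix, StringBuilder out) {
    out.append(prefix).append(node.label).append('\n');
    for (int i = 0; i < node.children.size(); i++) {
      boolean hasMoreChildren = i < node.children.size() - 1;
      render(node.children.get(i),
          childPrefix + (hasMoreChildren ? "├── " : "└── "),
          childPrefix + (hasMoreChildren ? "│   " : "    "),
          out);
    }
  }

  public static String render(Node root) {
    StringBuilder out = new StringBuilder();
    render(root, "", "", out);
    return out.toString();
  }

  public static void main(String[] args) {
    Node tree = new Node("MAIL_RECEIVE",
        new Node("JOIN", new Node("TABLE SCAN (a)"), new Node("TABLE SCAN (b)")));
    // Prints:
    // MAIL_RECEIVE
    // └── JOIN
    //     ├── TABLE SCAN (a)
    //     └── TABLE SCAN (b)
    System.out.print(render(tree));
  }
}
```

Each continuation string is as wide as the connector it pairs with, which keeps deeper levels aligned under their parents.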
---
.../query/planner/ExplainPlanStageVisitor.java | 224 +++++++++++++++++++++
.../org/apache/pinot/query/planner/QueryPlan.java | 60 +-----
.../apache/pinot/query/planner/StageMetadata.java | 7 +
.../planner/logical/ShuffleRewriteVisitor.java | 192 ++++++++++++++++++
.../planner/logical/StageMetadataVisitor.java | 132 ++++++++++++
.../pinot/query/planner/logical/StagePlanner.java | 171 +++-------------
.../query/planner/stage/AbstractStageNode.java | 15 --
.../pinot/query/planner/stage/AggregateNode.java | 5 +
.../pinot/query/planner/stage/FilterNode.java | 5 +
.../apache/pinot/query/planner/stage/JoinNode.java | 5 +
.../query/planner/stage/MailboxReceiveNode.java | 23 ++-
.../pinot/query/planner/stage/MailboxSendNode.java | 11 +-
.../pinot/query/planner/stage/ProjectNode.java | 5 +
.../apache/pinot/query/planner/stage/SortNode.java | 5 +
.../pinot/query/planner/stage/StageNode.java | 8 +-
.../query/planner/stage/StageNodeVisitor.java | 57 ++++++
.../pinot/query/planner/stage/TableScanNode.java | 5 +
.../pinot/query/planner/stage/ValueNode.java | 5 +
.../pinot/query/mailbox/GrpcMailboxService.java | 5 +
.../query/runtime/blocks/TransferableBlock.java | 2 +-
.../runtime/executor/PhysicalPlanVisitor.java | 149 ++++++++++++++
.../runtime/executor/WorkerQueryExecutor.java | 79 +-------
.../query/runtime/operator/AggregateOperator.java | 4 +-
.../query/runtime/operator/FilterOperator.java | 4 +-
.../query/runtime/operator/HashJoinOperator.java | 8 +-
.../runtime/operator/MailboxReceiveOperator.java | 7 +
.../runtime/operator/MailboxSendOperator.java | 4 +-
.../pinot/query/runtime/operator/SortOperator.java | 4 +-
.../query/runtime/operator/TransformOperator.java | 4 +-
29 files changed, 898 insertions(+), 307 deletions(-)
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/ExplainPlanStageVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/ExplainPlanStageVisitor.java
new file mode 100644
index 0000000000..e0345590dc
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/ExplainPlanStageVisitor.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.planner.stage.AggregateNode;
+import org.apache.pinot.query.planner.stage.FilterNode;
+import org.apache.pinot.query.planner.stage.JoinNode;
+import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
+import org.apache.pinot.query.planner.stage.MailboxSendNode;
+import org.apache.pinot.query.planner.stage.ProjectNode;
+import org.apache.pinot.query.planner.stage.SortNode;
+import org.apache.pinot.query.planner.stage.StageNode;
+import org.apache.pinot.query.planner.stage.StageNodeVisitor;
+import org.apache.pinot.query.planner.stage.TableScanNode;
+import org.apache.pinot.query.planner.stage.ValueNode;
+
+
+/**
+ * A visitor that converts a {@code QueryPlan} into a human-readable string representation.
+ *
+ * <p>It is currently not used programmatically and cannot be accessed by the user. Instead,
+ * it is intended for use in manual debugging (e.g. setting breakpoints and calling QueryPlan#explain()).
+ */
+public class ExplainPlanStageVisitor implements StageNodeVisitor<StringBuilder, ExplainPlanStageVisitor.Context> {
+
+ private final QueryPlan _queryPlan;
+
+ /**
+ * Explains the query plan.
+ *
+ * @see QueryPlan#explain()
+ * @param queryPlan the queryPlan to explain
+ * @return a String representation of the query plan tree
+ */
+ public static String explain(QueryPlan queryPlan) {
+ if (queryPlan.getQueryStageMap().isEmpty()) {
+ return "EMPTY";
+ }
+
+ // the root stage (stage 0) of a query plan always runs on a single server
+ ServerInstance rootServer = queryPlan.getStageMetadataMap().get(0).getServerInstances().get(0);
+ return explainFrom(queryPlan, queryPlan.getQueryStageMap().get(0), rootServer);
+ }
+
+ /**
+ * Explains the query plan from a specific point in the subtree, taking {@code rootServer}
+ * as the node that is executing this sub-tree. This is helpful for debugging what is happening
+ * at a given point in time (for example, printing the tree that will be executed on a
+ * local node right before it is executed).
+ *
+ * @param queryPlan the entire query plan, including non-executed portions
+ * @param node the node to begin traversal
+ * @param rootServer the server instance that is executing this plan (should execute {@code node})
+ *
+ * @return a String representation of the query plan subtree rooted at {@code node}
+ */
+ public static String explainFrom(QueryPlan queryPlan, StageNode node, ServerInstance rootServer) {
+ final ExplainPlanStageVisitor visitor = new ExplainPlanStageVisitor(queryPlan);
+ return node
+ .visit(visitor, new Context(rootServer, "", "", new StringBuilder()))
+ .toString();
+ }
+
+ private ExplainPlanStageVisitor(QueryPlan queryPlan) {
+ _queryPlan = queryPlan;
+ }
+
+ private StringBuilder appendInfo(StageNode node, Context context) {
+ int stage = node.getStageId();
+ context._builder
+ .append(context._prefix)
+ .append('[')
+ .append(stage)
+ .append("]@")
+ .append(context._host.getHostname())
+ .append(':')
+ .append(context._host.getPort())
+ .append(' ')
+ .append(node.explain());
+ return context._builder;
+ }
+
+ private StringBuilder visitSimpleNode(StageNode node, Context context) {
+ appendInfo(node, context).append('\n');
+ return node.getInputs().get(0).visit(this, context.next(false, context._host));
+ }
+
+ @Override
+ public StringBuilder visitAggregate(AggregateNode node, Context context) {
+ return visitSimpleNode(node, context);
+ }
+
+ @Override
+ public StringBuilder visitFilter(FilterNode node, Context context) {
+ return visitSimpleNode(node, context);
+ }
+
+ @Override
+ public StringBuilder visitJoin(JoinNode node, Context context) {
+ appendInfo(node, context).append('\n');
+ node.getInputs().get(0).visit(this, context.next(true, context._host));
+ node.getInputs().get(1).visit(this, context.next(false, context._host));
+ return context._builder;
+ }
+
+ @Override
+ public StringBuilder visitMailboxReceive(MailboxReceiveNode node, Context context) {
+ appendInfo(node, context).append('\n');
+
+ MailboxSendNode sender = (MailboxSendNode) node.getSender();
+ int senderStageId = node.getSenderStageId();
+ StageMetadata metadata = _queryPlan.getStageMetadataMap().get(senderStageId);
+ Map<ServerInstance, Map<String, List<String>>> segments = metadata.getServerInstanceToSegmentsMap();
+
+ Iterator<ServerInstance> iterator = metadata.getServerInstances().iterator();
+ while (iterator.hasNext()) {
+ ServerInstance serverInstance = iterator.next();
+ if (segments.containsKey(serverInstance)) {
+ // always print out leaf stages
+ sender.visit(this, context.next(iterator.hasNext(), serverInstance));
+ } else {
+ if (!iterator.hasNext()) {
+ // always print out the last one
+ sender.visit(this, context.next(false, serverInstance));
+ } else {
+ // only print short version of the sender node
+ appendMailboxSend(sender, context.next(true, serverInstance))
+ .append(" (Subtree Omitted)")
+ .append('\n');
+ }
+ }
+ }
+ return context._builder;
+ }
+
+ @Override
+ public StringBuilder visitMailboxSend(MailboxSendNode node, Context context) {
+ appendMailboxSend(node, context).append('\n');
+ return node.getInputs().get(0).visit(this, context.next(false, context._host));
+ }
+
+ private StringBuilder appendMailboxSend(MailboxSendNode node, Context context) {
+ appendInfo(node, context);
+
+ int receiverStageId = node.getReceiverStageId();
+ List<ServerInstance> servers = _queryPlan.getStageMetadataMap().get(receiverStageId).getServerInstances();
+ context._builder.append("->");
+ String receivers = servers.stream()
+ .map(s -> s.getHostname() + ':' + s.getPort())
+ .map(s -> "[" + receiverStageId + "]@" + s)
+ .collect(Collectors.joining(",", "{", "}"));
+ return context._builder.append(receivers);
+ }
+
+ @Override
+ public StringBuilder visitProject(ProjectNode node, Context context) {
+ return visitSimpleNode(node, context);
+ }
+
+ @Override
+ public StringBuilder visitSort(SortNode node, Context context) {
+ return visitSimpleNode(node, context);
+ }
+
+ @Override
+ public StringBuilder visitTableScan(TableScanNode node, Context context) {
+ return appendInfo(node, context)
+ .append(' ')
+ .append(_queryPlan.getStageMetadataMap()
+ .get(node.getStageId())
+ .getServerInstanceToSegmentsMap()
+ .get(context._host))
+ .append('\n');
+ }
+
+ @Override
+ public StringBuilder visitValue(ValueNode node, Context context) {
+ return appendInfo(node, context).append('\n');
+ }
+
+ static class Context {
+ final ServerInstance _host;
+ final String _prefix;
+ final String _childPrefix;
+ final StringBuilder _builder;
+
+ Context(ServerInstance host, String prefix, String childPrefix, StringBuilder builder) {
+ _host = host;
+ _prefix = prefix;
+ _childPrefix = childPrefix;
+ _builder = builder;
+ }
+
+ Context next(boolean hasMoreChildren, ServerInstance host) {
+ return new Context(
+ host,
+ hasMoreChildren ? _childPrefix + "├── " : _childPrefix + "└── ",
+ hasMoreChildren ? _childPrefix + "│   " : _childPrefix + "    ",
+ _builder
+ );
+ }
+ }
+}
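`visitMailboxReceive` above decides, for every server in the sending stage, whether to expand its subtree. A minimal sketch of just that decision, assuming servers are plain strings (hypothetical; the real code iterates `ServerInstance`s and checks `getServerInstanceToSegmentsMap()`):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

public class SubtreeOmissionSketch {
  // For each sending server, decide whether its subtree is printed in full ("FULL")
  // or collapsed into a single "(Subtree Omitted)" line ("OMITTED").
  // Servers that own segments (leaf stages) and the last server are always expanded.
  static List<String> plan(List<String> senders, Set<String> serversWithSegments) {
    List<String> decisions = new ArrayList<>();
    Iterator<String> it = senders.iterator();
    while (it.hasNext()) {
      String server = it.next();
      boolean isLast = !it.hasNext();
      boolean expand = serversWithSegments.contains(server) || isLast;
      decisions.add(server + (expand ? " FULL" : " OMITTED"));
    }
    return decisions;
  }

  public static void main(String[] args) {
    // Non-leaf stage on two servers: only the last one is expanded.
    System.out.println(plan(List.of("localhost:51923", "localhost:51924"), Set.of()));
    // Leaf stage: every server owns segments, so both are expanded.
    System.out.println(plan(List.of("localhost:51924", "localhost:51923"),
        Set.of("localhost:51923", "localhost:51924")));
  }
}
```

This mirrors the sample explain output in the commit message: stage 1 (a JOIN, not a leaf) prints one omitted line and one full subtree, while stage 2 (a table scan) expands every server.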
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java
index d8770ed3bf..651d259979 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java
@@ -18,14 +18,11 @@
*/
package org.apache.pinot.query.planner;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.calcite.util.Pair;
import org.apache.pinot.query.planner.logical.LogicalPlanner;
-import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
import org.apache.pinot.query.planner.stage.StageNode;
-import org.apache.pinot.query.planner.stage.TableScanNode;
/**
@@ -75,53 +72,16 @@ public class QueryPlan {
return _queryResultFields;
}
+ /**
+ * Explains the {@code QueryPlan}.
+ *
+ * @return a human-readable tree explaining the query plan
+ * @see ExplainPlanStageVisitor#explain(QueryPlan)
+ * @apiNote this is <b>NOT</b> identical to the SQL {@code EXPLAIN PLAN FOR} functionality
+ * and is instead intended to be used by developers debugging during feature
+ * development
+ */
public String explain() {
- if (_queryStageMap.isEmpty()) {
- return "EMPTY";
- }
-
- StringBuilder builder = new StringBuilder();
- explain(
- builder,
- _queryStageMap.get(0),
- "",
- "");
- return builder.toString();
- }
-
- private void explain(
- StringBuilder builder,
- StageNode root,
- String prefix,
- String childPrefix
- ) {
- int stage = root.getStageId();
-
- builder
- .append(prefix)
- .append("[").append(stage).append("] ")
- .append(root.explain());
-
- if (root instanceof TableScanNode) {
- builder.append(' ');
- builder.append(_stageMetadataMap.get(root.getStageId()).getServerInstanceToSegmentsMap());
- }
-
- builder.append('\n');
-
- if (root instanceof MailboxReceiveNode) {
- int senderStage = ((MailboxReceiveNode) root).getSenderStageId();
- StageNode sender = _queryStageMap.get(senderStage);
- explain(builder, sender, childPrefix + "└── ", childPrefix + "    ");
- } else {
- for (Iterator<StageNode> iterator = root.getInputs().iterator(); iterator.hasNext();) {
- StageNode input = iterator.next();
- if (iterator.hasNext()) {
- explain(builder, input, childPrefix + "├── ", childPrefix + "│   ");
- } else {
- explain(builder, input, childPrefix + "└── ", childPrefix + "    ");
- }
- }
- }
+ return ExplainPlanStageVisitor.explain(this);
}
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StageMetadata.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StageMetadata.java
index fe2531f9b6..2f21a64c27 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StageMetadata.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StageMetadata.java
@@ -97,4 +97,11 @@ public class StageMetadata implements Serializable {
public void setTimeBoundaryInfo(TimeBoundaryInfo timeBoundaryInfo) {
_timeBoundaryInfo = timeBoundaryInfo;
}
+
+ @Override
+ public String toString() {
+ return "StageMetadata{" + "_scannedTables=" + _scannedTables + ", _serverInstances=" + _serverInstances
+ + ", _serverInstanceToSegmentsMap=" + _serverInstanceToSegmentsMap + ", _timeBoundaryInfo=" + _timeBoundaryInfo
+ + '}';
+ }
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java
new file mode 100644
index 0000000000..58adfc96d1
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.logical;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
+import org.apache.pinot.query.planner.partitioning.KeySelector;
+import org.apache.pinot.query.planner.stage.AggregateNode;
+import org.apache.pinot.query.planner.stage.FilterNode;
+import org.apache.pinot.query.planner.stage.JoinNode;
+import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
+import org.apache.pinot.query.planner.stage.MailboxSendNode;
+import org.apache.pinot.query.planner.stage.ProjectNode;
+import org.apache.pinot.query.planner.stage.SortNode;
+import org.apache.pinot.query.planner.stage.StageNode;
+import org.apache.pinot.query.planner.stage.StageNodeVisitor;
+import org.apache.pinot.query.planner.stage.TableScanNode;
+import org.apache.pinot.query.planner.stage.ValueNode;
+
+
+/**
+ * {@code ShuffleRewriteVisitor} removes unnecessary shuffles from a stage node plan by
+ * inspecting whether all data required by a specific subtree is already colocated on
+ * a single host. It gathers the information recursively by checking which partitioned
+ * data is selected by each node in the tree.
+ *
+ * <p>The only method that should be used externally is {@link #optimizeShuffles(StageNode)};
+ * other public methods are used only by {@link StageNode#visit(StageNodeVisitor, Object)}.
+ */
+public class ShuffleRewriteVisitor implements StageNodeVisitor<Set<Integer>, Void> {
+
+ /**
+ * This method rewrites {@code root} <b>in place</b>, removing any unnecessary shuffles
+ * by replacing the distribution type with {@link RelDistribution.Type#SINGLETON}.
+ *
+ * @param root the root node of the tree to rewrite
+ */
+ public static void optimizeShuffles(StageNode root) {
+ root.visit(new ShuffleRewriteVisitor(), null);
+ }
+
+ /**
+ * Instances of this class should only be created through {@link #optimizeShuffles(StageNode)}.
+ */
+ private ShuffleRewriteVisitor() {
+ }
+
+ @Override
+ public Set<Integer> visitAggregate(AggregateNode node, Void context) {
+ Set<Integer> oldPartitionKeys = node.getInputs().get(0).visit(this, context);
+
+ // any input reference that carries over directly into the aggregation's group set
+ // remains a partition key
+ Set<Integer> partitionKeys = new HashSet<>();
+ for (int i = 0; i < node.getGroupSet().size(); i++) {
+ RexExpression rex = node.getGroupSet().get(i);
+ if (rex instanceof RexExpression.InputRef) {
+ if (oldPartitionKeys.contains(((RexExpression.InputRef) rex).getIndex())) {
+ partitionKeys.add(i);
+ }
+ }
+ }
+
+ return partitionKeys;
+ }
+
+ @Override
+ public Set<Integer> visitFilter(FilterNode node, Void context) {
+ // filters don't change partition keys
+ return node.getInputs().get(0).visit(this, context);
+ }
+
+ @Override
+ public Set<Integer> visitJoin(JoinNode node, Void context) {
+ Set<Integer> leftPKs = node.getInputs().get(0).visit(this, context);
+ Set<Integer> rightPks = node.getInputs().get(1).visit(this, context);
+
+ // Currently, JOIN criteria are guaranteed to have only one FieldSelectionKeySelector
+ FieldSelectionKeySelector leftJoinKey = (FieldSelectionKeySelector) node.getJoinKeys().getLeftJoinKeySelector();
+ FieldSelectionKeySelector rightJoinKey = (FieldSelectionKeySelector) node.getJoinKeys().getRightJoinKeySelector();
+
+ int leftDataSchemaSize = node.getInputs().get(0).getDataSchema().size();
+ Set<Integer> partitionKeys = new HashSet<>();
+ for (int i = 0; i < leftJoinKey.getColumnIndices().size(); i++) {
+ int leftIdx = leftJoinKey.getColumnIndices().get(i);
+ int rightIdx = rightJoinKey.getColumnIndices().get(i);
+ if (leftPKs.contains(leftIdx)) {
+ partitionKeys.add(leftIdx);
+ }
+ if (rightPks.contains(rightIdx)) {
+ // combined schema will have all the left fields before the right fields
+ // so add the leftDataSchemaSize before adding the key
+ partitionKeys.add(leftDataSchemaSize + rightIdx);
+ }
+ }
+
+ return partitionKeys;
+ }
+
+ @Override
+ public Set<Integer> visitMailboxReceive(MailboxReceiveNode node, Void context) {
+ Set<Integer> oldPartitionKeys = node.getSender().visit(this, context);
+ KeySelector<Object[], Object[]> selector = node.getPartitionKeySelector();
+
+ if (canSkipShuffle(oldPartitionKeys, selector)) {
+ node.setExchangeType(RelDistribution.Type.SINGLETON);
+ return oldPartitionKeys;
+ } else if (selector == null) {
+ return new HashSet<>();
+ } else {
+ return new HashSet<>(((FieldSelectionKeySelector) selector).getColumnIndices());
+ }
+ }
+
+ @Override
+ public Set<Integer> visitMailboxSend(MailboxSendNode node, Void context) {
+ Set<Integer> oldPartitionKeys = node.getInputs().get(0).visit(this, context);
+ KeySelector<Object[], Object[]> selector = node.getPartitionKeySelector();
+
+ if (canSkipShuffle(oldPartitionKeys, selector)) {
+ node.setExchangeType(RelDistribution.Type.SINGLETON);
+ return oldPartitionKeys;
+ } else {
+ // reset the context partitionKeys since we've determined that
+ // a shuffle is necessary (the MailboxReceiveNode that reads from
+ // this sender will necessarily be the result of a shuffle and
+ // will reset the partition keys based on its selector)
+ return new HashSet<>();
+ }
+ }
+
+ @Override
+ public Set<Integer> visitProject(ProjectNode node, Void context) {
+ Set<Integer> oldPartitionKeys = node.getInputs().get(0).visit(this, context);
+
+ // a partition key carries over if it is projected as a plain input reference
+ Set<Integer> partitionKeys = new HashSet<>();
+ for (int i = 0; i < node.getProjects().size(); i++) {
+ RexExpression rex = node.getProjects().get(i);
+ if (rex instanceof RexExpression.InputRef) {
+ if (oldPartitionKeys.contains(((RexExpression.InputRef) rex).getIndex())) {
+ partitionKeys.add(i);
+ }
+ }
+ }
+
+ return partitionKeys;
+ }
+
+ @Override
+ public Set<Integer> visitSort(SortNode node, Void context) {
+ // sort doesn't change the partition keys
+ return node.getInputs().get(0).visit(this, context);
+ }
+
+ @Override
+ public Set<Integer> visitTableScan(TableScanNode node, Void context) {
+ // TODO: add table partition in table config as partition keys; this info is not yet available
+ return new HashSet<>();
+ }
+
+ @Override
+ public Set<Integer> visitValue(ValueNode node, Void context) {
+ return new HashSet<>();
+ }
+
+ private static boolean canSkipShuffle(Set<Integer> partitionKeys, KeySelector<Object[], Object[]> keySelector) {
+ if (!partitionKeys.isEmpty() && keySelector != null) {
+ Set<Integer> targetSet = new HashSet<>(((FieldSelectionKeySelector) keySelector).getColumnIndices());
+ return targetSet.containsAll(partitionKeys);
+ }
+ return false;
+ }
+}
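The partition-key bookkeeping in `ShuffleRewriteVisitor` reduces to a few set operations. The sketch below restates `canSkipShuffle`, the projection remapping in `visitProject`, and the right-side offset in `visitJoin` over plain collections; the class name and the `inputRefs` encoding (-1 for any non-`InputRef` expression) are assumptions made for illustration.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ShuffleSkipSketch {
  // Restates canSkipShuffle: the shuffle is redundant when the data is already
  // partitioned on a non-empty subset of the keys the exchange would hash on.
  // A null targetKeys plays the role of a null KeySelector (non-hash exchange).
  static boolean canSkipShuffle(Set<Integer> partitionKeys, List<Integer> targetKeys) {
    if (partitionKeys.isEmpty() || targetKeys == null) {
      return false;
    }
    return new HashSet<>(targetKeys).containsAll(partitionKeys);
  }

  // Restates visitProject. Hypothetical encoding: inputRefs.get(i) is the input
  // column referenced by output column i, or -1 for any other expression.
  static Set<Integer> projectKeys(Set<Integer> inputKeys, List<Integer> inputRefs) {
    Set<Integer> keys = new HashSet<>();
    for (int i = 0; i < inputRefs.size(); i++) {
      int ref = inputRefs.get(i);
      if (ref >= 0 && inputKeys.contains(ref)) {
        keys.add(i);
      }
    }
    return keys;
  }

  // Restates visitJoin: the joined row holds all left columns first, so right-side
  // key indices are shifted by the width of the left schema.
  static Set<Integer> joinKeys(Set<Integer> leftKeys, Set<Integer> rightKeys,
      List<Integer> leftJoinCols, List<Integer> rightJoinCols, int leftSchemaSize) {
    Set<Integer> keys = new HashSet<>();
    for (int i = 0; i < leftJoinCols.size(); i++) {
      int leftIdx = leftJoinCols.get(i);
      int rightIdx = rightJoinCols.get(i);
      if (leftKeys.contains(leftIdx)) {
        keys.add(leftIdx);
      }
      if (rightKeys.contains(rightIdx)) {
        keys.add(leftSchemaSize + rightIdx);
      }
    }
    return keys;
  }

  public static void main(String[] args) {
    System.out.println(canSkipShuffle(Set.of(0), List.of(0, 1)));                  // true
    System.out.println(projectKeys(Set.of(1), List.of(1, -1, 0)));                 // [0]
    System.out.println(joinKeys(Set.of(0), Set.of(0), List.of(0), List.of(0), 3)); // [0, 3]
  }
}
```

The subset check is what makes the rewrite safe: if rows are already hash-partitioned on `{0}`, any two rows that agree on `{0, 1}` also agree on `{0}` and so, assuming the same partitioning function and worker count, already sit on the same worker.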
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StageMetadataVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StageMetadataVisitor.java
new file mode 100644
index 0000000000..cf76134c8a
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StageMetadataVisitor.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.logical;
+
+import java.util.HashMap;
+import java.util.List;
+import org.apache.calcite.util.Pair;
+import org.apache.pinot.query.planner.QueryPlan;
+import org.apache.pinot.query.planner.StageMetadata;
+import org.apache.pinot.query.planner.stage.AggregateNode;
+import org.apache.pinot.query.planner.stage.FilterNode;
+import org.apache.pinot.query.planner.stage.JoinNode;
+import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
+import org.apache.pinot.query.planner.stage.MailboxSendNode;
+import org.apache.pinot.query.planner.stage.ProjectNode;
+import org.apache.pinot.query.planner.stage.SortNode;
+import org.apache.pinot.query.planner.stage.StageNode;
+import org.apache.pinot.query.planner.stage.StageNodeVisitor;
+import org.apache.pinot.query.planner.stage.TableScanNode;
+import org.apache.pinot.query.planner.stage.ValueNode;
+
+
+/**
+ * {@code StageMetadataVisitor} computes the {@link StageMetadata} for a {@link StageNode}
+ * tree and attaches it in the form of a {@link QueryPlan}.
+ */
+public class StageMetadataVisitor implements StageNodeVisitor<Void, QueryPlan> {
+
+ public static QueryPlan attachMetadata(List<Pair<Integer, String>> fields, StageNode root) {
+ QueryPlan queryPlan = new QueryPlan(fields, new HashMap<>(), new HashMap<>());
+ root.visit(new StageMetadataVisitor(), queryPlan);
+ return queryPlan;
+ }
+
+ /**
+ * Usage of this class should only come through {@link #attachMetadata(List, StageNode)}.
+ */
+ private StageMetadataVisitor() {
+ }
+
+ private void visit(StageNode node, QueryPlan queryPlan) {
+ queryPlan
+ .getStageMetadataMap()
+ .computeIfAbsent(node.getStageId(), (id) -> new StageMetadata())
+ .attach(node);
+ }
+
+ @Override
+ public Void visitAggregate(AggregateNode node, QueryPlan context) {
+ node.getInputs().get(0).visit(this, context);
+ visit(node, context);
+ return null;
+ }
+
+ @Override
+ public Void visitFilter(FilterNode node, QueryPlan context) {
+ node.getInputs().get(0).visit(this, context);
+ visit(node, context);
+ return null;
+ }
+
+ @Override
+ public Void visitJoin(JoinNode node, QueryPlan context) {
+ node.getInputs().forEach(join -> join.visit(this, context));
+ visit(node, context);
+ return null;
+ }
+
+ @Override
+ public Void visitMailboxReceive(MailboxReceiveNode node, QueryPlan context) {
+ node.getSender().visit(this, context);
+ visit(node, context);
+
+ // special case for the global mailbox receive node
+ if (node.getStageId() == 0) {
+ context.getQueryStageMap().put(0, node);
+ }
+
+ return null;
+ }
+
+ @Override
+ public Void visitMailboxSend(MailboxSendNode node, QueryPlan context) {
+ node.getInputs().get(0).visit(this, context);
+ visit(node, context);
+
+ context.getQueryStageMap().put(node.getStageId(), node);
+ return null;
+ }
+
+ @Override
+ public Void visitProject(ProjectNode node, QueryPlan context) {
+ node.getInputs().get(0).visit(this, context);
+ visit(node, context);
+ return null;
+ }
+
+ @Override
+ public Void visitSort(SortNode node, QueryPlan context) {
+ node.getInputs().get(0).visit(this, context);
+ visit(node, context);
+ return null;
+ }
+
+ @Override
+ public Void visitTableScan(TableScanNode node, QueryPlan context) {
+ visit(node, context);
+ return null;
+ }
+
+ @Override
+ public Void visitValue(ValueNode node, QueryPlan context) {
+ visit(node, context);
+ return null;
+ }
+}
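`StageMetadataVisitor` is a post-order walk that buckets every node by stage id. A minimal sketch over a hypothetical `Node` type, where the mailbox sender is modeled as an ordinary input and a list of node names stands in for `StageMetadata`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class StageMetadataSketch {
  // Hypothetical simplified node: a stage id, a name, and its inputs
  // (for a receive node, the sender is modeled as its single input).
  static final class Node {
    final int stageId;
    final String name;
    final List<Node> inputs;

    Node(int stageId, String name, Node... inputs) {
      this.stageId = stageId;
      this.name = name;
      this.inputs = List.of(inputs);
    }
  }

  // Post-order: visit the inputs first, then attach the current node to its stage's
  // bucket, creating the bucket on first use (like computeIfAbsent in
  // StageMetadataVisitor#visit).
  static void attach(Node node, Map<Integer, List<String>> stageToNodes) {
    for (Node input : node.inputs) {
      attach(input, stageToNodes);
    }
    stageToNodes.computeIfAbsent(node.stageId, id -> new ArrayList<>()).add(node.name);
  }

  public static Map<Integer, List<String>> collect(Node root) {
    Map<Integer, List<String>> stageToNodes = new TreeMap<>();
    attach(root, stageToNodes);
    return stageToNodes;
  }

  public static void main(String[] args) {
    Node root = new Node(0, "MAIL_RECEIVE",
        new Node(1, "MAIL_SEND", new Node(1, "FILTER", new Node(1, "TABLE_SCAN"))));
    System.out.println(collect(root)); // {0=[MAIL_RECEIVE], 1=[TABLE_SCAN, FILTER, MAIL_SEND]}
  }
}
```

Because the walk starts from the global receive node, stage 0 gets its metadata through the same path as every other stage, which is what lets `StagePlanner#makePlan` drop its hand-written bookkeeping for the root stage.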
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
index 8c940294ef..323e7f506b 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
@@ -18,11 +18,8 @@
*/
package org.apache.pinot.query.planner.logical;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
@@ -32,14 +29,9 @@ import org.apache.pinot.query.planner.QueryPlan;
import org.apache.pinot.query.planner.StageMetadata;
import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
import org.apache.pinot.query.planner.partitioning.KeySelector;
-import org.apache.pinot.query.planner.stage.AggregateNode;
-import org.apache.pinot.query.planner.stage.FilterNode;
-import org.apache.pinot.query.planner.stage.JoinNode;
import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
import org.apache.pinot.query.planner.stage.MailboxSendNode;
-import org.apache.pinot.query.planner.stage.ProjectNode;
import org.apache.pinot.query.planner.stage.StageNode;
-import org.apache.pinot.query.planner.stage.TableScanNode;
import org.apache.pinot.query.routing.WorkerManager;
@@ -51,9 +43,6 @@ import org.apache.pinot.query.routing.WorkerManager;
public class StagePlanner {
private final PlannerContext _plannerContext;
private final WorkerManager _workerManager;
-
- private Map<Integer, StageNode> _queryStageMap;
- private Map<Integer, StageMetadata> _stageMetadataMap;
private int _stageIdCounter;
public StagePlanner(PlannerContext plannerContext, WorkerManager workerManager) {
@@ -69,176 +58,68 @@ public class StagePlanner {
*/
public QueryPlan makePlan(RelRoot relRoot) {
RelNode relRootNode = relRoot.rel;
- // clear the state
- _queryStageMap = new HashMap<>();
- _stageMetadataMap = new HashMap<>();
// Stage ID starts with 1, 0 will be reserved for ROOT stage.
_stageIdCounter = 1;
// walk the plan and create stages.
StageNode globalStageRoot = walkRelPlan(relRootNode, getNewStageId());
+ ShuffleRewriteVisitor.optimizeShuffles(globalStageRoot);
// global root needs to send results back to the ROOT, a.k.a. the client response node. the last stage only has one
// receiver so doesn't matter what the exchange type is. setting it to SINGLETON by default.
- StageNode globalReceiverNode =
- new MailboxReceiveNode(0, globalStageRoot.getDataSchema(), globalStageRoot.getStageId(),
- RelDistribution.Type.RANDOM_DISTRIBUTED, null);
StageNode globalSenderNode = new MailboxSendNode(globalStageRoot.getStageId(), globalStageRoot.getDataSchema(),
- globalReceiverNode.getStageId(), RelDistribution.Type.RANDOM_DISTRIBUTED, null);
+ 0, RelDistribution.Type.RANDOM_DISTRIBUTED, null);
globalSenderNode.addInput(globalStageRoot);
- _queryStageMap.put(globalSenderNode.getStageId(), globalSenderNode);
- StageMetadata stageMetadata = _stageMetadataMap.get(globalSenderNode.getStageId());
- stageMetadata.attach(globalSenderNode);
- _queryStageMap.put(globalReceiverNode.getStageId(), globalReceiverNode);
- StageMetadata globalReceivingStageMetadata = new StageMetadata();
- globalReceivingStageMetadata.attach(globalReceiverNode);
- _stageMetadataMap.put(globalReceiverNode.getStageId(), globalReceivingStageMetadata);
+ StageNode globalReceiverNode =
+ new MailboxReceiveNode(0, globalStageRoot.getDataSchema(), globalStageRoot.getStageId(),
+ RelDistribution.Type.RANDOM_DISTRIBUTED, null, globalSenderNode);
+
+ QueryPlan queryPlan = StageMetadataVisitor.attachMetadata(relRoot.fields, globalReceiverNode);
// assign workers to each stage.
- for (Map.Entry<Integer, StageMetadata> e : _stageMetadataMap.entrySet()) {
+ for (Map.Entry<Integer, StageMetadata> e :
queryPlan.getStageMetadataMap().entrySet()) {
_workerManager.assignWorkerToStage(e.getKey(), e.getValue());
}
- return new QueryPlan(relRoot.fields, _queryStageMap, _stageMetadataMap);
+ return queryPlan;
}
// non-threadsafe
// TODO: add dataSchema (extracted from RelNode schema) to the StageNode.
private StageNode walkRelPlan(RelNode node, int currentStageId) {
if (isExchangeNode(node)) {
- // 1. exchangeNode always have only one input, get its input converted as a new stage root.
StageNode nextStageRoot = walkRelPlan(node.getInput(0), getNewStageId());
RelDistribution distribution = ((LogicalExchange) node).getDistribution();
- List<Integer> distributionKeys = distribution.getKeys();
- RelDistribution.Type exchangeType = distribution.getType();
-
- // 2. make an exchange sender and receiver node pair
- // only HASH_DISTRIBUTED requires a partition key selector; so all other types (SINGLETON and BROADCAST)
- // of exchange will not carry a partition key selector.
- KeySelector<Object[], Object[]> keySelector = exchangeType == RelDistribution.Type.HASH_DISTRIBUTED
- ? new FieldSelectionKeySelector(distributionKeys) : null;
-
- StageNode mailboxReceiver;
- StageNode mailboxSender;
- if (canSkipShuffle(nextStageRoot, keySelector)) {
- // Use SINGLETON exchange type indicates a LOCAL-to-LOCAL data transfer between execution threads.
- // TODO: actually implement the SINGLETON exchange without going through the over-the-wire GRPC mailbox
- // sender and receiver.
- mailboxReceiver = new MailboxReceiveNode(currentStageId, nextStageRoot.getDataSchema(),
- nextStageRoot.getStageId(), RelDistribution.Type.SINGLETON, keySelector);
- mailboxSender = new MailboxSendNode(nextStageRoot.getStageId(), nextStageRoot.getDataSchema(),
- mailboxReceiver.getStageId(), RelDistribution.Type.SINGLETON, keySelector);
- } else {
- mailboxReceiver = new MailboxReceiveNode(currentStageId, nextStageRoot.getDataSchema(),
- nextStageRoot.getStageId(), exchangeType, keySelector);
- mailboxSender = new MailboxSendNode(nextStageRoot.getStageId(), nextStageRoot.getDataSchema(),
- mailboxReceiver.getStageId(), exchangeType, keySelector);
- }
- mailboxSender.addInput(nextStageRoot);
-
- // 3. put the sender side as a completed stage.
- _queryStageMap.put(mailboxSender.getStageId(), mailboxSender);
-
- // 4. update stage metadata.
- updateStageMetadata(mailboxSender.getStageId(), mailboxSender, _stageMetadataMap);
- updateStageMetadata(mailboxReceiver.getStageId(), mailboxReceiver, _stageMetadataMap);
-
- // 5. return the receiver, this is considered as a "virtual table scan" node for its parent.
- return mailboxReceiver;
+ return createSendReceivePair(nextStageRoot, distribution, currentStageId);
} else {
StageNode stageNode = RelToStageConverter.toStageNode(node, currentStageId);
List<RelNode> inputs = node.getInputs();
for (RelNode input : inputs) {
stageNode.addInput(walkRelPlan(input, currentStageId));
}
- updateStageMetadata(currentStageId, stageNode, _stageMetadataMap);
return stageNode;
}
}
- private boolean canSkipShuffle(StageNode stageNode, KeySelector<Object[], Object[]> keySelector) {
- Set<Integer> originSet = stageNode.getPartitionKeys();
- if (!originSet.isEmpty() && keySelector != null) {
- Set<Integer> targetSet = new HashSet<>(((FieldSelectionKeySelector) keySelector).getColumnIndices());
- return targetSet.containsAll(originSet);
- }
- return false;
- }
- private static void updateStageMetadata(int stageId, StageNode node, Map<Integer, StageMetadata> stageMetadataMap) {
- updatePartitionKeys(node);
- StageMetadata stageMetadata = stageMetadataMap.computeIfAbsent(stageId, (id) -> new StageMetadata());
- stageMetadata.attach(node);
- }
+ private StageNode createSendReceivePair(StageNode nextStageRoot, RelDistribution distribution, int currentStageId) {
+ List<Integer> distributionKeys = distribution.getKeys();
+ RelDistribution.Type exchangeType = distribution.getType();
- private static void updatePartitionKeys(StageNode node) {
- if (node instanceof ProjectNode) {
- // any input reference directly carry over should still be a partition key.
- Set<Integer> previousPartitionKeys = node.getInputs().get(0).getPartitionKeys();
- Set<Integer> newPartitionKeys = new HashSet<>();
- ProjectNode projectNode = (ProjectNode) node;
- for (int i = 0; i < projectNode.getProjects().size(); i++) {
- RexExpression rexExpression = projectNode.getProjects().get(i);
- if (rexExpression instanceof RexExpression.InputRef
- && previousPartitionKeys.contains(((RexExpression.InputRef) rexExpression).getIndex())) {
- newPartitionKeys.add(i);
- }
- }
- projectNode.setPartitionKeys(newPartitionKeys);
- } else if (node instanceof FilterNode) {
- // filter node doesn't change partition keys.
- node.setPartitionKeys(node.getInputs().get(0).getPartitionKeys());
- } else if (node instanceof AggregateNode) {
- // any input reference directly carry over in group set of aggregation should still be a partition key.
- Set<Integer> previousPartitionKeys = node.getInputs().get(0).getPartitionKeys();
- Set<Integer> newPartitionKeys = new HashSet<>();
- AggregateNode aggregateNode = (AggregateNode) node;
- for (int i = 0; i < aggregateNode.getGroupSet().size(); i++) {
- RexExpression rexExpression = aggregateNode.getGroupSet().get(i);
- if (rexExpression instanceof RexExpression.InputRef
- && previousPartitionKeys.contains(((RexExpression.InputRef) rexExpression).getIndex())) {
- newPartitionKeys.add(i);
- }
- }
- aggregateNode.setPartitionKeys(newPartitionKeys);
- } else if (node instanceof JoinNode) {
- int leftDataSchemaSize = node.getInputs().get(0).getDataSchema().size();
- Set<Integer> leftPartitionKeys = node.getInputs().get(0).getPartitionKeys();
- Set<Integer> rightPartitionKeys = node.getInputs().get(1).getPartitionKeys();
- // Currently, JOIN criteria guarantee to only have one FieldSelectionKeySelector.
- FieldSelectionKeySelector leftJoinKeySelector =
- (FieldSelectionKeySelector) ((JoinNode) node).getJoinKeys().getLeftJoinKeySelector();
- FieldSelectionKeySelector rightJoinKeySelector =
- (FieldSelectionKeySelector) ((JoinNode) node).getJoinKeys().getRightJoinKeySelector();
- Set<Integer> newPartitionKeys = new HashSet<>();
- for (int i = 0; i < leftJoinKeySelector.getColumnIndices().size(); i++) {
- int leftIndex = leftJoinKeySelector.getColumnIndices().get(i);
- int rightIndex = rightJoinKeySelector.getColumnIndices().get(i);
- if (leftPartitionKeys.contains(leftIndex)) {
- newPartitionKeys.add(leftIndex);
- }
- if (rightPartitionKeys.contains(rightIndex)) {
- newPartitionKeys.add(leftDataSchemaSize + rightIndex);
- }
- }
- node.setPartitionKeys(newPartitionKeys);
- } else if (node instanceof TableScanNode) {
- // TODO: add table partition in table config as partition keys. we dont have that information yet.
- } else if (node instanceof MailboxReceiveNode) {
- // hash distribution key is partition key.
- FieldSelectionKeySelector keySelector = (FieldSelectionKeySelector)
- ((MailboxReceiveNode) node).getPartitionKeySelector();
- if (keySelector != null) {
- node.setPartitionKeys(new HashSet<>(keySelector.getColumnIndices()));
- }
- } else if (node instanceof MailboxSendNode) {
- FieldSelectionKeySelector keySelector = (FieldSelectionKeySelector)
- ((MailboxSendNode) node).getPartitionKeySelector();
- if (keySelector != null) {
- node.setPartitionKeys(new HashSet<>(keySelector.getColumnIndices()));
- }
- }
+ // make an exchange sender and receiver node pair
+ // only HASH_DISTRIBUTED requires a partition key selector; so all other types (SINGLETON and BROADCAST)
+ // of exchange will not carry a partition key selector.
+ KeySelector<Object[], Object[]> keySelector = exchangeType == RelDistribution.Type.HASH_DISTRIBUTED
+ ? new FieldSelectionKeySelector(distributionKeys) : null;
+
+ StageNode mailboxSender = new MailboxSendNode(nextStageRoot.getStageId(), nextStageRoot.getDataSchema(),
+ currentStageId, exchangeType, keySelector);
+ StageNode mailboxReceiver = new MailboxReceiveNode(currentStageId, nextStageRoot.getDataSchema(),
+ nextStageRoot.getStageId(), exchangeType, keySelector, mailboxSender);
+ mailboxSender.addInput(nextStageRoot);
+
+ return mailboxReceiver;
}
private boolean isExchangeNode(RelNode node) {
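The partition-key bookkeeping removed above (which, per the PR description, now lives in the shuffle-rewriting visitor) reduces to a few set rules: a PROJECT keeps a key only for bare input references, a JOIN shifts right-side keys by the left schema width, and a HASH exchange can be skipped when the upstream keys are a subset of the hash columns. The sketch below replays those rules on plain column indices so they can be checked in isolation. `PartitionKeyRules` and its method signatures are hypothetical; Pinot's code operates on `StageNode`, `RexExpression.InputRef` and `FieldSelectionKeySelector`, and this is not its implementation.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper (not a Pinot class) that replays the partition-key rules of the
// removed StagePlanner#updatePartitionKeys and #canSkipShuffle on plain column indices.
final class PartitionKeyRules {

  private PartitionKeyRules() {
  }

  // PROJECT: output column i stays a partition key only if it is a bare reference to an
  // input partition key. inputRefs.get(i) is the referenced input index, or null when
  // projection i is a computed expression rather than an InputRef.
  static Set<Integer> project(Set<Integer> inputKeys, List<Integer> inputRefs) {
    Set<Integer> out = new HashSet<>();
    for (int i = 0; i < inputRefs.size(); i++) {
      Integer ref = inputRefs.get(i);
      if (ref != null && inputKeys.contains(ref)) {
        out.add(i);
      }
    }
    return out;
  }

  // JOIN: the output row is left columns followed by right columns, so a right-side
  // key index is shifted by the width of the left schema.
  static Set<Integer> join(Set<Integer> leftKeys, Set<Integer> rightKeys, List<Integer> leftJoinCols,
      List<Integer> rightJoinCols, int leftSchemaSize) {
    Set<Integer> out = new HashSet<>();
    for (int i = 0; i < leftJoinCols.size(); i++) {
      int leftIndex = leftJoinCols.get(i);
      int rightIndex = rightJoinCols.get(i);
      if (leftKeys.contains(leftIndex)) {
        out.add(leftIndex);
      }
      if (rightKeys.contains(rightIndex)) {
        out.add(leftSchemaSize + rightIndex);
      }
    }
    return out;
  }

  // Shuffle skip: rows that agree on every hash column also agree on any subset of them, so if
  // upstream is already partitioned on a subset of the hash columns, equal hash keys already sit
  // in the same upstream partition. hashCols == null models a non-HASH exchange (no key selector).
  static boolean canSkipShuffle(Set<Integer> upstreamKeys, List<Integer> hashCols) {
    if (upstreamKeys.isEmpty() || hashCols == null) {
      return false;
    }
    return new HashSet<>(hashCols).containsAll(upstreamKeys);
  }
}
```

The shuffle-skip test is only sound if the receiving stage reads each upstream partition locally, which is what the removed code signalled by downgrading the exchange to SINGLETON.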
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AbstractStageNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AbstractStageNode.java
index 594c6d7e38..46de8731b7 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AbstractStageNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AbstractStageNode.java
@@ -19,10 +19,7 @@
package org.apache.pinot.query.planner.stage;
import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
import org.apache.pinot.common.proto.Plan;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.planner.serde.ProtoSerializable;
@@ -34,7 +31,6 @@ public abstract class AbstractStageNode implements StageNode, ProtoSerializable
protected final int _stageId;
protected final List<StageNode> _inputs;
protected DataSchema _dataSchema;
- protected Set<Integer> _partitionedKeys;
public AbstractStageNode(int stageId) {
this(stageId, null);
@@ -44,7 +40,6 @@ public abstract class AbstractStageNode implements StageNode, ProtoSerializable
_stageId = stageId;
_dataSchema = dataSchema;
_inputs = new ArrayList<>();
- _partitionedKeys = new HashSet<>();
}
@Override
@@ -72,16 +67,6 @@ public abstract class AbstractStageNode implements StageNode, ProtoSerializable
_dataSchema = dataSchema;
}
- @Override
- public Set<Integer> getPartitionKeys() {
- return _partitionedKeys;
- }
-
- @Override
- public void setPartitionKeys(Collection<Integer> partitionedKeys) {
- _partitionedKeys.addAll(partitionedKeys);
- }
-
@Override
public void fromObjectField(Plan.ObjectField objectField) {
ProtoSerializationUtils.setObjectFieldToObject(this, objectField);
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AggregateNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AggregateNode.java
index 70174df265..ea8dc2c1c1 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AggregateNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AggregateNode.java
@@ -59,4 +59,9 @@ public class AggregateNode extends AbstractStageNode {
public String explain() {
return "AGGREGATE";
}
+
+ @Override
+ public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+ return visitor.visitAggregate(this, context);
+ }
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/FilterNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/FilterNode.java
index 0d960e951a..52ed004da1 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/FilterNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/FilterNode.java
@@ -45,4 +45,9 @@ public class FilterNode extends AbstractStageNode {
public String explain() {
return "FILTER";
}
+
+ @Override
+ public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+ return visitor.visitFilter(this, context);
+ }
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/JoinNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/JoinNode.java
index b34d74d574..af9b4e03ed 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/JoinNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/JoinNode.java
@@ -64,6 +64,11 @@ public class JoinNode extends AbstractStageNode {
return "JOIN";
}
+ @Override
+ public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+ return visitor.visitJoin(this, context);
+ }
+
public static class JoinKeys {
@ProtoProperties
private KeySelector<Object[], Object[]> _leftJoinKeySelector;
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxReceiveNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxReceiveNode.java
index e358c2cb13..cf90a0005c 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxReceiveNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxReceiveNode.java
@@ -33,22 +33,32 @@ public class MailboxReceiveNode extends AbstractStageNode {
@ProtoProperties
private KeySelector<Object[], Object[]> _partitionKeySelector;
+ // this is only available during planning and should not be relied
+ // on in any post-serialization code
+ private transient StageNode _sender;
+
public MailboxReceiveNode(int stageId) {
super(stageId);
}
public MailboxReceiveNode(int stageId, DataSchema dataSchema, int senderStageId,
- RelDistribution.Type exchangeType, @Nullable KeySelector<Object[], Object[]> partitionKeySelector) {
+ RelDistribution.Type exchangeType, @Nullable KeySelector<Object[], Object[]> partitionKeySelector,
+ StageNode sender) {
super(stageId, dataSchema);
_senderStageId = senderStageId;
_exchangeType = exchangeType;
_partitionKeySelector = partitionKeySelector;
+ _sender = sender;
}
public int getSenderStageId() {
return _senderStageId;
}
+ public void setExchangeType(RelDistribution.Type exchangeType) {
+ _exchangeType = exchangeType;
+ }
+
public RelDistribution.Type getExchangeType() {
return _exchangeType;
}
@@ -57,8 +67,17 @@ public class MailboxReceiveNode extends AbstractStageNode {
return _partitionKeySelector;
}
+ public StageNode getSender() {
+ return _sender;
+ }
+
@Override
public String explain() {
- return "MAIL_RECEIVE";
+ return "MAIL_RECEIVE(" + _exchangeType + ")";
+ }
+
+ @Override
+ public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+ return visitor.visitMailboxReceive(this, context);
}
}
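`MailboxReceiveNode` now keeps a planning-only, `transient` reference to its sender, and both mailbox nodes gained `setExchangeType`. One plausible reading is that this lets a rewrite such as the shuffle optimization change both halves of an exchange in place, starting from the receive side. The sketch below illustrates that idea with hypothetical stand-in classes (`ExchangeKind`, `SendSketch`, `ReceiveSketch`, `LocalExchangeRewrite`); it is not Pinot's `ShuffleRewriteVisitor`, and it reuses only the new `explain()` string format from the diff.

```java
// Hypothetical stand-ins (not Pinot classes) for MailboxSendNode / MailboxReceiveNode,
// keeping only the exchange type and the planning-time sender back-reference.
enum ExchangeKind { SINGLETON, HASH_DISTRIBUTED, RANDOM_DISTRIBUTED }

class SendSketch {
  ExchangeKind exchangeType;

  SendSketch(ExchangeKind exchangeType) {
    this.exchangeType = exchangeType;
  }

  String explain() {
    return "MAIL_SEND(" + exchangeType + ")";
  }
}

class ReceiveSketch {
  ExchangeKind exchangeType;
  // Mirrors the transient _sender field: only meaningful while planning, never serialized.
  final transient SendSketch sender;

  ReceiveSketch(ExchangeKind exchangeType, SendSketch sender) {
    this.exchangeType = exchangeType;
    this.sender = sender;
  }

  String explain() {
    return "MAIL_RECEIVE(" + exchangeType + ")";
  }
}

final class LocalExchangeRewrite {
  private LocalExchangeRewrite() {
  }

  // Both halves of a pair must agree on the exchange type, so the rewrite flips the receiver
  // and, through the back-reference, its sender in one step.
  static void markLocal(ReceiveSketch receive) {
    receive.exchangeType = ExchangeKind.SINGLETON;
    receive.sender.exchangeType = ExchangeKind.SINGLETON;
  }
}
```

Without the back-reference, the rewrite would need a separate stage-id to node lookup to find the matching sender.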
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxSendNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxSendNode.java
index 05459b5634..4219590100 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxSendNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxSendNode.java
@@ -49,6 +49,10 @@ public class MailboxSendNode extends AbstractStageNode {
return _receiverStageId;
}
+ public void setExchangeType(RelDistribution.Type exchangeType) {
+ _exchangeType = exchangeType;
+ }
+
public RelDistribution.Type getExchangeType() {
return _exchangeType;
}
@@ -59,6 +63,11 @@ public class MailboxSendNode extends AbstractStageNode {
@Override
public String explain() {
- return "MAIL_SEND";
+ return "MAIL_SEND(" + _exchangeType + ")";
+ }
+
+ @Override
+ public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+ return visitor.visitMailboxSend(this, context);
}
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ProjectNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ProjectNode.java
index 250a38885f..8371dda609 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ProjectNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ProjectNode.java
@@ -46,4 +46,9 @@ public class ProjectNode extends AbstractStageNode {
public String explain() {
return "PROJECT";
}
+
+ @Override
+ public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+ return visitor.visitProject(this, context);
+ }
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/SortNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/SortNode.java
index 4df23e7325..38b2da6c56 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/SortNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/SortNode.java
@@ -72,4 +72,9 @@ public class SortNode extends AbstractStageNode {
public String explain() {
return "SORT" + (_fetch > 0 ? " (LIMIT " + _fetch + ")" : "");
}
+
+ @Override
+ public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+ return visitor.visitSort(this, context);
+ }
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNode.java
index 8f170a065c..7e3278cfe8 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNode.java
@@ -19,9 +19,7 @@
package org.apache.pinot.query.planner.stage;
import java.io.Serializable;
-import java.util.Collection;
import java.util.List;
-import java.util.Set;
import org.apache.pinot.common.utils.DataSchema;
@@ -45,9 +43,7 @@ public interface StageNode extends Serializable {
void setDataSchema(DataSchema dataSchema);
- Set<Integer> getPartitionKeys();
-
- void setPartitionKeys(Collection<Integer> partitionKeys);
-
String explain();
+
+ <T, C> T visit(StageNodeVisitor<T, C> visitor, C context);
}
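Adding `<T, C> T visit(StageNodeVisitor<T, C> visitor, C context)` to `StageNode` sets up classic double dispatch: each node class calls the visitor method for its own concrete type, and the visitor, not the node, decides whether and how to recurse into inputs. A minimal sketch with two hypothetical node types (`FilterSketch`, `ScanSketch`) and a hypothetical node-counting visitor; none of these are Pinot classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical two-node model (not Pinot classes) of StageNode#visit / StageNodeVisitor.
interface SketchVisitor<T, C> {
  T visitFilter(FilterSketch node, C context);

  T visitScan(ScanSketch node, C context);
}

abstract class NodeSketch {
  final List<NodeSketch> inputs = new ArrayList<>();

  // Each concrete node forwards to the visitor method for its own type, so a traversal
  // never needs instanceof checks on the node.
  abstract <T, C> T visit(SketchVisitor<T, C> visitor, C context);
}

class FilterSketch extends NodeSketch {
  FilterSketch(NodeSketch input) {
    inputs.add(input);
  }

  @Override
  <T, C> T visit(SketchVisitor<T, C> visitor, C context) {
    return visitor.visitFilter(this, context);
  }
}

class ScanSketch extends NodeSketch {
  @Override
  <T, C> T visit(SketchVisitor<T, C> visitor, C context) {
    return visitor.visitScan(this, context);
  }
}

// The visitor chooses the traversal; this one counts nodes by recursing into inputs.
class NodeCounter implements SketchVisitor<Integer, Void> {
  @Override
  public Integer visitFilter(FilterSketch node, Void context) {
    int count = 1;
    for (NodeSketch input : node.inputs) {
      count += input.visit(this, context);
    }
    return count;
  }

  @Override
  public Integer visitScan(ScanSketch node, Void context) {
    return 1;
  }
}
```

Adding a new traversal then means writing one new visitor class rather than another `instanceof` chain like the removed `updatePartitionKeys`.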
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeVisitor.java
new file mode 100644
index 0000000000..614dbb877a
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeVisitor.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.stage;
+
+import org.apache.pinot.query.planner.QueryPlan;
+
+
+/**
+ * {@code StageNodeVisitor} is a skeleton interface for implementing {@code StageNode}
+ * tree traversals using the {@link StageNode#visit(StageNodeVisitor, Object)} method. It does not
+ * enforce a traversal order; each implementation decides the order itself.
+ *
+ * <p>It is recommended that implementors use private constructors and static methods to access main
+ * functionality (see {@link org.apache.pinot.query.planner.ExplainPlanStageVisitor#explain(QueryPlan)}
+ * for an example of this pattern).
+ *
+ * @param <T> the return type for all visits
+ * @param <C> a Context that will be passed as the second parameter to {@code StageNode#visit};
+ * implementors decide how to use this context (e.g. whether it is
+ * modified in place or treated as immutable)
+ */
+public interface StageNodeVisitor<T, C> {
+
+ T visitAggregate(AggregateNode node, C context);
+
+ T visitFilter(FilterNode node, C context);
+
+ T visitJoin(JoinNode node, C context);
+
+ T visitMailboxReceive(MailboxReceiveNode node, C context);
+
+ T visitMailboxSend(MailboxSendNode node, C context);
+
+ T visitProject(ProjectNode node, C context);
+
+ T visitSort(SortNode node, C context);
+
+ T visitTableScan(TableScanNode node, C context);
+
+ T visitValue(ValueNode node, C context);
+}
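The javadoc above recommends a private constructor plus a static entry method, and leaves the use of the context `C` to the implementor. The hypothetical `StageIdCollector` below (with a stand-in `PlanStage` node type and `StageIdVisitor` interface, none of them Pinot classes) follows that shape and uses a mutable `Set` as the context, filled in place during the traversal.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical mini model (not Pinot classes): one node type plus a matching visitor.
interface StageIdVisitor<T, C> {
  T visitStage(PlanStage node, C context);
}

class PlanStage {
  final int stageId;
  final List<PlanStage> inputs = new ArrayList<>();

  PlanStage(int stageId, PlanStage... inputs) {
    this.stageId = stageId;
    Collections.addAll(this.inputs, inputs);
  }

  <T, C> T visit(StageIdVisitor<T, C> visitor, C context) {
    return visitor.visitStage(this, context);
  }
}

// Private constructor + static entry point: callers cannot construct or reuse the visitor,
// and the mutable context is created and owned by the entry method.
final class StageIdCollector implements StageIdVisitor<Void, Set<Integer>> {
  private StageIdCollector() {
  }

  static Set<Integer> collect(PlanStage root) {
    Set<Integer> ids = new TreeSet<>();
    root.visit(new StageIdCollector(), ids);
    return ids;
  }

  @Override
  public Void visitStage(PlanStage node, Set<Integer> ids) {
    ids.add(node.stageId); // the context is modified in place
    for (PlanStage input : node.inputs) {
      input.visit(this, ids);
    }
    return null;
  }
}
```

An immutable context (for example a tree depth passed down by value) would work the same way; the choice only changes what each `visit` method does with `C`.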
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/TableScanNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/TableScanNode.java
index bcd8493cc7..7711dd6235 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/TableScanNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/TableScanNode.java
@@ -51,4 +51,9 @@ public class TableScanNode extends AbstractStageNode {
public String explain() {
return "TABLE SCAN (" + _tableName + ")";
}
+
+ @Override
+ public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+ return visitor.visitTableScan(this, context);
+ }
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ValueNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ValueNode.java
index 3918338d19..b3ad0d40f6 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ValueNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ValueNode.java
@@ -56,4 +56,9 @@ public class ValueNode extends AbstractStageNode {
public String explain() {
return "LITERAL";
}
+
+ @Override
+ public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+ return visitor.visitValue(this, context);
+ }
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java
index c4d81fd76a..bdc36a7571 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java
@@ -99,4 +99,9 @@ public class GrpcMailboxService implements MailboxService<MailboxContent> {
public ManagedChannel getChannel(String mailboxId) {
return _channelManager.getChannel(Utils.constructChannelId(mailboxId));
}
+
+ @Override
+ public String toString() {
+ return "GrpcMailboxService{" + "_hostname='" + _hostname + '\'' + ", _mailboxPort=" + _mailboxPort + '}';
+ }
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
index 9b457644d0..46dd9dc967 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
@@ -123,7 +123,7 @@ public class TransferableBlock implements Block {
throw new UnsupportedOperationException("Unable to build from container with type: " + _type);
}
} catch (Exception e) {
- throw new RuntimeException("Unable to create DataBlock");
+ throw new RuntimeException("Unable to create DataBlock", e);
}
}
return _dataBlock;
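The `TransferableBlock` change passes the caught exception as the cause of the `RuntimeException`, so the underlying failure shows up as a "Caused by:" entry instead of being swallowed. A small, self-contained illustration; `CauseChainingDemo` and its methods are hypothetical and only mimic the before/after shape of the fix.

```java
import java.io.IOException;

// Hypothetical demo (not Pinot code) of why passing the caught exception as the cause matters.
final class CauseChainingDemo {
  private CauseChainingDemo() {
  }

  // Stand-in for a serialization step that can fail inside the block conversion.
  static void serialize() throws IOException {
    throw new IOException("serialization failed");
  }

  // Old shape: the original exception is discarded, so its message and stack trace are lost.
  static void buildWithoutCause() {
    try {
      serialize();
    } catch (Exception e) {
      throw new RuntimeException("Unable to create DataBlock");
    }
  }

  // New shape: the original exception is attached as the cause.
  static void buildWithCause() {
    try {
      serialize();
    } catch (Exception e) {
      throw new RuntimeException("Unable to create DataBlock", e);
    }
  }
}
```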
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/PhysicalPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/PhysicalPlanVisitor.java
new file mode 100644
index 0000000000..b4cc73d5ac
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/PhysicalPlanVisitor.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.executor;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import org.apache.pinot.common.proto.Mailbox;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.planner.StageMetadata;
+import org.apache.pinot.query.planner.stage.AggregateNode;
+import org.apache.pinot.query.planner.stage.FilterNode;
+import org.apache.pinot.query.planner.stage.JoinNode;
+import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
+import org.apache.pinot.query.planner.stage.MailboxSendNode;
+import org.apache.pinot.query.planner.stage.ProjectNode;
+import org.apache.pinot.query.planner.stage.SortNode;
+import org.apache.pinot.query.planner.stage.StageNode;
+import org.apache.pinot.query.planner.stage.StageNodeVisitor;
+import org.apache.pinot.query.planner.stage.TableScanNode;
+import org.apache.pinot.query.planner.stage.ValueNode;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.operator.AggregateOperator;
+import org.apache.pinot.query.runtime.operator.FilterOperator;
+import org.apache.pinot.query.runtime.operator.HashJoinOperator;
+import org.apache.pinot.query.runtime.operator.LiteralValueOperator;
+import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
+import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
+import org.apache.pinot.query.runtime.operator.SortOperator;
+import org.apache.pinot.query.runtime.operator.TransformOperator;
+import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
+
+
+/**
+ * This visitor constructs a physical plan of operators from a {@link StageNode} tree. Note that
+ * this works only for the intermediate stage nodes, leaf stage nodes are expected to compile into
+ * v1 operators at this point in time.
+ *
+ * <p>This class should be used statically via {@link #build(MailboxService, String, int, long, Map, StageNode)}
+ *
+ * @see org.apache.pinot.query.runtime.QueryRunner#processQuery(DistributedStagePlan, ExecutorService, Map)
+ */
+public class PhysicalPlanVisitor implements StageNodeVisitor<Operator<TransferableBlock>, Void> {
+
+ private final MailboxService<Mailbox.MailboxContent> _mailboxService;
+ private final String _hostName;
+ private final int _port;
+ private final long _requestId;
+ private final Map<Integer, StageMetadata> _metadataMap;
+
+ public static Operator<TransferableBlock> build(MailboxService<Mailbox.MailboxContent> mailboxService,
+ String hostName, int port, long requestId, Map<Integer, StageMetadata> metadataMap, StageNode node) {
+ return node.visit(new PhysicalPlanVisitor(mailboxService, hostName, port, requestId, metadataMap), null);
+ }
+
+ private PhysicalPlanVisitor(MailboxService<Mailbox.MailboxContent> mailboxService, String hostName, int port,
+ long requestId, Map<Integer, StageMetadata> metadataMap) {
+ _mailboxService = mailboxService;
+ _hostName = hostName;
+ _port = port;
+ _requestId = requestId;
+ _metadataMap = metadataMap;
+ }
+
+ @Override
+ public Operator<TransferableBlock> visitMailboxReceive(MailboxReceiveNode node, Void context) {
+ List<ServerInstance> sendingInstances = _metadataMap.get(node.getSenderStageId()).getServerInstances();
+ return new MailboxReceiveOperator(_mailboxService, node.getDataSchema(), sendingInstances,
+ node.getExchangeType(), node.getPartitionKeySelector(), _hostName, _port, _requestId,
+ node.getSenderStageId());
+ }
+
+ @Override
+ public Operator<TransferableBlock> visitMailboxSend(MailboxSendNode node, Void context) {
+ Operator<TransferableBlock> nextOperator = node.getInputs().get(0).visit(this, null);
+ StageMetadata receivingStageMetadata = _metadataMap.get(node.getReceiverStageId());
+ return new MailboxSendOperator(_mailboxService, node.getDataSchema(), nextOperator,
+ receivingStageMetadata.getServerInstances(), node.getExchangeType(), node.getPartitionKeySelector(),
+ _hostName, _port, _requestId, node.getStageId());
+ }
+
+ @Override
+ public Operator<TransferableBlock> visitAggregate(AggregateNode node, Void context) {
+ Operator<TransferableBlock> nextOperator = node.getInputs().get(0).visit(this, null);
+ return new AggregateOperator(nextOperator, node.getDataSchema(), node.getAggCalls(),
+ node.getGroupSet(), node.getInputs().get(0).getDataSchema());
+ }
+
+ @Override
+ public Operator<TransferableBlock> visitFilter(FilterNode node, Void context) {
+ Operator<TransferableBlock> nextOperator = node.getInputs().get(0).visit(this, null);
+ return new FilterOperator(nextOperator, node.getDataSchema(), node.getCondition());
+ }
+
+ @Override
+ public Operator<TransferableBlock> visitJoin(JoinNode node, Void context) {
+ StageNode left = node.getInputs().get(0);
+ StageNode right = node.getInputs().get(1);
+
+ Operator<TransferableBlock> leftOperator = left.visit(this, null);
+ Operator<TransferableBlock> rightOperator = right.visit(this, null);
+
+ return new HashJoinOperator(leftOperator, left.getDataSchema(), rightOperator,
+ right.getDataSchema(), node.getDataSchema(), node.getJoinKeys(),
+ node.getJoinClauses(), node.getJoinRelType());
+ }
+
+ @Override
+ public Operator<TransferableBlock> visitProject(ProjectNode node, Void context) {
+ Operator<TransferableBlock> nextOperator = node.getInputs().get(0).visit(this, null);
+ return new TransformOperator(nextOperator, node.getDataSchema(), node.getProjects(),
+ node.getInputs().get(0).getDataSchema());
+ }
+
+ @Override
+ public Operator<TransferableBlock> visitSort(SortNode node, Void context) {
+ Operator<TransferableBlock> nextOperator = node.getInputs().get(0).visit(this, null);
+ return new SortOperator(nextOperator, node.getCollationKeys(), node.getCollationDirections(),
+ node.getFetch(), node.getOffset(), node.getDataSchema());
+ }
+
+ @Override
+ public Operator<TransferableBlock> visitTableScan(TableScanNode node, Void context) {
+ throw new UnsupportedOperationException("Stage node of type TableScanNode is not supported!");
+ }
+
+ @Override
+ public Operator<TransferableBlock> visitValue(ValueNode node, Void context) {
+ return new LiteralValueOperator(node.getDataSchema(), node.getLiteralRows());
+ }
+}
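`PhysicalPlanVisitor` builds operators bottom-up: each `visitX` method first visits the node's inputs to obtain their operators and then wraps them in the operator for the current node, while the mailbox nodes additionally look up server instances in the stage metadata map. The sketch below shows only that recursion. The plan and operator types (`OpPlan`, `ValuesPlan`, `FilterPlan`, `MapPlan`, `RowOperator`, `OperatorBuilder`) are hypothetical; a pull-style `rows()` method stands in for `Operator<TransferableBlock>`, and the `Void` context parameter is omitted.

```java
import java.util.List;
import java.util.function.IntPredicate;
import java.util.function.IntUnaryOperator;
import java.util.stream.Collectors;

// Hypothetical pull-style operator: produces all of its rows on demand.
interface RowOperator {
  List<Integer> rows();
}

interface OpPlanVisitor<T> {
  T visitValues(ValuesPlan node);

  T visitFilter(FilterPlan node);

  T visitMap(MapPlan node);
}

abstract class OpPlan {
  abstract <T> T visit(OpPlanVisitor<T> visitor);
}

class ValuesPlan extends OpPlan {
  final List<Integer> literals;

  ValuesPlan(List<Integer> literals) {
    this.literals = literals;
  }

  @Override
  <T> T visit(OpPlanVisitor<T> visitor) {
    return visitor.visitValues(this);
  }
}

class FilterPlan extends OpPlan {
  final OpPlan input;
  final IntPredicate condition;

  FilterPlan(OpPlan input, IntPredicate condition) {
    this.input = input;
    this.condition = condition;
  }

  @Override
  <T> T visit(OpPlanVisitor<T> visitor) {
    return visitor.visitFilter(this);
  }
}

class MapPlan extends OpPlan {
  final OpPlan input;
  final IntUnaryOperator projection;

  MapPlan(OpPlan input, IntUnaryOperator projection) {
    this.input = input;
    this.projection = projection;
  }

  @Override
  <T> T visit(OpPlanVisitor<T> visitor) {
    return visitor.visitMap(this);
  }
}

final class OperatorBuilder implements OpPlanVisitor<RowOperator> {
  private OperatorBuilder() {
  }

  static RowOperator build(OpPlan root) {
    return root.visit(new OperatorBuilder());
  }

  @Override
  public RowOperator visitValues(ValuesPlan node) {
    return () -> node.literals; // leaf: no input operator to build
  }

  @Override
  public RowOperator visitFilter(FilterPlan node) {
    RowOperator upstream = node.input.visit(this); // build the input operator first
    return () -> upstream.rows().stream().filter(r -> node.condition.test(r)).collect(Collectors.toList());
  }

  @Override
  public RowOperator visitMap(MapPlan node) {
    RowOperator upstream = node.input.visit(this);
    return () -> upstream.rows().stream().map(r -> node.projection.applyAsInt(r)).collect(Collectors.toList());
  }
}
```

For example, building `MapPlan(FilterPlan(ValuesPlan([1, 2, 3, 4]), even), x * 10)` yields an operator whose `rows()` are `[20, 40]`.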
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
index 5f0ed67d08..5fb7d05d69 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
@@ -18,36 +18,17 @@
*/
package org.apache.pinot.query.runtime.executor;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.proto.Mailbox;
import org.apache.pinot.common.request.context.ThreadTimer;
-import org.apache.pinot.core.operator.BaseOperator;
-import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.util.trace.TraceRunnable;
import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.planner.StageMetadata;
-import org.apache.pinot.query.planner.stage.AggregateNode;
-import org.apache.pinot.query.planner.stage.FilterNode;
-import org.apache.pinot.query.planner.stage.JoinNode;
-import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
-import org.apache.pinot.query.planner.stage.MailboxSendNode;
-import org.apache.pinot.query.planner.stage.ProjectNode;
-import org.apache.pinot.query.planner.stage.SortNode;
import org.apache.pinot.query.planner.stage.StageNode;
-import org.apache.pinot.query.planner.stage.ValueNode;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
-import org.apache.pinot.query.runtime.operator.AggregateOperator;
-import org.apache.pinot.query.runtime.operator.FilterOperator;
-import org.apache.pinot.query.runtime.operator.HashJoinOperator;
-import org.apache.pinot.query.runtime.operator.LiteralValueOperator;
-import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
-import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
-import org.apache.pinot.query.runtime.operator.SortOperator;
-import org.apache.pinot.query.runtime.operator.TransformOperator;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.slf4j.Logger;
@@ -77,6 +58,7 @@ public class WorkerQueryExecutor {
_port = port;
}
+
public synchronized void start() {
LOGGER.info("Worker query executor started");
}
@@ -85,12 +67,14 @@ public class WorkerQueryExecutor {
LOGGER.info("Worker query executor shut down");
}
- // TODO: split this execution from PhysicalPlanner
public void processQuery(DistributedStagePlan queryRequest, Map<String, String> requestMetadataMap,
ExecutorService executorService) {
long requestId = Long.parseLong(requestMetadataMap.get("REQUEST_ID"));
StageNode stageRoot = queryRequest.getStageRoot();
- BaseOperator<TransferableBlock> rootOperator = getOperator(requestId, stageRoot, queryRequest.getMetadataMap());
+
+ Operator<TransferableBlock> rootOperator = PhysicalPlanVisitor.build(
+ _mailboxService, _hostName, _port, requestId, queryRequest.getMetadataMap(), stageRoot);
+
executorService.submit(new TraceRunnable() {
@Override
public void runJob() {
@@ -102,55 +86,4 @@ public class WorkerQueryExecutor {
}
});
}
-
- // TODO: split this PhysicalPlanner into a separate module
- // TODO: optimize this into a framework. (physical planner)
- private BaseOperator<TransferableBlock> getOperator(long requestId, StageNode stageNode,
- Map<Integer, StageMetadata> metadataMap) {
- if (stageNode instanceof MailboxReceiveNode) {
- MailboxReceiveNode receiveNode = (MailboxReceiveNode) stageNode;
- List<ServerInstance> sendingInstances = metadataMap.get(receiveNode.getSenderStageId()).getServerInstances();
- return new MailboxReceiveOperator(_mailboxService, receiveNode.getDataSchema(), sendingInstances,
- receiveNode.getExchangeType(), receiveNode.getPartitionKeySelector(), _hostName, _port, requestId,
- receiveNode.getSenderStageId());
- } else if (stageNode instanceof MailboxSendNode) {
- MailboxSendNode sendNode = (MailboxSendNode) stageNode;
- BaseOperator<TransferableBlock> nextOperator = getOperator(requestId, sendNode.getInputs().get(0), metadataMap);
- StageMetadata receivingStageMetadata = metadataMap.get(sendNode.getReceiverStageId());
- return new MailboxSendOperator(_mailboxService, sendNode.getDataSchema(), nextOperator,
- receivingStageMetadata.getServerInstances(), sendNode.getExchangeType(), sendNode.getPartitionKeySelector(),
- _hostName, _port, requestId, sendNode.getStageId());
- } else if (stageNode instanceof JoinNode) {
- JoinNode joinNode = (JoinNode) stageNode;
- BaseOperator<TransferableBlock> leftOperator = getOperator(requestId, joinNode.getInputs().get(0), metadataMap);
- BaseOperator<TransferableBlock> rightOperator = getOperator(requestId, joinNode.getInputs().get(1), metadataMap);
- return new HashJoinOperator(leftOperator, joinNode.getInputs().get(0).getDataSchema(), rightOperator,
- joinNode.getInputs().get(1).getDataSchema(), joinNode.getDataSchema(), joinNode.getJoinKeys(),
- joinNode.getJoinClauses(), joinNode.getJoinRelType());
- } else if (stageNode instanceof AggregateNode) {
- AggregateNode aggregateNode = (AggregateNode) stageNode;
- BaseOperator<TransferableBlock> inputOperator =
- getOperator(requestId, aggregateNode.getInputs().get(0), metadataMap);
- return new AggregateOperator(inputOperator, aggregateNode.getDataSchema(), aggregateNode.getAggCalls(),
- aggregateNode.getGroupSet(), aggregateNode.getInputs().get(0).getDataSchema());
- } else if (stageNode instanceof FilterNode) {
- FilterNode filterNode = (FilterNode) stageNode;
- return new FilterOperator(getOperator(requestId, filterNode.getInputs().get(0), metadataMap),
- filterNode.getDataSchema(), filterNode.getCondition());
- } else if (stageNode instanceof ProjectNode) {
- ProjectNode projectNode = (ProjectNode) stageNode;
- return new TransformOperator(getOperator(requestId, projectNode.getInputs().get(0), metadataMap),
- projectNode.getDataSchema(), projectNode.getProjects(), projectNode.getInputs().get(0).getDataSchema());
- } else if (stageNode instanceof SortNode) {
- SortNode sortNode = (SortNode) stageNode;
- return new SortOperator(getOperator(requestId, sortNode.getInputs().get(0), metadataMap),
- sortNode.getCollationKeys(), sortNode.getCollationDirections(), sortNode.getFetch(), sortNode.getOffset(),
- sortNode.getDataSchema());
- } else if (stageNode instanceof ValueNode) {
- return new LiteralValueOperator(stageNode.getDataSchema(), ((ValueNode) stageNode).getLiteralRows());
- } else {
- throw new UnsupportedOperationException(
- String.format("Stage node type %s is not supported!", stageNode.getClass().getSimpleName()));
- }
- }
}
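The removed `getOperator` method dispatched on node type with a chain of `instanceof` checks; the commit replaces it with a `StageNodeVisitor` that each node calls back into. Below is a minimal, self-contained sketch of that double-dispatch pattern under simplifying assumptions, not Pinot's actual classes: `FilterNode`, `ValueNode`, the string-typed `condition`, and `DescribePlanVisitor` are hypothetical stand-ins, and the visitor returns a textual description of the operator tree instead of real `Operator` instances.

```java
import java.util.List;

// Hypothetical, simplified visitor over two node types.
interface StageNodeVisitor<T, C> {
  T visitFilter(FilterNode node, C context);

  T visitValue(ValueNode node, C context);
}

interface StageNode {
  List<StageNode> getInputs();

  // Double dispatch: each node calls the visitor method for its own type,
  // so callers never need an instanceof chain.
  <T, C> T visit(StageNodeVisitor<T, C> visitor, C context);
}

final class ValueNode implements StageNode {
  final List<Object[]> rows;

  ValueNode(List<Object[]> rows) {
    this.rows = rows;
  }

  public List<StageNode> getInputs() {
    return List.of();
  }

  public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
    return visitor.visitValue(this, context);
  }
}

final class FilterNode implements StageNode {
  final StageNode input;
  final String condition;

  FilterNode(StageNode input, String condition) {
    this.input = input;
    this.condition = condition;
  }

  public List<StageNode> getInputs() {
    return List.of(input);
  }

  public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
    return visitor.visitFilter(this, context);
  }
}

// Toy "physical planner": recurses into the input first, then wraps it,
// mirroring how visitSort/visitProject build nextOperator before their own operator.
final class DescribePlanVisitor implements StageNodeVisitor<String, Void> {
  public String visitFilter(FilterNode node, Void context) {
    String next = node.getInputs().get(0).visit(this, null);
    return "Filter(" + node.condition + ", " + next + ")";
  }

  public String visitValue(ValueNode node, Void context) {
    return "LiteralValue(" + node.rows.size() + " rows)";
  }
}
```

Visiting `new FilterNode(new ValueNode(rows), "col > 1")` with two rows yields `Filter(col > 1, LiteralValue(2 rows))`. One trade-off of this design: adding a node type means adding a method to the visitor interface, so every existing visitor fails to compile until it handles the new type, whereas the old `instanceof` chain only failed at runtime in its `else` branch.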
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
index 6747b07ef4..0c261d26e3 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
@@ -48,7 +48,7 @@ import org.apache.pinot.spi.data.FieldSpec;
public class AggregateOperator extends BaseOperator<TransferableBlock> {
private static final String EXPLAIN_NAME = "AGGREGATE_OPERATOR";
- private BaseOperator<TransferableBlock> _inputOperator;
+ private Operator<TransferableBlock> _inputOperator;
private List<RexExpression> _aggCalls;
private List<RexExpression> _groupSet;
@@ -64,7 +64,7 @@ public class AggregateOperator extends BaseOperator<TransferableBlock> {
private boolean _isCumulativeBlockConstructed;
// TODO: refactor Pinot Reducer code to support the intermediate stage agg operator.
- public AggregateOperator(BaseOperator<TransferableBlock> inputOperator, DataSchema dataSchema,
+ public AggregateOperator(Operator<TransferableBlock> inputOperator, DataSchema dataSchema,
List<RexExpression> aggCalls, List<RexExpression> groupSet, DataSchema upstreamDataSchema) {
_inputOperator = inputOperator;
_aggCalls = aggCalls;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
index f1ab55061e..b3aa17ac56 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
@@ -34,12 +34,12 @@ import org.apache.pinot.query.runtime.operator.operands.FilterOperand;
public class FilterOperator extends BaseOperator<TransferableBlock> {
private static final String EXPLAIN_NAME = "FILTER";
- private final BaseOperator<TransferableBlock> _upstreamOperator;
+ private final Operator<TransferableBlock> _upstreamOperator;
private final FilterOperand _filterOperand;
private final DataSchema _dataSchema;
private TransferableBlock _upstreamErrorBlock;
- public FilterOperator(BaseOperator<TransferableBlock> upstreamOperator, DataSchema dataSchema, RexExpression filter) {
+ public FilterOperator(Operator<TransferableBlock> upstreamOperator, DataSchema dataSchema, RexExpression filter) {
_upstreamOperator = upstreamOperator;
_dataSchema = dataSchema;
_filterOperand = FilterOperand.toFilterOperand(filter, dataSchema);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index cb43ff09c0..bcf6807dd9 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -49,8 +49,8 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> {
private static final String EXPLAIN_NAME = "BROADCAST_JOIN";
private final HashMap<Integer, List<Object[]>> _broadcastHashTable;
- private final BaseOperator<TransferableBlock> _leftTableOperator;
- private final BaseOperator<TransferableBlock> _rightTableOperator;
+ private final Operator<TransferableBlock> _leftTableOperator;
+ private final Operator<TransferableBlock> _rightTableOperator;
private final JoinRelType _joinType;
private final DataSchema _resultSchema;
private final DataSchema _leftTableSchema;
@@ -62,8 +62,8 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> {
private KeySelector<Object[], Object[]> _leftKeySelector;
private KeySelector<Object[], Object[]> _rightKeySelector;
- public HashJoinOperator(BaseOperator<TransferableBlock> leftTableOperator, DataSchema leftSchema,
- BaseOperator<TransferableBlock> rightTableOperator, DataSchema rightSchema, DataSchema outputSchema,
+ public HashJoinOperator(Operator<TransferableBlock> leftTableOperator, DataSchema leftSchema,
+ Operator<TransferableBlock> rightTableOperator, DataSchema rightSchema, DataSchema outputSchema,
JoinNode.JoinKeys joinKeys, List<RexExpression> joinClauses, JoinRelType joinType) {
_leftKeySelector = joinKeys.getLeftJoinKeySelector();
_rightKeySelector = joinKeys.getRightJoinKeySelector();
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
index ad05f207ef..8169fc9121 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
@@ -79,6 +79,13 @@ public class MailboxReceiveOperator extends BaseOperator<TransferableBlock> {
singletonInstance = serverInstance;
}
}
+
+ // FIXME: there's a bug where singletonInstance may be null in the case of a JOIN where
+ // one side is BROADCAST and the other is SINGLETON (this is the case with nested loop
+ // joins for inequality conditions). This causes NPEs in the logs, but actually works
+ // because the side that hits the NPE doesn't expect to get any data anyway (that's the
+ // side that gets the broadcast from one side but nothing from the SINGLETON)
+ // FIXME: https://github.com/apache/pinot/issues/9592
_sendingStageInstances = Collections.singletonList(singletonInstance);
} else {
_sendingStageInstances = sendingStageInstances;
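The FIXME above notes that `singletonInstance` can stay null for nested loop joins, so `Collections.singletonList(singletonInstance)` produces a list holding a null sender. One possible way to avoid that, sketched here purely as a hypothetical and not as the fix tracked in issue 9592, is to return an empty sender list when no instance matches. The sketch assumes the singleton sender is the instance whose host and port match the receiving worker (the matching logic is not visible in this hunk); `Instance` and `SingletonSenders` are made-up names standing in for `ServerInstance` and the operator's constructor logic, and whether downstream code treats an empty sender list as "no data" is also an assumption.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for org.apache.pinot.core.transport.ServerInstance.
final class Instance {
  private final String _host;
  private final int _port;

  Instance(String host, int port) {
    _host = host;
    _port = port;
  }

  String host() {
    return _host;
  }

  int port() {
    return _port;
  }
}

final class SingletonSenders {
  private SingletonSenders() {
  }

  // Returns the sender co-located with this worker (assumed to be matched by
  // host and port) for a SINGLETON exchange, or an empty list when none
  // matches, instead of Collections.singletonList(null).
  static List<Instance> select(List<Instance> sendingInstances, String hostName, int port) {
    Instance singletonInstance = null;
    for (Instance instance : sendingInstances) {
      if (instance.host().equals(hostName) && instance.port() == port) {
        singletonInstance = instance;
      }
    }
    if (singletonInstance == null) {
      return Collections.emptyList();
    }
    return Collections.singletonList(singletonInstance);
  }
}
```

With this shape, the receiving side of a BROADCAST/SINGLETON join that has no co-located sender would see zero senders rather than one null sender, which matches the FIXME's observation that this side does not expect any data.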
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index ae4cab02f2..3e358ccc2c 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -70,10 +70,10 @@ public class MailboxSendOperator extends BaseOperator<TransferableBlock> {
private final int _stageId;
private final MailboxService<Mailbox.MailboxContent> _mailboxService;
private final DataSchema _dataSchema;
- private BaseOperator<TransferableBlock> _dataTableBlockBaseOperator;
+ private Operator<TransferableBlock> _dataTableBlockBaseOperator;
public MailboxSendOperator(MailboxService<Mailbox.MailboxContent> mailboxService, DataSchema dataSchema,
- BaseOperator<TransferableBlock> dataTableBlockBaseOperator, List<ServerInstance> receivingStageInstances,
+ Operator<TransferableBlock> dataTableBlockBaseOperator, List<ServerInstance> receivingStageInstances,
RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> keySelector, String hostName, int port,
long jobId, int stageId) {
_dataSchema = dataSchema;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
index b3741cb28f..1acb0c9a69 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
@@ -37,7 +37,7 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
public class SortOperator extends BaseOperator<TransferableBlock> {
private static final String EXPLAIN_NAME = "SORT";
- private final BaseOperator<TransferableBlock> _upstreamOperator;
+ private final Operator<TransferableBlock> _upstreamOperator;
private final int _fetch;
private final int _offset;
private final DataSchema _dataSchema;
@@ -47,7 +47,7 @@ public class SortOperator extends BaseOperator<TransferableBlock> {
private boolean _isSortedBlockConstructed;
private TransferableBlock _upstreamErrorBlock;
- public SortOperator(BaseOperator<TransferableBlock> upstreamOperator, List<RexExpression> collationKeys,
+ public SortOperator(Operator<TransferableBlock> upstreamOperator, List<RexExpression> collationKeys,
List<RelFieldCollation.Direction> collationDirections, int fetch, int offset, DataSchema dataSchema) {
_upstreamOperator = upstreamOperator;
_fetch = fetch;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
index 0c947e1ca6..90efc377ab 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
@@ -37,13 +37,13 @@ import org.apache.pinot.query.runtime.operator.operands.TransformOperand;
*/
public class TransformOperator extends BaseOperator<TransferableBlock> {
private static final String EXPLAIN_NAME = "TRANSFORM";
- private final BaseOperator<TransferableBlock> _upstreamOperator;
+ private final Operator<TransferableBlock> _upstreamOperator;
private final List<TransformOperand> _transformOperandsList;
private final int _resultColumnSize;
private final DataSchema _resultSchema;
private TransferableBlock _upstreamErrorBlock;
- public TransformOperator(BaseOperator<TransferableBlock> upstreamOperator, DataSchema dataSchema,
+ public TransformOperator(Operator<TransferableBlock> upstreamOperator, DataSchema dataSchema,
List<RexExpression> transforms, DataSchema upstreamDataSchema) {
_upstreamOperator = upstreamOperator;
_resultColumnSize = transforms.size();
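Every operator constructor in this commit now takes the `Operator<TransferableBlock>` interface instead of the `BaseOperator<TransferableBlock>` abstract class. This is needed because the `PhysicalPlanVisitor` methods return `Operator<TransferableBlock>`, and that value is passed straight into the next constructor. Below is a small sketch of why interface-typed parameters are more flexible. It uses simplified hypothetical stand-ins (`Operator`, `BaseOperator`, `ListOperator`, `EvenFilterOperator`, with plain `Integer` blocks in place of `TransferableBlock`). Pinot's real `Operator` interface has more methods than the single `nextBlock()` shown here.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified stand-ins for Pinot's Operator/BaseOperator types.
interface Operator<T> {
  T nextBlock(); // returns null when exhausted (simplification)
}

abstract class BaseOperator<T> implements Operator<T> {
  // shared plumbing (tracing, timing, ...) would live here
}

// Implements only the interface; a constructor declared with a BaseOperator
// parameter could not accept it.
final class ListOperator implements Operator<Integer> {
  private final Deque<Integer> _blocks;

  ListOperator(List<Integer> blocks) {
    _blocks = new ArrayDeque<>(blocks);
  }

  public Integer nextBlock() {
    return _blocks.poll();
  }
}

// Downstream operator typed against the interface, like the new
// FilterOperator(Operator<TransferableBlock> upstreamOperator, ...) signature.
final class EvenFilterOperator extends BaseOperator<Integer> {
  private final Operator<Integer> _upstreamOperator;

  EvenFilterOperator(Operator<Integer> upstreamOperator) {
    _upstreamOperator = upstreamOperator;
  }

  public Integer nextBlock() {
    Integer block;
    while ((block = _upstreamOperator.nextBlock()) != null) {
      if (block % 2 == 0) {
        return block;
      }
    }
    return null;
  }
}
```

Feeding `new ListOperator(List.of(1, 2, 3, 4))` into `EvenFilterOperator` yields blocks `2`, then `4`, then `null`. Because `ListOperator` implements only the interface, it plugs in directly, which is also convenient for test stubs.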