This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 66bce7132c Short circuit SubPlanFragmenter because we don't support
multiple sub-plans yet (#13306)
66bce7132c is described below
commit 66bce7132cbb90673405571f27c194cb39b53531
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Tue Jun 4 09:57:23 2024 -0700
Short circuit SubPlanFragmenter because we don't support multiple sub-plans
yet (#13306)
---
.../pinot/query/planner/QueryPlanMetadata.java | 61 -----------
.../planner/logical/PinotLogicalQueryPlanner.java | 119 ++++++++++-----------
.../planner/logical/RelToPlanNodeConverter.java | 2 +-
.../query/planner/logical/SubPlanFragmenter.java | 6 +-
4 files changed, 63 insertions(+), 125 deletions(-)
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlanMetadata.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlanMetadata.java
deleted file mode 100644
index 863e7c378f..0000000000
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlanMetadata.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.query.planner;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import org.apache.calcite.runtime.PairList;
-
-
-/**
- * QueryPlanMetadata contains the metadata of the {@code QueryPlan}.
- * It contains the table names and the fields of the query result.
- */
-public class QueryPlanMetadata {
- private final Set<String> _tableNames;
- private final PairList<Integer, String> _fields;
- private final Map<String, String> _customProperties;
-
- public QueryPlanMetadata(Set<String> tableNames, PairList<Integer, String>
fields) {
- _tableNames = tableNames;
- _fields = fields;
- _customProperties = new HashMap<>();
- }
-
- public Map<String, String> getCustomProperties() {
- return _customProperties;
- }
-
- /**
- * Get the table names.
- * @return table names.
- */
- public Set<String> getTableNames() {
- return _tableNames;
- }
-
- /**
- * Get the query result field.
- * @return query result field.
- */
- public PairList<Integer, String> getFields() {
- return _fields;
- }
-}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java
index 486b78da94..9e7703b7ef 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java
@@ -24,15 +24,12 @@ import it.unimi.dsi.fastutil.ints.IntList;
import it.unimi.dsi.fastutil.ints.IntListIterator;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.logical.PinotRelExchangeType;
import org.apache.pinot.query.planner.PlanFragment;
-import org.apache.pinot.query.planner.QueryPlanMetadata;
import org.apache.pinot.query.planner.SubPlan;
import org.apache.pinot.query.planner.SubPlanMetadata;
import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
@@ -51,70 +48,72 @@ public class PinotLogicalQueryPlanner {
* Converts a Calcite {@link RelRoot} into a Pinot {@link SubPlan}.
*/
public static SubPlan makePlan(RelRoot relRoot) {
- PlanNode rootNode = relNodeToStageNode(relRoot.rel);
- QueryPlanMetadata metadata =
- new
QueryPlanMetadata(RelToPlanNodeConverter.getTableNamesFromRelRoot(relRoot.rel),
relRoot.fields);
+ PlanNode rootNode = relNodeToPlanNode(relRoot.rel);
+ PlanFragment rootFragment = planNodeToPlanFragment(rootNode);
+ return new SubPlan(rootFragment,
+ new
SubPlanMetadata(RelToPlanNodeConverter.getTableNamesFromRelRoot(relRoot.rel),
relRoot.fields), List.of());
+ // TODO: Currently we don't support multiple sub-plans. Revisit the
following logic when we add the support.
// Fragment the stage tree into multiple SubPlans.
- SubPlanFragmenter.Context subPlanContext = new SubPlanFragmenter.Context();
- subPlanContext._subPlanIdToRootNodeMap.put(0, rootNode);
- subPlanContext._subPlanIdToMetadataMap.put(0, new
SubPlanMetadata(metadata.getTableNames(), metadata.getFields()));
- rootNode.visit(SubPlanFragmenter.INSTANCE, subPlanContext);
-
- Map<Integer, SubPlan> subPlanMap = new HashMap<>();
- for (Map.Entry<Integer, PlanNode> subPlanEntry :
subPlanContext._subPlanIdToRootNodeMap.entrySet()) {
- int subPlanId = subPlanEntry.getKey();
- PlanNode subPlanRoot = subPlanEntry.getValue();
-
- // Fragment the SubPlan into multiple PlanFragments.
- PlanFragmenter fragmenter = new PlanFragmenter();
- PlanFragmenter.Context fragmenterContext = fragmenter.createContext();
- subPlanRoot = subPlanRoot.visit(fragmenter, fragmenterContext);
- Int2ObjectOpenHashMap<PlanFragment> planFragmentMap =
fragmenter.getPlanFragmentMap();
- Int2ObjectOpenHashMap<IntList> childPlanFragmentIdsMap =
fragmenter.getChildPlanFragmentIdsMap();
-
- // Sub plan root needs to send final results back to the Broker
- // TODO: Should be SINGLETON (currently SINGLETON has to be local, so
use BROADCAST_DISTRIBUTED instead)
- MailboxSendNode subPlanRootSenderNode =
- new MailboxSendNode(subPlanRoot.getPlanFragmentId(),
subPlanRoot.getDataSchema(), 0,
- RelDistribution.Type.BROADCAST_DISTRIBUTED,
PinotRelExchangeType.getDefaultExchangeType(), null, null,
- false, false);
- subPlanRootSenderNode.addInput(subPlanRoot);
- PlanFragment planFragment1 = new PlanFragment(1, subPlanRootSenderNode,
new ArrayList<>());
- planFragmentMap.put(1, planFragment1);
- for (Int2ObjectMap.Entry<IntList> entry :
childPlanFragmentIdsMap.int2ObjectEntrySet()) {
- PlanFragment planFragment = planFragmentMap.get(entry.getIntKey());
- List<PlanFragment> childPlanFragments = planFragment.getChildren();
- IntListIterator childPlanFragmentIdIterator =
entry.getValue().iterator();
- while (childPlanFragmentIdIterator.hasNext()) {
-
childPlanFragments.add(planFragmentMap.get(childPlanFragmentIdIterator.nextInt()));
- }
- }
- MailboxReceiveNode rootReceiveNode =
- new MailboxReceiveNode(0, subPlanRoot.getDataSchema(),
subPlanRoot.getPlanFragmentId(),
- RelDistribution.Type.BROADCAST_DISTRIBUTED,
PinotRelExchangeType.getDefaultExchangeType(), null, null,
- false, false, subPlanRootSenderNode);
- PlanFragment rootPlanFragment = new PlanFragment(0, rootReceiveNode,
Collections.singletonList(planFragment1));
- SubPlan subPlan = new SubPlan(rootPlanFragment,
subPlanContext._subPlanIdToMetadataMap.get(0), new ArrayList<>());
- subPlanMap.put(subPlanId, subPlan);
- }
- for (Map.Entry<Integer, List<Integer>> subPlanToChildrenEntry :
subPlanContext._subPlanIdToChildrenMap.entrySet()) {
- int subPlanId = subPlanToChildrenEntry.getKey();
- List<Integer> subPlanChildren = subPlanToChildrenEntry.getValue();
- for (int subPlanChild : subPlanChildren) {
-
subPlanMap.get(subPlanId).getChildren().add(subPlanMap.get(subPlanChild));
- }
- }
- return subPlanMap.get(0);
+// SubPlanFragmenter.Context subPlanContext = new
SubPlanFragmenter.Context();
+// subPlanContext._subPlanIdToRootNodeMap.put(0, rootNode);
+// subPlanContext._subPlanIdToMetadataMap.put(0,
+// new
SubPlanMetadata(RelToPlanNodeConverter.getTableNamesFromRelRoot(relRoot.rel),
relRoot.fields));
+// rootNode.visit(SubPlanFragmenter.INSTANCE, subPlanContext);
+//
+// Map<Integer, SubPlan> subPlanMap = new HashMap<>();
+// for (Map.Entry<Integer, PlanNode> subPlanEntry :
subPlanContext._subPlanIdToRootNodeMap.entrySet()) {
+// SubPlan subPlan =
+// new SubPlan(planNodeToPlanFragment(subPlanEntry.getValue()),
subPlanContext._subPlanIdToMetadataMap.get(0),
+// new ArrayList<>());
+// subPlanMap.put(subPlanEntry.getKey(), subPlan);
+// }
+// for (Map.Entry<Integer, List<Integer>> subPlanToChildrenEntry :
subPlanContext._subPlanIdToChildrenMap.entrySet
+// ()) {
+// int subPlanId = subPlanToChildrenEntry.getKey();
+// List<Integer> subPlanChildren = subPlanToChildrenEntry.getValue();
+// for (int subPlanChild : subPlanChildren) {
+//
subPlanMap.get(subPlanId).getChildren().add(subPlanMap.get(subPlanChild));
+// }
+// }
+// return subPlanMap.get(0);
}
- // TODO: add dataSchema (extracted from RelNode schema) to the StageNode.
- private static PlanNode relNodeToStageNode(RelNode node) {
- PlanNode planNode = RelToPlanNodeConverter.toStageNode(node, -1);
+ private static PlanNode relNodeToPlanNode(RelNode node) {
+ PlanNode planNode = RelToPlanNodeConverter.toPlanNode(node, -1);
List<RelNode> inputs = node.getInputs();
for (RelNode input : inputs) {
- planNode.addInput(relNodeToStageNode(input));
+ planNode.addInput(relNodeToPlanNode(input));
}
return planNode;
}
+
+ private static PlanFragment planNodeToPlanFragment(PlanNode node) {
+ PlanFragmenter fragmenter = new PlanFragmenter();
+ PlanFragmenter.Context fragmenterContext = fragmenter.createContext();
+ node = node.visit(fragmenter, fragmenterContext);
+ Int2ObjectOpenHashMap<PlanFragment> planFragmentMap =
fragmenter.getPlanFragmentMap();
+ Int2ObjectOpenHashMap<IntList> childPlanFragmentIdsMap =
fragmenter.getChildPlanFragmentIdsMap();
+
+ // Sub plan root needs to send final results back to the Broker
+ // TODO: Should be SINGLETON (currently SINGLETON has to be local, so use
BROADCAST_DISTRIBUTED instead)
+ MailboxSendNode subPlanRootSenderNode = new
MailboxSendNode(node.getPlanFragmentId(), node.getDataSchema(), 0,
+ RelDistribution.Type.BROADCAST_DISTRIBUTED,
PinotRelExchangeType.getDefaultExchangeType(), null, null, false,
+ false);
+ subPlanRootSenderNode.addInput(node);
+ PlanFragment planFragment1 = new PlanFragment(1, subPlanRootSenderNode,
new ArrayList<>());
+ planFragmentMap.put(1, planFragment1);
+ for (Int2ObjectMap.Entry<IntList> entry :
childPlanFragmentIdsMap.int2ObjectEntrySet()) {
+ PlanFragment planFragment = planFragmentMap.get(entry.getIntKey());
+ List<PlanFragment> childPlanFragments = planFragment.getChildren();
+ IntListIterator childPlanFragmentIdIterator =
entry.getValue().iterator();
+ while (childPlanFragmentIdIterator.hasNext()) {
+
childPlanFragments.add(planFragmentMap.get(childPlanFragmentIdIterator.nextInt()));
+ }
+ }
+ MailboxReceiveNode rootReceiveNode = new MailboxReceiveNode(0,
node.getDataSchema(), node.getPlanFragmentId(),
+ RelDistribution.Type.BROADCAST_DISTRIBUTED,
PinotRelExchangeType.getDefaultExchangeType(), null, null, false,
+ false, subPlanRootSenderNode);
+ return new PlanFragment(0, rootReceiveNode,
Collections.singletonList(planFragment1));
+ }
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
index e0a0dfd3a1..c253402c5e 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
@@ -85,7 +85,7 @@ public final class RelToPlanNodeConverter {
* @param node relational node
* @return stage node.
*/
- public static PlanNode toStageNode(RelNode node, int currentStageId) {
+ public static PlanNode toPlanNode(RelNode node, int currentStageId) {
if (node instanceof LogicalTableScan) {
return convertLogicalTableScan((LogicalTableScan) node, currentStageId);
} else if (node instanceof LogicalJoin) {
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/SubPlanFragmenter.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/SubPlanFragmenter.java
index 989b52e971..6d17fbd694 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/SubPlanFragmenter.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/SubPlanFragmenter.java
@@ -42,13 +42,13 @@ import org.apache.pinot.query.planner.plannode.WindowNode;
/**
- * SubPlanFragmenter is an implementation of {@link PlanNodeVisitor} to
fragment a
- * {@link org.apache.pinot.query.planner.QueryPlan} into multiple {@link
org.apache.pinot.query.planner.SubPlan}.
+ * SubPlanFragmenter is an implementation of {@link PlanNodeVisitor} to
fragment a query plan into multiple sub-plans.
+ * TODO: Currently it is not hooked up because we don't support multiple
sub-plans yet.
*
* The fragmenting process is as follows:
* 1. Traverse the plan tree in a depth-first manner;
* 2. For each node, if it is a SubPlan splittable ExchangeNode, switch it to
a {@link LiteralValueNode};
- * 3. Increment current SubPlan Id by one and keep traverse the tree.
+ * 3. Increment current SubPlan ID by one and keep traverse the tree.
*/
public class SubPlanFragmenter implements PlanNodeVisitor<PlanNode,
SubPlanFragmenter.Context> {
public static final SubPlanFragmenter INSTANCE = new SubPlanFragmenter();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]