This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 04c8a4f6ee [multistage] Add Leaf Stage Worker Assignment / Boundary / Agg Rules (#15481)
04c8a4f6ee is described below
commit 04c8a4f6ee80e471f7f8e6bd23048d6992eea8f0
Author: Ankit Sultana <[email protected]>
AuthorDate: Mon Apr 14 16:03:00 2025 -0500
[multistage] Add Leaf Stage Worker Assignment / Boundary / Agg Rules (#15481)
---
.../pinot/calcite/rel/traits/TraitAssignment.java | 37 +-
.../planner/physical/v2/HashDistributionDesc.java | 8 +-
.../pinot/query/planner/physical/v2/PRelNode.java | 21 +
.../planner/physical/v2/PinotDataDistribution.java | 8 +-
.../planner/physical/v2/RelToPRelConverter.java | 136 +++++++
.../physical/v2/mapping/DistMappingGenerator.java | 109 ++++++
.../physical/v2/mapping/PinotDistMapping.java | 82 ++++
.../physical/v2/nodes/PhysicalAggregate.java | 10 +
.../planner/physical/v2/nodes/PhysicalFilter.java | 9 +
.../planner/physical/v2/nodes/PhysicalProject.java | 9 +
.../planner/physical/v2/nodes/PhysicalSort.java | 9 +
.../physical/v2/nodes/PhysicalTableScan.java | 12 +-
.../physical/v2/opt/PhysicalOptRuleSet.java | 42 ++
.../planner/physical/v2/opt/RuleExecutors.java | 43 +++
.../v2/opt/rules/LeafStageAggregateRule.java | 96 +++++
.../v2/opt/rules/LeafStageBoundaryRule.java | 90 +++++
.../opt/rules/LeafStageWorkerAssignmentRule.java | 424 +++++++++++++++++++++
.../physical/v2/mapping/PinotDistMappingTest.java | 115 ++++++
.../rules/LeafStageWorkerAssignmentRuleTest.java | 207 ++++++++++
19 files changed, 1441 insertions(+), 26 deletions(-)
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/traits/TraitAssignment.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/traits/TraitAssignment.java
index c6e0cacd5c..f2ba392129 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/traits/TraitAssignment.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/traits/TraitAssignment.java
@@ -36,6 +36,7 @@ import org.apache.calcite.rel.core.Window;
import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
import org.apache.pinot.calcite.rel.rules.PinotRuleUtils;
import org.apache.pinot.query.context.PhysicalPlannerContext;
+import org.apache.pinot.query.planner.physical.v2.PRelNode;
import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalAggregate;
import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalJoin;
import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalProject;
@@ -52,33 +53,35 @@ import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalWindow;
public class TraitAssignment {
private final Supplier<Integer> _planIdGenerator;
- public TraitAssignment(Supplier<Integer> planIdGenerator) {
+ private TraitAssignment(Supplier<Integer> planIdGenerator) {
_planIdGenerator = planIdGenerator;
}
- public static RelNode assign(RelNode relNode, PhysicalPlannerContext physicalPlannerContext) {
+ public static PRelNode assign(PRelNode pRelNode, PhysicalPlannerContext physicalPlannerContext) {
TraitAssignment traitAssignment = new TraitAssignment(physicalPlannerContext.getNodeIdGenerator());
- return traitAssignment.assign(relNode);
+ return traitAssignment.assign(pRelNode);
}
- public RelNode assign(RelNode node) {
+ @VisibleForTesting
+ PRelNode assign(PRelNode pRelNode) {
// Process inputs first.
+ RelNode relNode = pRelNode.unwrap();
List<RelNode> newInputs = new ArrayList<>();
- for (RelNode input : node.getInputs()) {
- newInputs.add(assign(input));
+ for (RelNode input : relNode.getInputs()) {
+ newInputs.add(assign((PRelNode) input).unwrap());
}
- node = node.copy(node.getTraitSet(), newInputs);
- // Process current node.
- if (node instanceof PhysicalSort) {
- return assignSort((PhysicalSort) node);
- } else if (node instanceof PhysicalJoin) {
- return assignJoin((PhysicalJoin) node);
- } else if (node instanceof PhysicalAggregate) {
- return assignAggregate((PhysicalAggregate) node);
- } else if (node instanceof PhysicalWindow) {
- return assignWindow((PhysicalWindow) node);
+ relNode = relNode.copy(relNode.getTraitSet(), newInputs);
+ // Process current relNode.
+ if (relNode instanceof PhysicalSort) {
+ return (PRelNode) assignSort((PhysicalSort) relNode);
+ } else if (relNode instanceof PhysicalJoin) {
+ return (PRelNode) assignJoin((PhysicalJoin) relNode);
+ } else if (relNode instanceof PhysicalAggregate) {
+ return (PRelNode) assignAggregate((PhysicalAggregate) relNode);
+ } else if (relNode instanceof PhysicalWindow) {
+ return (PRelNode) assignWindow((PhysicalWindow) relNode);
}
- return node;
+ return (PRelNode) relNode;
}
/**
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/HashDistributionDesc.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/HashDistributionDesc.java
index 3c15b6c656..7c2284cce5 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/HashDistributionDesc.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/HashDistributionDesc.java
@@ -22,7 +22,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import javax.annotation.Nullable;
-import org.apache.calcite.util.mapping.Mappings;
+import org.apache.pinot.query.planner.physical.v2.mapping.PinotDistMapping;
/**
@@ -63,15 +63,15 @@ public class HashDistributionDesc {
* partitioning info.
*/
@Nullable
- public HashDistributionDesc apply(Mappings.TargetMapping targetMapping) {
+ public HashDistributionDesc apply(PinotDistMapping mapping) {
for (Integer currentKey : _keys) {
- if (currentKey >= targetMapping.getSourceCount() || targetMapping.getTargetOpt(currentKey) == -1) {
+ if (currentKey >= mapping.getSourceCount() || mapping.getTarget(currentKey) == -1) {
return null;
}
}
List<Integer> newKey = new ArrayList<>();
for (int currentKey : _keys) {
- newKey.add(targetMapping.getTargetOpt(currentKey));
+ newKey.add(mapping.getTarget(currentKey));
}
return new HashDistributionDesc(newKey, _hashFunction, _numPartitions);
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PRelNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PRelNode.java
index 8f53d0b067..177870cbd7 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PRelNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PRelNode.java
@@ -21,6 +21,8 @@ package org.apache.pinot.query.planner.physical.v2;
import java.util.List;
import java.util.Objects;
import javax.annotation.Nullable;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.TableScan;
@@ -81,6 +83,21 @@ public interface PRelNode {
return null;
}
+ /**
+ * TODO(mse-physical): This does not check PinotExecStrategyTrait. We should revisit whether exec strategy should be
+ * a trait or not.
+ */
+ default boolean areTraitsSatisfied() {
+ RelNode relNode = unwrap();
+ RelDistribution distribution = relNode.getTraitSet().getDistribution();
+ PinotDataDistribution dataDistribution = getPinotDataDistributionOrThrow();
+ if (dataDistribution.satisfies(distribution)) {
+ RelCollation collation = relNode.getTraitSet().getCollation();
+ return dataDistribution.satisfies(collation);
+ }
+ return false;
+ }
+
+ PRelNode with(int newNodeId, List<PRelNode> newInputs, PinotDataDistribution newDistribution);
default PRelNode with(List<PRelNode> newInputs, PinotDataDistribution newDistribution) {
@@ -90,4 +107,8 @@ public interface PRelNode {
default PRelNode with(List<PRelNode> newInputs) {
return with(getNodeId(), newInputs, getPinotDataDistributionOrThrow());
}
+
+ default PRelNode asLeafStage() {
+ throw new UnsupportedOperationException(String.format("Cannot make %s a leaf stage node", unwrap()));
+ }
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PinotDataDistribution.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PinotDataDistribution.java
index 36ea5ca3cc..36002cb32f 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PinotDataDistribution.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PinotDataDistribution.java
@@ -28,7 +28,7 @@ import javax.annotation.Nullable;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelDistribution;
-import org.apache.calcite.util.mapping.Mappings;
+import org.apache.pinot.query.planner.physical.v2.mapping.PinotDistMapping;
/**
@@ -157,13 +157,13 @@ public class PinotDataDistribution {
return _collation.satisfies(relCollation);
}
- public PinotDataDistribution apply(@Nullable Mappings.TargetMapping targetMapping) {
- if (targetMapping == null) {
+ public PinotDataDistribution apply(@Nullable PinotDistMapping mapping) {
+ if (mapping == null) {
return new PinotDataDistribution(RelDistribution.Type.ANY, _workers, _workerHash, null, null);
}
Set<HashDistributionDesc> newHashDesc = new HashSet<>();
for (HashDistributionDesc desc : _hashDistributionDesc) {
- HashDistributionDesc newDescs = desc.apply(targetMapping);
+ HashDistributionDesc newDescs = desc.apply(mapping);
if (newDescs != null) {
newHashDesc.add(newDescs);
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/RelToPRelConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/RelToPRelConverter.java
new file mode 100644
index 0000000000..6b80fd4d38
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/RelToPRelConverter.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.physical.v2;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Supplier;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.Minus;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.core.Union;
+import org.apache.calcite.rel.core.Values;
+import org.apache.calcite.rel.core.Window;
+import org.apache.pinot.calcite.rel.logical.PinotLogicalAggregate;
+import org.apache.pinot.calcite.rel.traits.TraitAssignment;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.query.context.PhysicalPlannerContext;
+import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalAggregate;
+import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalFilter;
+import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalJoin;
+import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalProject;
+import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalSort;
+import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalTableScan;
+import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalUnion;
+import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalValues;
+import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalWindow;
+import org.apache.pinot.query.planner.physical.v2.opt.PRelOptRule;
+import org.apache.pinot.query.planner.physical.v2.opt.PhysicalOptRuleSet;
+import org.apache.pinot.query.planner.physical.v2.opt.RuleExecutor;
+import org.apache.pinot.query.planner.physical.v2.opt.RuleExecutors;
+import org.apache.pinot.query.planner.plannode.AggregateNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Converts a tree of RelNode to a tree of PRelNode, running the configured Physical Optimizers in the process.
+ */
+public class RelToPRelConverter {
+ private static final Logger LOGGER = LoggerFactory.getLogger(RelToPRelConverter.class);
+
+ private RelToPRelConverter() {
+ }
+
+ public static PRelNode toPRelNode(RelNode relNode, PhysicalPlannerContext context, TableCache tableCache) {
+ // Step-1: Convert each RelNode to a PRelNode
+ PRelNode rootPRelNode = create(relNode, context.getNodeIdGenerator());
+ // Step-2: Assign traits
+ rootPRelNode = TraitAssignment.assign(rootPRelNode, context);
+ // Step-3: Run physical optimizer rules.
+ var ruleAndExecutorList = PhysicalOptRuleSet.create(context, tableCache);
+ for (var ruleAndExecutor : ruleAndExecutorList) {
+ PRelOptRule rule = ruleAndExecutor.getLeft();
+ RuleExecutor executor = RuleExecutors.create(ruleAndExecutor.getRight(), rule, context);
+ rootPRelNode = executor.execute(rootPRelNode);
+ }
+ return rootPRelNode;
+ }
+
+ public static PRelNode create(RelNode relNode, Supplier<Integer> nodeIdGenerator) {
+ List<PRelNode> inputs = new ArrayList<>();
+ for (RelNode input : relNode.getInputs()) {
+ inputs.add(create(input, nodeIdGenerator));
+ }
+ if (relNode instanceof TableScan) {
+ Preconditions.checkState(inputs.isEmpty(), "Expected no inputs to table scan. Found: %s", inputs);
+ return new PhysicalTableScan((TableScan) relNode, nodeIdGenerator.get(), null, null);
+ } else if (relNode instanceof Filter) {
+ Preconditions.checkState(inputs.size() == 1, "Expected exactly 1 input of filter. Found: %s", inputs);
+ Filter filter = (Filter) relNode;
+ return new PhysicalFilter(filter.getCluster(), filter.getTraitSet(), filter.getHints(), filter.getCondition(),
+ nodeIdGenerator.get(), inputs.get(0), null, false);
+ } else if (relNode instanceof Project) {
+ Preconditions.checkState(inputs.size() == 1, "Expected exactly 1 input of project. Found: %s", inputs);
+ Project project = (Project) relNode;
+ return new PhysicalProject(project.getCluster(), project.getTraitSet(), project.getHints(), project.getProjects(),
+ project.getRowType(), project.getVariablesSet(), nodeIdGenerator.get(), inputs.get(0), null, false);
+ } else if (relNode instanceof PinotLogicalAggregate) {
+ Preconditions.checkState(inputs.size() == 1, "Expected exactly 1 input of agg. Found: %s", inputs);
+ PinotLogicalAggregate aggRel = (PinotLogicalAggregate) relNode;
+ return new PhysicalAggregate(aggRel.getCluster(), aggRel.getTraitSet(), aggRel.getHints(), aggRel.getGroupSet(),
+ aggRel.getGroupSets(), aggRel.getAggCallList(), nodeIdGenerator.get(), inputs.get(0), null, false,
+ AggregateNode.AggType.DIRECT, false, List.of(), 0);
+ } else if (relNode instanceof Join) {
+ Preconditions.checkState(relNode.getInputs().size() == 2, "Expected exactly 2 inputs to join. Found: %s", inputs);
+ Join join = (Join) relNode;
+ return new PhysicalJoin(join.getCluster(), join.getTraitSet(), join.getHints(), join.getCondition(),
+ join.getVariablesSet(), join.getJoinType(), nodeIdGenerator.get(), inputs.get(0), inputs.get(1), null);
+ } else if (relNode instanceof Union) {
+ Union union = (Union) relNode;
+ return new PhysicalUnion(union.getCluster(), union.getTraitSet(), union.getHints(), union.all, inputs,
+ nodeIdGenerator.get(), null);
+ } else if (relNode instanceof Minus) {
+ Minus minus = (Minus) relNode;
+ return new PhysicalUnion(minus.getCluster(), minus.getTraitSet(), minus.getHints(), minus.all, inputs,
+ nodeIdGenerator.get(), null);
+ } else if (relNode instanceof Sort) {
+ Preconditions.checkState(inputs.size() == 1, "Expected exactly 1 input of sort. Found: %s", inputs);
+ Sort sort = (Sort) relNode;
+ return new PhysicalSort(sort.getCluster(), sort.getTraitSet(), sort.getHints(), sort.getCollation(), sort.offset,
+ sort.fetch, inputs.get(0), nodeIdGenerator.get(), null, false);
+ } else if (relNode instanceof Values) {
+ Preconditions.checkState(inputs.isEmpty(), "Expected no inputs to values. Found: %s", inputs);
+ Values values = (Values) relNode;
+ return new PhysicalValues(values.getCluster(), values.getHints(), values.getRowType(), values.getTuples(),
+ values.getTraitSet(), nodeIdGenerator.get(), null);
+ } else if (relNode instanceof Window) {
+ Preconditions.checkState(inputs.size() == 1, "Expected exactly 1 input of window. Found: %s", inputs);
+ Window window = (Window) relNode;
+ return new PhysicalWindow(window.getCluster(), window.getTraitSet(), window.getHints(), window.getConstants(),
+ window.getRowType(), window.groups, nodeIdGenerator.get(), inputs.get(0), null);
+ }
+ throw new IllegalStateException("Unexpected relNode type: " + relNode.getClass().getName());
+ }
+}
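For reference, the conversion entrypoint above is intended to run once per query after logical planning. A minimal
driver sketch mirroring the three steps inside RelToPRelConverter#toPRelNode (the PhysicalPlannerContext and
TableCache are assumed to be supplied by the caller, as elsewhere in the planner):

    // Sketch only: equivalent to calling RelToPRelConverter.toPRelNode(logicalRoot, context, tableCache).
    static PRelNode planPhysical(RelNode logicalRoot, PhysicalPlannerContext context, TableCache tableCache) {
      // Step-1: wrap every RelNode into a PRelNode with a fresh node id.
      PRelNode root = RelToPRelConverter.create(logicalRoot, context.getNodeIdGenerator());
      // Step-2: assign distribution/collation traits.
      root = TraitAssignment.assign(root, context);
      // Step-3: run the physical optimizer rules (leaf-stage boundary, leaf-stage worker assignment, ...).
      for (var ruleAndExecutor : PhysicalOptRuleSet.create(context, tableCache)) {
        root = RuleExecutors.create(ruleAndExecutor.getRight(), ruleAndExecutor.getLeft(), context).execute(root);
      }
      return root;
    }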
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/mapping/DistMappingGenerator.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/mapping/DistMappingGenerator.java
new file mode 100644
index 0000000000..39413a0268
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/mapping/DistMappingGenerator.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.physical.v2.mapping;
+
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.core.Values;
+import org.apache.calcite.rel.core.Window;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.commons.collections4.CollectionUtils;
+
+
+/**
+ * Generates {@link PinotDistMapping} for a given source and destination RelNode.
+ */
+public class DistMappingGenerator {
+ private DistMappingGenerator() {
+ }
+
+ /**
+ * Source to destination mapping.
+ */
+ public static PinotDistMapping compute(RelNode source, RelNode destination,
+ @Nullable List<RelNode> leadingSiblings) {
+ if (destination instanceof Project) {
+ Project project = (Project) destination;
+ return computeExactInputRefMapping(project, source.getRowType().getFieldCount());
+ } else if (destination instanceof Window) {
+ // Window preserves input fields, and appends a field for each window expr.
+ Window window = (Window) destination;
+ return PinotDistMapping.identity(window.getInput().getRowType().getFieldCount());
+ } else if (destination instanceof Aggregate) {
+ Aggregate aggregate = (Aggregate) destination;
+ PinotDistMapping mapping = new PinotDistMapping(source.getRowType().getFieldCount());
+ List<Integer> groupSet = aggregate.getGroupSet().asList();
+ for (int j = 0; j < groupSet.size(); j++) {
+ mapping.set(groupSet.get(j), j);
+ }
+ return mapping;
+ } else if (destination instanceof Join) {
+ if (CollectionUtils.isEmpty(leadingSiblings)) {
+ return PinotDistMapping.identity(source.getRowType().getFieldCount());
+ }
+ int leftFieldCount = 0;
+ for (RelNode sibling : leadingSiblings) {
+ leftFieldCount += sibling.getRowType().getFieldCount();
+ }
+ PinotDistMapping mapping = new PinotDistMapping(source.getRowType().getFieldCount());
+ for (int i = 0; i < mapping.getSourceCount(); i++) {
+ mapping.set(i, i + leftFieldCount);
+ }
+ return mapping;
+ } else if (destination instanceof Filter) {
+ return PinotDistMapping.identity(source.getRowType().getFieldCount());
+ } else if (destination instanceof TableScan) {
+ throw new IllegalStateException("Found destination as TableScan in MappingGenerator");
+ } else if (destination instanceof Values) {
+ throw new IllegalStateException("Found destination as Values in MappingGenerator");
+ } else if (destination instanceof Sort) {
+ return PinotDistMapping.identity(source.getRowType().getFieldCount());
+ } else if (destination instanceof SetOp) {
+ SetOp setOp = (SetOp) destination;
+ if (setOp.isHomogeneous(true)) {
+ return PinotDistMapping.identity(source.getRowType().getFieldCount());
+ }
+ // TODO(mse-physical): Handle heterogeneous set ops. Currently we drop all mapping refs.
+ return new PinotDistMapping(source.getRowType().getFieldCount());
+ }
+ throw new IllegalStateException("Unknown node type: " + destination.getClass());
+ }
+
+ private static PinotDistMapping computeExactInputRefMapping(Project project, int sourceCount) {
+ PinotDistMapping mapping = new PinotDistMapping(sourceCount);
+ int indexInCurrentRelNode = 0;
+ for (RexNode rexNode : project.getProjects()) {
+ if (rexNode instanceof RexInputRef) {
+ RexInputRef rexInputRef = (RexInputRef) rexNode;
+ mapping.set(rexInputRef.getIndex(), indexInCurrentRelNode);
+ }
+ indexInCurrentRelNode++;
+ }
+ return mapping;
+ }
+}
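To make the Project case above concrete: a Project emitting [$2, $0] over a three-field input keeps only input refs
2 and 0, so field 2 maps to output 0, field 0 maps to output 1, and field 1 is dropped. A hand-built sketch of the
mapping DistMappingGenerator.compute would produce for such a node (expressed directly on PinotDistMapping, since
constructing real RelNodes is out of scope here):

    // Expected mapping for projects = [$2, $0] over a 3-field input.
    PinotDistMapping mapping = new PinotDistMapping(3);  // all three source fields start unmapped (-1)
    mapping.set(2, 0);                                    // input $2 becomes output field 0
    mapping.set(0, 1);                                    // input $0 becomes output field 1
    // mapping.getTarget(1) == -1: field 1 does not survive the projection.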
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/mapping/PinotDistMapping.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/mapping/PinotDistMapping.java
new file mode 100644
index 0000000000..1a083e0314
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/mapping/PinotDistMapping.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.physical.v2.mapping;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Mapping specifically for Pinot Data Distribution and trait mapping. A mapping is defined for a source / destination
+ * RelNode pair and is used to track how input fields are mapped to output fields.
+ */
+public class PinotDistMapping {
+ private static final int DEFAULT_MAPPING_VALUE = -1;
+ private final int _sourceCount;
+ private final Map<Integer, Integer> _sourceToTargetMapping = new HashMap<>();
+
+ public PinotDistMapping(int sourceCount) {
+ _sourceCount = sourceCount;
+ for (int i = 0; i < sourceCount; i++) {
+ _sourceToTargetMapping.put(i, DEFAULT_MAPPING_VALUE);
+ }
+ }
+
+ public static PinotDistMapping identity(int sourceCount) {
+ PinotDistMapping mapping = new PinotDistMapping(sourceCount);
+ for (int i = 0; i < sourceCount; i++) {
+ mapping.set(i, i);
+ }
+ return mapping;
+ }
+
+ public int getSourceCount() {
+ return _sourceCount;
+ }
+
+ public int getTarget(int source) {
+ Preconditions.checkArgument(source >= 0 && source < _sourceCount, "Invalid source index: %s", source);
+ Integer target = _sourceToTargetMapping.get(source);
+ return target == null ? DEFAULT_MAPPING_VALUE : target;
+ }
+
+ public void set(int source, int target) {
+ Preconditions.checkArgument(source >= 0 && source < _sourceCount, "Invalid source index: %s", source);
+ _sourceToTargetMapping.put(source, target);
+ }
+
+ public List<Integer> getMappedKeys(List<Integer> existingKeys) {
+ List<Integer> result = new ArrayList<>(existingKeys.size());
+ for (int key : existingKeys) {
+ Integer mappedKey = _sourceToTargetMapping.get(key);
+ Preconditions.checkArgument(mappedKey != null,
+ "Key %s not found in mapping with source count: %s", key, _sourceCount);
+ if (mappedKey != DEFAULT_MAPPING_VALUE) {
+ result.add(mappedKey);
+ } else {
+ return Collections.emptyList();
+ }
+ }
+ return result;
+ }
+}
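The drop-if-unmapped behavior above is what PinotDataDistribution.apply and HashDistributionDesc.apply rely on: a
hash distribution survives a mapping only if every key column maps to a valid output index. A small sketch using
only the mapping itself:

    PinotDistMapping mapping = new PinotDistMapping(4);
    mapping.set(0, 0);
    mapping.set(3, 1);                      // fields 1 and 2 stay at -1 (dropped)
    mapping.getMappedKeys(List.of(0, 3));   // -> [0, 1]: every key column survives, so the keys are remapped
    mapping.getMappedKeys(List.of(0, 2));   // -> []: key column 2 was dropped, so the distribution keys are lost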
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalAggregate.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalAggregate.java
index 9a76e9e31d..4f7d026996 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalAggregate.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalAggregate.java
@@ -103,4 +103,14 @@ public class PhysicalAggregate extends Aggregate implements PRelNode {
getAggCallList(), newNodeId, newInputs.get(0), newDistribution, _leafStage, _aggType, _leafReturnFinalResult,
_collations, _limit);
}
+
+ @Override
+ public PhysicalAggregate asLeafStage() {
+ if (isLeafStage()) {
+ return this;
+ }
+ return new PhysicalAggregate(getCluster(), getTraitSet(), getHints(), getGroupSet(), getGroupSets(),
+ getAggCallList(), _nodeId, _pRelInputs.get(0), _pinotDataDistribution, true, _aggType, _leafReturnFinalResult,
+ _collations, _limit);
+ }
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalFilter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalFilter.java
index d483382394..1f5ecee7d4 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalFilter.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalFilter.java
@@ -92,4 +92,13 @@ public class PhysicalFilter extends Filter implements PRelNode {
return new PhysicalFilter(getCluster(), getTraitSet(), getHints(), condition, newNodeId, newInputs.get(0),
newDistribution, _leafStage);
}
+
+ @Override
+ public PRelNode asLeafStage() {
+ if (isLeafStage()) {
+ return this;
+ }
+ return new PhysicalFilter(getCluster(), getTraitSet(), getHints(), condition, _nodeId, _pRelInputs.get(0),
+ _pinotDataDistribution, true);
+ }
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalProject.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalProject.java
index a8b7fde656..63065e38ff 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalProject.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalProject.java
@@ -90,4 +90,13 @@ public class PhysicalProject extends Project implements PRelNode {
return new PhysicalProject(getCluster(), getTraitSet(), getHints(), getProjects(), getRowType(), getVariablesSet(),
newNodeId, newInputs.get(0), newDistribution, _leafStage);
}
+
+ @Override
+ public PRelNode asLeafStage() {
+ if (isLeafStage()) {
+ return this;
+ }
+ return new PhysicalProject(getCluster(), getTraitSet(), getHints(), getProjects(), getRowType(),
+ getVariablesSet(), _nodeId, _pRelInputs.get(0), _pinotDataDistribution, true);
+ }
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalSort.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalSort.java
index 486f1d9d0f..25bad2b107 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalSort.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalSort.java
@@ -87,4 +87,13 @@ public class PhysicalSort extends Sort implements PRelNode {
return new PhysicalSort(getCluster(), getTraitSet(), getHints(), getCollation(), offset, fetch, newInputs.get(0),
newNodeId, newDistribution, _leafStage);
}
+
+ @Override
+ public PRelNode asLeafStage() {
+ if (isLeafStage()) {
+ return this;
+ }
+ return new PhysicalSort(getCluster(), getTraitSet(), getHints(), getCollation(), offset, fetch, _pRelInputs.get(0),
+ _nodeId, _pinotDataDistribution, true);
+ }
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalTableScan.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalTableScan.java
index cdbf90db41..18d72fd37d 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalTableScan.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalTableScan.java
@@ -39,7 +39,7 @@ public class PhysicalTableScan extends TableScan implements PRelNode {
@Nullable
private final TableScanMetadata _tableScanMetadata;
- public PhysicalTableScan(TableScan tableScan, int nodeId, PinotDataDistribution pinotDataDistribution,
+ public PhysicalTableScan(TableScan tableScan, int nodeId, @Nullable PinotDataDistribution pinotDataDistribution,
@Nullable TableScanMetadata tableScanMetadata) {
this(tableScan.getCluster(), tableScan.getTraitSet(), tableScan.getHints(), tableScan.getTable(), nodeId,
pinotDataDistribution, tableScanMetadata);
@@ -99,4 +99,14 @@ public class PhysicalTableScan extends TableScan implements PRelNode {
return new PhysicalTableScan(getCluster(), getTraitSet(), getHints(), getTable(), newNodeId,
newDistribution, _tableScanMetadata);
}
+
+ @Override
+ public PRelNode asLeafStage() {
+ return this;
+ }
+
+ public PhysicalTableScan with(PinotDataDistribution pinotDataDistribution, TableScanMetadata metadata) {
+ return new PhysicalTableScan(getCluster(), getTraitSet(), getHints(), getTable(), _nodeId,
+ pinotDataDistribution, metadata);
+ }
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/PhysicalOptRuleSet.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/PhysicalOptRuleSet.java
new file mode 100644
index 0000000000..9db842fabf
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/PhysicalOptRuleSet.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.physical.v2.opt;
+
+import java.util.List;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.query.context.PhysicalPlannerContext;
+import org.apache.pinot.query.planner.physical.v2.opt.rules.LeafStageBoundaryRule;
+import org.apache.pinot.query.planner.physical.v2.opt.rules.LeafStageWorkerAssignmentRule;
+
+
+public class PhysicalOptRuleSet {
+ private PhysicalOptRuleSet() {
+ }
+
+ public static List<Pair<PRelOptRule, RuleExecutors.Type>> create(PhysicalPlannerContext context,
+ TableCache tableCache) {
+ return List.of(
+ Pair.of(LeafStageBoundaryRule.INSTANCE, RuleExecutors.Type.POST_ORDER),
+ Pair.of(new LeafStageWorkerAssignmentRule(context, tableCache), RuleExecutors.Type.POST_ORDER));
+ // Pair.of(new WorkerExchangeAssignmentRule(context), RuleExecutors.Type.IN_ORDER),
+ // Pair.of(AggregatePushdownRule.INSTANCE, RuleExecutors.Type.POST_ORDER),
+ // Pair.of(SortPushdownRule.INSTANCE, RuleExecutors.Type.POST_ORDER));
+ }
+}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/RuleExecutors.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/RuleExecutors.java
new file mode 100644
index 0000000000..b8c08b6d80
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/RuleExecutors.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.physical.v2.opt;
+
+import org.apache.pinot.query.context.PhysicalPlannerContext;
+
+
+public class RuleExecutors {
+ private RuleExecutors() {
+ }
+
+ public static RuleExecutor create(Type type, PRelOptRule rule, PhysicalPlannerContext context) {
+ switch (type) {
+ case POST_ORDER:
+ return new PostOrderRuleExecutor(rule, context);
+ case IN_ORDER:
+ return new LeftInputFirstRuleExecutor(rule, context);
+ default:
+ throw new IllegalStateException(String.format("Unrecognized rule executor type: %s", type));
+ }
+ }
+
+ public enum Type {
+ POST_ORDER,
+ IN_ORDER
+ }
+}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageAggregateRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageAggregateRule.java
new file mode 100644
index 0000000000..e8121b063b
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageAggregateRule.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.physical.v2.opt.rules;
+
+import com.google.common.base.Preconditions;
+import java.util.Map;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
+import org.apache.pinot.calcite.rel.hint.PinotHintStrategyTable;
+import org.apache.pinot.query.context.PhysicalPlannerContext;
+import org.apache.pinot.query.planner.physical.v2.PRelNode;
+import org.apache.pinot.query.planner.physical.v2.PinotDataDistribution;
+import org.apache.pinot.query.planner.physical.v2.mapping.DistMappingGenerator;
+import org.apache.pinot.query.planner.physical.v2.mapping.PinotDistMapping;
+import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalAggregate;
+import org.apache.pinot.query.planner.physical.v2.opt.PRelOptRule;
+import org.apache.pinot.query.planner.physical.v2.opt.PRelOptRuleCall;
+
+
+/**
+ * Often it might be possible to promote an aggregate on top of a leaf stage to be part of the leaf stage. This rule
+ * handles that case. This is different from aggregate pushdown because pushdown is related to taking a decision about
+ * whether we should split the aggregate over an exchange into two, whereas this rule is able to avoid the Exchange
+ * altogether.
+ */
+public class LeafStageAggregateRule extends PRelOptRule {
+ private final PhysicalPlannerContext _physicalPlannerContext;
+
+ public LeafStageAggregateRule(PhysicalPlannerContext physicalPlannerContext) {
+ _physicalPlannerContext = physicalPlannerContext;
+ }
+
+ @Override
+ public boolean matches(PRelOptRuleCall call) {
+ if (call._currentNode.isLeafStage()) {
+ return false;
+ }
+ PRelNode currentNode = call._currentNode;
+ if (!(currentNode.unwrap() instanceof Aggregate)) {
+ return false;
+ }
+ if (!isProjectFilterOrScan(currentNode.getPRelInput(0).unwrap())) {
+ return false;
+ }
+ // ==> We have: "aggregate (non-leaf stage) > project|filter|table-scan (leaf-stage)"
+ PhysicalAggregate aggRel = (PhysicalAggregate) currentNode.unwrap();
+ PRelNode pRelInput = aggRel.getPRelInput(0);
+ if (isPartitionedByHintPresent(aggRel)) {
+ Preconditions.checkState(aggRel.getInput().getTraitSet().getCollation() == null,
+ "Aggregate input has sort constraint, but partition-by hint is forcing to skip exchange");
+ return true;
+ }
+ return pRelInput.areTraitsSatisfied();
+ }
+
+ @Override
+ public PRelNode onMatch(PRelOptRuleCall call) {
+ PhysicalAggregate currentNode = (PhysicalAggregate) call._currentNode;
+ PinotDistMapping mapping = DistMappingGenerator.compute(currentNode.getPRelInput(0).unwrap(),
+ currentNode.unwrap(), null);
+ PinotDataDistribution derivedDistribution = currentNode.getPRelInput(0).getPinotDataDistributionOrThrow()
+ .apply(mapping);
+ return currentNode.with(currentNode.getPRelInputs(), derivedDistribution);
+ }
+
+ private static boolean isPartitionedByHintPresent(PhysicalAggregate aggRel) {
+ Map<String, String> hintOptions =
+ PinotHintStrategyTable.getHintOptions(aggRel.getHints(), PinotHintOptions.AGGREGATE_HINT_OPTIONS);
+ hintOptions = hintOptions == null ? Map.of() : hintOptions;
+ return Boolean.parseBoolean(hintOptions.get(PinotHintOptions.AggregateOptions.IS_PARTITIONED_BY_GROUP_BY_KEYS));
+ }
+
+ private static boolean isProjectFilterOrScan(RelNode relNode) {
+ return relNode instanceof TableScan || relNode instanceof Project || relNode instanceof Filter;
+ }
+}
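For orientation, the rule fires either when the aggregate input already satisfies the required traits, or when the
query carries the partition-by aggregate hint checked in isPartitionedByHintPresent. A hedged example query (the
table userEvents and column memberId are hypothetical, and the aggOptions hint syntax is assumed from the existing
aggregate hints):

    // If userEvents is partitioned on memberId, the group-by can stay in the leaf stage with no exchange.
    String sql = "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ memberId, COUNT(*) "
        + "FROM userEvents GROUP BY memberId";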
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageBoundaryRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageBoundaryRule.java
new file mode 100644
index 0000000000..42b7cf9c65
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageBoundaryRule.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.physical.v2.opt.rules;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.pinot.query.planner.physical.v2.PRelNode;
+import org.apache.pinot.query.planner.physical.v2.opt.PRelOptRule;
+import org.apache.pinot.query.planner.physical.v2.opt.PRelOptRuleCall;
+
+
+/**
+ * The leaf stage consists of a table-scan and an optional project and/or filter. The filter and project nodes
+ * may be in any order. We don't include sort or aggregate in the leaf stage in this rule, because they will be made
+ * part of the leaf stage (if appropriate) as part of the Aggregate and Sort pushdown rules.
+ * <p>
+ * The idea is that you can and should always make filter and project part of the leaf stage and compute them locally
+ * on the server where the table-scan is computed. Whether it makes sense to run the aggregate or sort in the leaf
+ * stage depends on a few conditions, and hence it is handled as part of their respective pushdown rules.
+ * </p>
+ */
+public class LeafStageBoundaryRule extends PRelOptRule {
+ public static final LeafStageBoundaryRule INSTANCE = new LeafStageBoundaryRule();
+
+ private LeafStageBoundaryRule() {
+ }
+
+ @Override
+ public boolean matches(PRelOptRuleCall call) {
+ RelNode currentRel = call._currentNode.unwrap();
+ if (currentRel instanceof TableScan) {
+ return true;
+ }
+ if (!isProjectOrFilter(currentRel)) {
+ return false;
+ }
+ if (isTableScan(currentRel.getInput(0))) {
+ // (Project|Filter) > Table Scan
+ return true;
+ }
+ if (isProject(currentRel) && isFilter(currentRel.getInput(0))
+ && isTableScan(currentRel.getInput(0).getInput(0))) {
+ // Project > Filter > Table Scan
+ return true;
+ }
+ // Filter > Project > Table Scan
+ return isFilter(currentRel) && isProject(currentRel.getInput(0))
+ && isTableScan(currentRel.getInput(0).getInput(0));
+ }
+
+ @Override
+ public PRelNode onMatch(PRelOptRuleCall call) {
+ PRelNode currentNode = call._currentNode;
+ return currentNode.asLeafStage();
+ }
+
+ private boolean isProjectOrFilter(RelNode relNode) {
+ return isProject(relNode) || isFilter(relNode);
+ }
+
+ private boolean isProject(RelNode relNode) {
+ return relNode instanceof Project;
+ }
+
+ private boolean isFilter(RelNode relNode) {
+ return relNode instanceof Filter;
+ }
+
+ private boolean isTableScan(RelNode relNode) {
+ return relNode instanceof TableScan;
+ }
+}
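A minimal sketch of how this rule is driven (mirroring the wiring in PhysicalOptRuleSet; the PhysicalPlannerContext
and root PRelNode are assumed to come from the caller): the executor visits the tree post-order, so a matching
TableScan is marked first and a Project/Filter sitting directly above it then matches as well, each being tagged via
PRelNode#asLeafStage().

    RuleExecutor executor = RuleExecutors.create(RuleExecutors.Type.POST_ORDER, LeafStageBoundaryRule.INSTANCE,
        context);
    PRelNode withLeafBoundary = executor.execute(rootPRelNode);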
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRule.java
new file mode 100644
index 0000000000..3295e2cf37
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRule.java
@@ -0,0 +1,424 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.physical.v2.opt.rules;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.hint.RelHint;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.utils.DatabaseUtils;
+import org.apache.pinot.core.routing.RoutingManager;
+import org.apache.pinot.core.routing.RoutingTable;
+import org.apache.pinot.core.routing.ServerRouteInfo;
+import org.apache.pinot.core.routing.TablePartitionInfo;
+import org.apache.pinot.core.routing.TimeBoundaryInfo;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.context.PhysicalPlannerContext;
+import org.apache.pinot.query.planner.physical.v2.HashDistributionDesc;
+import org.apache.pinot.query.planner.physical.v2.PRelNode;
+import org.apache.pinot.query.planner.physical.v2.PinotDataDistribution;
+import org.apache.pinot.query.planner.physical.v2.TableScanMetadata;
+import org.apache.pinot.query.planner.physical.v2.mapping.DistMappingGenerator;
+import org.apache.pinot.query.planner.physical.v2.mapping.PinotDistMapping;
+import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalTableScan;
+import org.apache.pinot.query.planner.physical.v2.opt.PRelOptRule;
+import org.apache.pinot.query.planner.physical.v2.opt.PRelOptRuleCall;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.routing.QueryServerInstance;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * <h1>Overview</h1>
+ * Assigns workers to all PRelNodes that are part of the leaf stage as determined by {@link PRelNode#isLeafStage()}.
+ * The workers are mainly determined by the Table Scan, unless filter based server pruning is enabled.
+ * <h1>Current Features</h1>
+ * <ul>
+ * <li>
+ * Automatically detects partitioning and adds that information to PinotDataDistribution. This will be used
+ * in subsequent worker assignment steps to simplify Exchange.
+ * </li>
+ * </ul>
+ * <h1>Planned / Upcoming Features</h1>
+ * <ul>
+ * <li>Support for look-up join.</li>
+ * <li>Support for partition parallelism and the colocated join hint. See F2 in #15455.</li>
+ * <li>Support for Hybrid Tables for automatic partitioning inference.</li>
+ * <li>Server pruning based on filter predicates.</li>
+ * </ul>
+ */
+public class LeafStageWorkerAssignmentRule extends PRelOptRule {
+ private static final Logger LOGGER = LoggerFactory.getLogger(LeafStageWorkerAssignmentRule.class);
+ private final TableCache _tableCache;
+ private final RoutingManager _routingManager;
+ private final PhysicalPlannerContext _physicalPlannerContext;
+
+ public LeafStageWorkerAssignmentRule(PhysicalPlannerContext physicalPlannerContext, TableCache tableCache) {
+ _routingManager = physicalPlannerContext.getRoutingManager();
+ _physicalPlannerContext = physicalPlannerContext;
+ _tableCache = tableCache;
+ }
+
+ @Override
+ public boolean matches(PRelOptRuleCall call) {
+ return call._currentNode.isLeafStage();
+ }
+
+ @Override
+ public PRelNode onMatch(PRelOptRuleCall call) {
+ if (call._currentNode.unwrap() instanceof TableScan) {
+ return assignTableScan((PhysicalTableScan) call._currentNode, _physicalPlannerContext.getRequestId());
+ }
+ PRelNode currentNode = call._currentNode;
+ Preconditions.checkState(currentNode.isLeafStage(), "Leaf stage worker assignment called for non-leaf stage node:"
+ + " %s", currentNode);
+ PinotDistMapping mapping = DistMappingGenerator.compute(currentNode.getPRelInput(0).unwrap(),
+ currentNode.unwrap(), null);
+ PinotDataDistribution derivedDistribution = currentNode.getPRelInput(0).getPinotDataDistributionOrThrow()
+ .apply(mapping);
+ return currentNode.with(currentNode.getPRelInputs(), derivedDistribution);
+ }
+
+ private PhysicalTableScan assignTableScan(PhysicalTableScan tableScan, long requestId) {
+ // Step-1: Init tableName, table options, routing table and time boundary info.
+ String tableName = Objects.requireNonNull(getActualTableName(tableScan), "Table not found");
+ Map<String, String> tableOptions = getTableOptions(tableScan.getHints());
+ Map<String, RoutingTable> routingTableMap = getRoutingTable(tableName, requestId);
+ Preconditions.checkState(!routingTableMap.isEmpty(), "Unable to find routing entries for table: %s", tableName);
+ // acquire time boundary info if it is a hybrid table.
+ TimeBoundaryInfo timeBoundaryInfo = null;
+ if (routingTableMap.size() > 1) {
+ timeBoundaryInfo = _routingManager.getTimeBoundaryInfo(
+ TableNameBuilder.forType(TableType.OFFLINE)
+ .tableNameWithType(TableNameBuilder.extractRawTableName(tableName)));
+ if (timeBoundaryInfo == null) {
+ // remove offline table routing if no time boundary info is acquired.
+ routingTableMap.remove(TableType.OFFLINE.name());
+ }
+ }
+ // Step-2: Compute instance to segments map and unavailable segments.
+ Map<String, Set<String>> segmentUnavailableMap = new HashMap<>();
+ InstanceIdToSegments instanceIdToSegments = new InstanceIdToSegments();
+ for (Map.Entry<String, RoutingTable> routingEntry : routingTableMap.entrySet()) {
+ String tableType = routingEntry.getKey();
+ RoutingTable routingTable = routingEntry.getValue();
+ Map<String, List<String>> currentSegmentsMap = new HashMap<>();
+ Map<ServerInstance, ServerRouteInfo> tmp = routingTable.getServerInstanceToSegmentsMap();
+ for (Map.Entry<ServerInstance, ServerRouteInfo> serverEntry : tmp.entrySet()) {
+ String instanceId = serverEntry.getKey().getInstanceId();
+ Preconditions.checkState(currentSegmentsMap.put(instanceId, serverEntry.getValue().getSegments()) == null,
+ "Entry for server %s and table type: %s already exist!", serverEntry.getKey(), tableType);
+ _physicalPlannerContext.getInstanceIdToQueryServerInstance().computeIfAbsent(instanceId,
+ (ignore) -> new QueryServerInstance(serverEntry.getKey()));
+ }
+ if (tableType.equalsIgnoreCase(TableType.OFFLINE.name())) {
+ instanceIdToSegments._offlineTableSegmentsMap = currentSegmentsMap;
+ } else {
+ instanceIdToSegments._realtimeTableSegmentsMap = currentSegmentsMap;
+ }
+ if (!routingTable.getUnavailableSegments().isEmpty()) {
+ // Set unavailable segments in context, keyed by PRelNode ID.
+ segmentUnavailableMap.put(TableNameBuilder.forType(TableType.valueOf(tableName)).tableNameWithType(tableName),
+ new HashSet<>(routingTable.getUnavailableSegments()));
+ }
+ }
+ List<String> fieldNames = tableScan.getRowType().getFieldNames();
+ Map<String, TablePartitionInfo> tablePartitionInfoMap = calculateTablePartitionInfo(tableName,
+ routingTableMap.keySet());
+ TableScanWorkerAssignmentResult workerAssignmentResult = assignTableScan(tableName, fieldNames,
+ instanceIdToSegments, tablePartitionInfoMap);
+ TableScanMetadata metadata = new TableScanMetadata(Set.of(tableName), workerAssignmentResult._workerIdToSegmentsMap,
+ tableOptions, segmentUnavailableMap, timeBoundaryInfo);
+ return tableScan.with(workerAssignmentResult._pinotDataDistribution, metadata);
+ }
+
+ /**
+ * Assigns workers for the table-scan node, automatically detecting table partitioning whenever possible. The
+ * arguments to this method are minimal to facilitate unit-testing.
+ */
+ @VisibleForTesting
+ static TableScanWorkerAssignmentResult assignTableScan(String tableName, List<String> fieldNames,
+ InstanceIdToSegments instanceIdToSegments, Map<String, TablePartitionInfo> tpiMap) {
+ Set<String> tableTypes = instanceIdToSegments.getActiveTableTypes();
+ Set<String> partitionedTableTypes = tableTypes.stream().filter(tpiMap::containsKey).collect(Collectors.toSet());
+ Preconditions.checkState(!tableTypes.isEmpty(), "No routing entry for offline or realtime type");
+ if (tableTypes.equals(partitionedTableTypes)) {
+ if (partitionedTableTypes.size() == 1) {
+ // Attempt partitioned distribution
+ String tableType = partitionedTableTypes.iterator().next();
+ String tableNameWithType = TableNameBuilder.forType(TableType.valueOf(tableType)).tableNameWithType(tableName);
+ TableScanWorkerAssignmentResult assignmentResult = attemptPartitionedDistribution(tableNameWithType,
+ fieldNames, instanceIdToSegments.getSegmentsMap(TableType.valueOf(tableType)),
+ tpiMap.get(tableType));
+ if (assignmentResult != null) {
+ return assignmentResult;
+ }
+ } else {
+ // TODO(mse-physical): Support automatic partitioned dist for hybrid tables.
+ LOGGER.warn("Automatic Partitioned Distribution not supported for Hybrid Tables yet");
+ }
+ }
+ // For each server, we want to know the segments for each table-type.
+ Map<String, Map<String, List<String>>> instanceIdToTableTypeToSegmentsMap = new HashMap<>();
+ for (String tableType : tableTypes) {
+ Map<String, List<String>> segmentsMap = instanceIdToSegments.getSegmentsMap(TableType.valueOf(tableType));
+ Preconditions.checkNotNull(segmentsMap, "Unexpected null segments map in leaf worker assignment");
+ for (Map.Entry<String, List<String>> entry : segmentsMap.entrySet()) {
+ String instanceId = entry.getKey();
+ List<String> segments = entry.getValue();
+ instanceIdToTableTypeToSegmentsMap.computeIfAbsent(instanceId, k -> new HashMap<>())
+ .put(tableType, segments);
+ }
+ }
+ // For each server, assign one worker each.
+ Map<Integer, Map<String, List<String>>> workerIdToSegmentsMap = new HashMap<>();
+ List<String> workers = new ArrayList<>();
+ for (Map.Entry<String, Map<String, List<String>>> entry : instanceIdToTableTypeToSegmentsMap.entrySet()) {
+ String instanceId = entry.getKey();
+ for (var tableTypeAndSegments : entry.getValue().entrySet()) {
+ String tableType = tableTypeAndSegments.getKey();
+ List<String> segments = tableTypeAndSegments.getValue();
+ workerIdToSegmentsMap.computeIfAbsent(workers.size(), (x) -> new HashMap<>()).put(tableType, segments);
+ }
+ workers.add(String.format("%s@%s", workers.size(), instanceId));
+ }
+ PinotDataDistribution pinotDataDistribution = new PinotDataDistribution(RelDistribution.Type.RANDOM_DISTRIBUTED,
+ workers, workers.hashCode(), null, null);
+ return new TableScanWorkerAssignmentResult(pinotDataDistribution, workerIdToSegmentsMap);
+ }
+
+ /**
+ * Tries to assign workers for the table-scan node to generate a partitioned data distribution. If this is not
+ * possible, we simply return null.
+ */
+ @Nullable
+ @VisibleForTesting
+ static TableScanWorkerAssignmentResult attemptPartitionedDistribution(String tableNameWithType,
+ List<String> fieldNames, Map<String, List<String>> instanceIdToSegmentsMap,
+ TablePartitionInfo tablePartitionInfo) {
+ if (CollectionUtils.isNotEmpty(tablePartitionInfo.getSegmentsWithInvalidPartition())) {
+ LOGGER.warn("Table {} has {} segments with invalid partition info. Will assume un-partitioned distribution",
+ tableNameWithType, tablePartitionInfo.getSegmentsWithInvalidPartition().size());
+ return null;
+ }
+ String tableType =
+ Objects.requireNonNull(TableNameBuilder.getTableTypeFromTableName(tableNameWithType),
+ "Illegal state: expected table name with type").toString();
+ int numPartitions = tablePartitionInfo.getNumPartitions();
+ int keyIndex = fieldNames.indexOf(tablePartitionInfo.getPartitionColumn());
+ String function = tablePartitionInfo.getPartitionFunctionName();
+ int numSelectedServers = instanceIdToSegmentsMap.size();
+ if (keyIndex == -1) {
+ LOGGER.warn("Unable to find partition column {} in table scan fields
{}", tablePartitionInfo.getPartitionColumn(),
+ fieldNames);
+ return null;
+ } else if (numPartitions < numSelectedServers) {
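+      // Fewer partitions than selected servers would leave some workers with no partition; bail out.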
+ return null;
+ }
+ // Pre-compute segmentToServer map for quick lookup later.
+ Map<String, String> segmentToServer = new HashMap<>();
+ for (var entry : instanceIdToSegmentsMap.entrySet()) {
+ String instanceId = entry.getKey();
+ for (String segment : entry.getValue()) {
+ segmentToServer.put(segment, instanceId);
+ }
+ }
+    // For each partition, we expect at most 1 server; it is tracked in this array.
+    String[] partitionToServerMap = new String[numPartitions];
+ TablePartitionInfo.PartitionInfo[] partitionInfos =
tablePartitionInfo.getPartitionInfoMap();
+ Map<Integer, List<String>> segmentsByPartition = new HashMap<>();
+ // Ensure each partition is assigned to exactly 1 server.
+ for (int partitionNum = 0; partitionNum < numPartitions; partitionNum++) {
+ TablePartitionInfo.PartitionInfo info = partitionInfos[partitionNum];
+ List<String> selectedSegments = new ArrayList<>();
+ if (info != null) {
+ String chosenServer;
+ for (String segment : info._segments) {
+ chosenServer = segmentToServer.get(segment);
+ // segmentToServer may return null if TPI has a segment not present
in instanceIdToSegmentsMap.
+ // This can happen when the segment was not selected for the query
(due to pruning for instance).
+ if (chosenServer != null) {
+ selectedSegments.add(segment);
+ if (partitionToServerMap[partitionNum] == null) {
+ partitionToServerMap[partitionNum] = chosenServer;
+ } else if
(!partitionToServerMap[partitionNum].equals(chosenServer)) {
+ return null;
+ }
+ }
+ }
+ }
+ segmentsByPartition.put(partitionNum, selectedSegments);
+ }
+    // Initialize the workers list with one empty slot per selected server.
+ List<String> workers = new ArrayList<>();
+ for (int i = 0; i < numSelectedServers; i++) {
+ workers.add("");
+ }
+ // Try to assign workers in such a way that partition P goes to worker = P
% num-workers.
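+    // E.g. with 8 partitions and 4 workers, worker-0 gets partitions {0, 4} and worker-1 gets {1, 5}.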
+ for (int partitionNum = 0; partitionNum < numPartitions; partitionNum++) {
+ if (partitionToServerMap[partitionNum] != null) {
+ int workerId = partitionNum % workers.size();
+ if (workers.get(workerId).isEmpty()) {
+ workers.set(workerId, partitionToServerMap[partitionNum]);
+ } else if
(!workers.get(workerId).equals(partitionToServerMap[partitionNum])) {
+ return null;
+ }
+ }
+ }
+ // Build the workerId to segments map.
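+    // Each worker picks up every partition congruent to its workerId modulo the worker count, and its name is
+    // finalized as "<workerId>@<instanceId>".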
+ Map<Integer, Map<String, List<String>>> workerIdToSegmentsMap = new
HashMap<>();
+ for (int workerId = 0; workerId < workers.size(); workerId++) {
+ List<String> segmentsForWorker = new ArrayList<>();
+ for (int partitionNum = workerId; partitionNum < numPartitions;
partitionNum += workers.size()) {
+ segmentsForWorker.addAll(segmentsByPartition.get(partitionNum));
+ }
+ workers.set(workerId, String.format("%s@%s", workerId,
workers.get(workerId)));
+ workerIdToSegmentsMap.put(workerId, ImmutableMap.of(tableType,
segmentsForWorker));
+ }
+ HashDistributionDesc desc = new
HashDistributionDesc(ImmutableList.of(keyIndex), function, numPartitions);
+ PinotDataDistribution dataDistribution = new
PinotDataDistribution(RelDistribution.Type.HASH_DISTRIBUTED,
+ workers, workers.hashCode(), ImmutableSet.of(desc), null);
+ return new TableScanWorkerAssignmentResult(dataDistribution,
workerIdToSegmentsMap);
+ }
+
+ private Map<String, TablePartitionInfo> calculateTablePartitionInfo(String
tableName, Set<String> tableTypes) {
+ Map<String, TablePartitionInfo> result = new HashMap<>();
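+    // Only include table types for which the routing manager has TablePartitionInfo available.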
+    if (tableTypes.contains("OFFLINE")) {
+      String offlineTableNameWithType =
TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+      TablePartitionInfo tablePartitionInfo =
_routingManager.getTablePartitionInfo(offlineTableNameWithType);
+      if (tablePartitionInfo != null) {
+        result.put("OFFLINE", tablePartitionInfo);
+      }
+    }
+    if (tableTypes.contains("REALTIME")) {
+      String realtimeTableNameWithType =
TableNameBuilder.REALTIME.tableNameWithType(tableName);
+      TablePartitionInfo tablePartitionInfo =
_routingManager.getTablePartitionInfo(realtimeTableNameWithType);
+      if (tablePartitionInfo != null) {
+        result.put("REALTIME", tablePartitionInfo);
+      }
+    }
+ return result;
+ }
+
+ /**
+   * Acquires the routing table(s) for the table referenced by the table-scan node.
+   *
+   * @param tableName table name with or without type suffix.
+   * @return map keyed by table type ("OFFLINE"/"REALTIME") to the corresponding routing table.
+ */
+ private Map<String, RoutingTable> getRoutingTable(String tableName, long
requestId) {
+ String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+ TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableName);
+ Map<String, RoutingTable> routingTableMap = new HashMap<>();
+ RoutingTable routingTable;
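+    // Without a type suffix, look up both the OFFLINE and REALTIME routing tables.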
+ if (tableType == null) {
+ routingTable = getRoutingTable(rawTableName, TableType.OFFLINE,
requestId);
+ if (routingTable != null) {
+ routingTableMap.put(TableType.OFFLINE.name(), routingTable);
+ }
+ routingTable = getRoutingTable(rawTableName, TableType.REALTIME,
requestId);
+ if (routingTable != null) {
+ routingTableMap.put(TableType.REALTIME.name(), routingTable);
+ }
+ } else {
+ routingTable = getRoutingTable(tableName, tableType, requestId);
+ if (routingTable != null) {
+ routingTableMap.put(tableType.name(), routingTable);
+ }
+ }
+ return routingTableMap;
+ }
+
+ private RoutingTable getRoutingTable(String tableName, TableType tableType,
long requestId) {
+ String tableNameWithType =
+
TableNameBuilder.forType(tableType).tableNameWithType(TableNameBuilder.extractRawTableName(tableName));
+ return _routingManager.getRoutingTable(
+ CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM \"" +
tableNameWithType + "\""), requestId);
+ }
+
+ private Map<String, String> getTableOptions(List<RelHint> hints) {
+ Map<String, String> tmp =
PlanNode.NodeHint.fromRelHints(hints).getHintOptions().get(
+ PinotHintOptions.TABLE_HINT_OPTIONS);
+ return tmp == null ? Map.of() : tmp;
+ }
+
+ private String getActualTableName(TableScan tableScan) {
+ RelOptTable table = tableScan.getTable();
+ List<String> qualifiedName = table.getQualifiedName();
+ String tmp = qualifiedName.size() == 1 ? qualifiedName.get(0)
+ : DatabaseUtils.constructFullyQualifiedTableName(qualifiedName.get(0),
qualifiedName.get(1));
+ return _tableCache.getActualTableName(tmp);
+ }
+
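+  /** Computed data distribution plus the segments each worker should scan, keyed by table type. */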
+ static class TableScanWorkerAssignmentResult {
+ final PinotDataDistribution _pinotDataDistribution;
+ final Map<Integer, Map<String, List<String>>> _workerIdToSegmentsMap;
+
+ TableScanWorkerAssignmentResult(PinotDataDistribution
pinotDataDistribution,
+ Map<Integer, Map<String, List<String>>> workerIdToSegmentsMap) {
+ _pinotDataDistribution = pinotDataDistribution;
+ _workerIdToSegmentsMap = workerIdToSegmentsMap;
+ }
+ }
+
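+  /** Segments selected for the query, keyed by instanceId, tracked per table type. */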
+ static class InstanceIdToSegments {
+ @Nullable
+ Map<String, List<String>> _offlineTableSegmentsMap;
+ @Nullable
+ Map<String, List<String>> _realtimeTableSegmentsMap;
+
+ @Nullable
+ Map<String, List<String>> getSegmentsMap(TableType tableType) {
+ return tableType == TableType.OFFLINE ? _offlineTableSegmentsMap :
_realtimeTableSegmentsMap;
+ }
+
+ Set<String> getActiveTableTypes() {
+ Set<String> tableTypes = new HashSet<>();
+ if (MapUtils.isNotEmpty(_offlineTableSegmentsMap)) {
+ tableTypes.add(TableType.OFFLINE.name());
+ }
+ if (MapUtils.isNotEmpty(_realtimeTableSegmentsMap)) {
+ tableTypes.add(TableType.REALTIME.name());
+ }
+ return tableTypes;
+ }
+ }
+}
diff --git
a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/mapping/PinotDistMappingTest.java
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/mapping/PinotDistMappingTest.java
new file mode 100644
index 0000000000..bd537c4be0
--- /dev/null
+++
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/mapping/PinotDistMappingTest.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.physical.v2.mapping;
+
+import java.util.List;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class PinotDistMappingTest {
+ @Test
+ public void testIdentityMapping() {
+ // Test identity mapping returns the expected mapping.
+ PinotDistMapping mapping = PinotDistMapping.identity(10);
+ assertEquals(mapping.getSourceCount(), 10);
+ for (int i = 0; i < 10; i++) {
+ assertEquals(mapping.getTarget(i), i);
+ }
+ // Test getMappedKeys always returns the same values as the input.
+ List<List<Integer>> testKeys = List.of(
+ List.of(0),
+ List.of(9),
+ List.of(1, 3, 5),
+ List.of(0, 2, 4),
+ List.of(4, 2, 9)
+ );
+ for (List<Integer> keys : testKeys) {
+ assertEquals(mapping.getMappedKeys(keys), keys);
+ }
+ }
+
+ @Test
+ public void testOutOfBoundsSource() {
+    // Passing a source index that is out of bounds for the mapping's sourceCount should throw an exception.
+ PinotDistMapping mapping = new PinotDistMapping(5);
+ assertThrows(IllegalArgumentException.class, () -> mapping.getTarget(-1));
+ assertThrows(IllegalArgumentException.class, () -> mapping.getTarget(5));
+ assertThrows(IllegalArgumentException.class, () -> mapping.set(-1, 2));
+ assertThrows(IllegalArgumentException.class, () -> mapping.set(5, 2));
+ assertThrows(IllegalArgumentException.class, () ->
mapping.getMappedKeys(List.of(5)));
+ }
+
+ @Test
+ public void testSet() {
+ // Test setting a mapping value.
+ PinotDistMapping mapping = new PinotDistMapping(5);
+ mapping.set(0, 2);
+ assertEquals(mapping.getTarget(0), 2);
+ assertEquals(mapping.getTarget(1), -1);
+ assertEquals(mapping.getTarget(2), -1);
+ assertEquals(mapping.getTarget(3), -1);
+ assertEquals(mapping.getTarget(4), -1);
+
+ // Test setting multiple mapping values.
+ mapping.set(1, 3);
+ mapping.set(2, 4);
+ assertEquals(mapping.getTarget(0), 2);
+ assertEquals(mapping.getTarget(1), 3);
+ assertEquals(mapping.getTarget(2), 4);
+ assertEquals(mapping.getTarget(3), -1);
+ assertEquals(mapping.getTarget(4), -1);
+
+ // Test setting a mapping value to an invalid index.
+ assertThrows(IllegalArgumentException.class, () -> mapping.set(-1, 2));
+ assertThrows(IllegalArgumentException.class, () -> mapping.set(5, 2));
+ }
+
+ @Test
+ public void testGetMappedKeys() {
+ {
+ // Test when all passed keys are mapped.
+ PinotDistMapping mapping = new PinotDistMapping(5);
+ mapping.set(0, 2);
+ mapping.set(1, 3);
+ mapping.set(2, 4);
+ List<Integer> keys = List.of(0, 1, 2);
+ List<Integer> expectedMappedKeys = List.of(2, 3, 4);
+ assertEquals(mapping.getMappedKeys(keys), expectedMappedKeys);
+ }
+ {
+      // Test when one of the keys is not mapped; getMappedKeys should return an empty list.
+ PinotDistMapping mapping = new PinotDistMapping(5);
+ mapping.set(0, 2);
+ mapping.set(1, 3);
+ List<Integer> keys = List.of(0, 1, 2);
+ List<Integer> expectedMappedKeys = List.of();
+ assertEquals(mapping.getMappedKeys(keys), expectedMappedKeys);
+ }
+ {
+ // Test getting mapped keys with an invalid key.
+ PinotDistMapping mapping = new PinotDistMapping(5);
+ mapping.set(0, 2);
+ mapping.set(1, 3);
+ List<Integer> keys = List.of(0, 1, 5);
+ assertThrows(IllegalArgumentException.class, () ->
mapping.getMappedKeys(keys));
+ }
+ }
+}
diff --git
a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRuleTest.java
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRuleTest.java
new file mode 100644
index 0000000000..eed658a96b
--- /dev/null
+++
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRuleTest.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.physical.v2.opt.rules;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.pinot.core.routing.TablePartitionInfo;
+import org.apache.pinot.query.planner.physical.v2.HashDistributionDesc;
+import
org.apache.pinot.query.planner.physical.v2.opt.rules.LeafStageWorkerAssignmentRule.InstanceIdToSegments;
+import
org.apache.pinot.query.planner.physical.v2.opt.rules.LeafStageWorkerAssignmentRule.TableScanWorkerAssignmentResult;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class LeafStageWorkerAssignmentRuleTest {
+ private static final String TABLE_NAME = "testTable";
+ private static final List<String> FIELDS_IN_SCAN = List.of("userId",
"orderId", "orderAmount", "cityId", "cityName");
+ private static final String PARTITION_COLUMN = "userId";
+ private static final String PARTITION_FUNCTION = "murmur";
+ private static final int NUM_SERVERS = 4;
+ private static final int OFFLINE_NUM_PARTITIONS = 4;
+ private static final int REALTIME_NUM_PARTITIONS = 8;
+ private static final InstanceIdToSegments OFFLINE_INSTANCE_ID_TO_SEGMENTS;
+ private static final InstanceIdToSegments REALTIME_INSTANCE_ID_TO_SEGMENTS;
+ private static final InstanceIdToSegments HYBRID_INSTANCE_ID_TO_SEGMENTS;
+
+ static {
+ Map<String, List<String>> offlineSegmentsMap = createOfflineSegmentsMap();
+ Map<String, List<String>> realtimeSegmentsMap =
createRealtimeSegmentsMap();
+ OFFLINE_INSTANCE_ID_TO_SEGMENTS = new InstanceIdToSegments();
+ OFFLINE_INSTANCE_ID_TO_SEGMENTS._offlineTableSegmentsMap =
offlineSegmentsMap;
+ REALTIME_INSTANCE_ID_TO_SEGMENTS = new InstanceIdToSegments();
+ REALTIME_INSTANCE_ID_TO_SEGMENTS._realtimeTableSegmentsMap =
realtimeSegmentsMap;
+ HYBRID_INSTANCE_ID_TO_SEGMENTS = new InstanceIdToSegments();
+ HYBRID_INSTANCE_ID_TO_SEGMENTS._offlineTableSegmentsMap =
offlineSegmentsMap;
+ HYBRID_INSTANCE_ID_TO_SEGMENTS._realtimeTableSegmentsMap =
realtimeSegmentsMap;
+ }
+
+ @Test
+ public void testAssignTableScanWithUnPartitionedOfflineTable() {
+ TableScanWorkerAssignmentResult result =
LeafStageWorkerAssignmentRule.assignTableScan(TABLE_NAME, FIELDS_IN_SCAN,
+ OFFLINE_INSTANCE_ID_TO_SEGMENTS, Map.of());
+ assertEquals(result._pinotDataDistribution.getType(),
RelDistribution.Type.RANDOM_DISTRIBUTED);
+ assertEquals(result._pinotDataDistribution.getWorkers().size(), 4);
+ assertEquals(result._pinotDataDistribution.getCollation(),
RelCollations.EMPTY);
+
assertEquals(result._pinotDataDistribution.getHashDistributionDesc().size(), 0);
+ validateTableScanAssignment(result,
OFFLINE_INSTANCE_ID_TO_SEGMENTS._offlineTableSegmentsMap, "OFFLINE");
+ }
+
+ @Test
+ public void testAssignTableScanWithUnPartitionedRealtimeTable() {
+ TableScanWorkerAssignmentResult result =
LeafStageWorkerAssignmentRule.assignTableScan(TABLE_NAME, FIELDS_IN_SCAN,
+ REALTIME_INSTANCE_ID_TO_SEGMENTS, Map.of());
+ assertEquals(result._pinotDataDistribution.getType(),
RelDistribution.Type.RANDOM_DISTRIBUTED);
+ assertEquals(result._pinotDataDistribution.getWorkers().size(), 4);
+ assertEquals(result._pinotDataDistribution.getCollation(),
RelCollations.EMPTY);
+
assertEquals(result._pinotDataDistribution.getHashDistributionDesc().size(), 0);
+ validateTableScanAssignment(result,
REALTIME_INSTANCE_ID_TO_SEGMENTS._realtimeTableSegmentsMap, "REALTIME");
+ }
+
+ @Test
+ public void testAssignTableScanWithUnPartitionedHybridTable() {
+ TableScanWorkerAssignmentResult result =
LeafStageWorkerAssignmentRule.assignTableScan(TABLE_NAME, FIELDS_IN_SCAN,
+ HYBRID_INSTANCE_ID_TO_SEGMENTS, Map.of());
+ assertEquals(result._pinotDataDistribution.getType(),
RelDistribution.Type.RANDOM_DISTRIBUTED);
+ assertEquals(result._pinotDataDistribution.getWorkers().size(), 4);
+ assertEquals(result._pinotDataDistribution.getCollation(),
RelCollations.EMPTY);
+
assertEquals(result._pinotDataDistribution.getHashDistributionDesc().size(), 0);
+ validateTableScanAssignment(result,
HYBRID_INSTANCE_ID_TO_SEGMENTS._offlineTableSegmentsMap, "OFFLINE");
+ validateTableScanAssignment(result,
HYBRID_INSTANCE_ID_TO_SEGMENTS._realtimeTableSegmentsMap, "REALTIME");
+ }
+
+ @Test
+ public void testAssignTableScanPartitionedOfflineTable() {
+ TableScanWorkerAssignmentResult result =
LeafStageWorkerAssignmentRule.assignTableScan(TABLE_NAME, FIELDS_IN_SCAN,
+ OFFLINE_INSTANCE_ID_TO_SEGMENTS, Map.of("OFFLINE",
createOfflineTablePartitionInfo()));
+ // Basic checks.
+ assertEquals(result._pinotDataDistribution.getType(),
RelDistribution.Type.HASH_DISTRIBUTED);
+ assertEquals(result._pinotDataDistribution.getWorkers().size(), 4);
+ assertEquals(result._pinotDataDistribution.getCollation(),
RelCollations.EMPTY);
+
assertEquals(result._pinotDataDistribution.getHashDistributionDesc().size(), 1);
+ HashDistributionDesc desc =
result._pinotDataDistribution.getHashDistributionDesc().iterator().next();
+ assertEquals(desc.getNumPartitions(), OFFLINE_NUM_PARTITIONS);
+ assertEquals(desc.getKeys(),
List.of(FIELDS_IN_SCAN.indexOf(PARTITION_COLUMN)));
+ assertEquals(desc.getHashFunction(), PARTITION_FUNCTION);
+ validateTableScanAssignment(result,
OFFLINE_INSTANCE_ID_TO_SEGMENTS._offlineTableSegmentsMap, "OFFLINE");
+ }
+
+ @Test
+ public void testAssignTableScanPartitionedRealtimeTable() {
+ TableScanWorkerAssignmentResult result =
LeafStageWorkerAssignmentRule.assignTableScan(TABLE_NAME, FIELDS_IN_SCAN,
+ REALTIME_INSTANCE_ID_TO_SEGMENTS, Map.of("REALTIME",
createRealtimeTablePartitionInfo()));
+ // Basic checks.
+ assertEquals(result._pinotDataDistribution.getType(),
RelDistribution.Type.HASH_DISTRIBUTED);
+ assertEquals(result._pinotDataDistribution.getWorkers().size(), 4);
+ assertEquals(result._pinotDataDistribution.getCollation(),
RelCollations.EMPTY);
+
assertEquals(result._pinotDataDistribution.getHashDistributionDesc().size(), 1);
+ HashDistributionDesc desc =
result._pinotDataDistribution.getHashDistributionDesc().iterator().next();
+ assertEquals(desc.getNumPartitions(), REALTIME_NUM_PARTITIONS);
+ assertEquals(desc.getKeys(),
List.of(FIELDS_IN_SCAN.indexOf(PARTITION_COLUMN)));
+ assertEquals(desc.getHashFunction(), PARTITION_FUNCTION);
+ validateTableScanAssignment(result,
REALTIME_INSTANCE_ID_TO_SEGMENTS._realtimeTableSegmentsMap, "REALTIME");
+ }
+
+ @Test
+ public void testAssignTableScanPartitionedHybridTable() {
+ TableScanWorkerAssignmentResult result =
LeafStageWorkerAssignmentRule.assignTableScan(TABLE_NAME, FIELDS_IN_SCAN,
+ HYBRID_INSTANCE_ID_TO_SEGMENTS, Map.of("OFFLINE",
createOfflineTablePartitionInfo(),
+ "REALTIME", createRealtimeTablePartitionInfo()));
+ assertEquals(result._pinotDataDistribution.getType(),
RelDistribution.Type.RANDOM_DISTRIBUTED);
+ assertEquals(result._pinotDataDistribution.getWorkers().size(), 4);
+ assertEquals(result._pinotDataDistribution.getCollation(),
RelCollations.EMPTY);
+
assertEquals(result._pinotDataDistribution.getHashDistributionDesc().size(), 0);
+ validateTableScanAssignment(result,
HYBRID_INSTANCE_ID_TO_SEGMENTS._offlineTableSegmentsMap, "OFFLINE");
+ validateTableScanAssignment(result,
HYBRID_INSTANCE_ID_TO_SEGMENTS._realtimeTableSegmentsMap, "REALTIME");
+ }
+
+ private static void
validateTableScanAssignment(TableScanWorkerAssignmentResult assignmentResult,
+ Map<String, List<String>> instanceIdToSegmentsMap, String tableType) {
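+    // Invert the worker assignment back to instanceId -> segments and compare with the input map.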
+ Map<String, List<String>> actualInstanceIdToSegments = new HashMap<>();
+ for (var entry : assignmentResult._workerIdToSegmentsMap.entrySet()) {
+ int workerId = entry.getKey();
+ String fullWorkerId =
assignmentResult._pinotDataDistribution.getWorkers().get(workerId);
+ String instanceId = fullWorkerId.split("@")[1];
+ actualInstanceIdToSegments.put(instanceId,
entry.getValue().get(tableType));
+ assertEquals(Integer.parseInt(fullWorkerId.split("@")[0]), workerId);
+ }
+ assertEquals(actualInstanceIdToSegments, instanceIdToSegmentsMap);
+ }
+
+ private static Map<String, List<String>> createOfflineSegmentsMap() {
+    // Assume 4 servers and 4 partitions.
+ Map<String, List<String>> result = new HashMap<>();
+ result.put("instance-0", List.of("segment1-0", "segment2-0",
"segment3-0"));
+ result.put("instance-1", List.of("segment1-1", "segment2-1"));
+ result.put("instance-2", List.of("segment1-2"));
+ result.put("instance-3", List.of("segment1-3", "segment2-3",
"segment3-3"));
+ return result;
+ }
+
+ private static Map<String, List<String>> createRealtimeSegmentsMap() {
+    // Assume 4 servers and 8 partitions, with partition-5 missing.
+ Map<String, List<String>> result = new HashMap<>();
+ result.put("instance-0", List.of("segment1-0", "segment1-4",
"segment2-4"));
+ result.put("instance-1", List.of("segment1-1", "segment2-1"));
+ result.put("instance-2", List.of("segment1-2", "segment1-6"));
+ result.put("instance-3", List.of("segment1-3", "segment2-3", "segment1-7",
"segment2-7"));
+ return result;
+ }
+
+ private static TablePartitionInfo createOfflineTablePartitionInfo() {
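+    // Partition N lives on instance-(N % NUM_SERVERS); the extra instance is outside the routing selection.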
+ TablePartitionInfo.PartitionInfo[] infos = new
TablePartitionInfo.PartitionInfo[OFFLINE_NUM_PARTITIONS];
+ for (int partitionNum = 0; partitionNum < OFFLINE_NUM_PARTITIONS;
partitionNum++) {
+ String selectedInstance = String.format("instance-%s", partitionNum %
NUM_SERVERS);
+ String additionalInstance = String.format("instance-%s", NUM_SERVERS +
partitionNum);
+ final String segmentSuffixForPartition = String.format("-%d",
partitionNum);
+ List<String> segments =
Objects.requireNonNull(OFFLINE_INSTANCE_ID_TO_SEGMENTS._offlineTableSegmentsMap)
+ .get(selectedInstance).stream().filter(segment ->
segment.endsWith(segmentSuffixForPartition))
+ .collect(Collectors.toList());
+ infos[partitionNum] = new
TablePartitionInfo.PartitionInfo(Set.of(selectedInstance, additionalInstance),
+ segments);
+ }
+ return new
TablePartitionInfo(TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(TABLE_NAME),
+ PARTITION_COLUMN, PARTITION_FUNCTION, OFFLINE_NUM_PARTITIONS, infos,
List.of());
+ }
+
+ private static TablePartitionInfo createRealtimeTablePartitionInfo() {
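+    // 8 partitions wrapped across the 4 servers; partition-5 ends up with no segments.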
+ TablePartitionInfo.PartitionInfo[] infos = new
TablePartitionInfo.PartitionInfo[REALTIME_NUM_PARTITIONS];
+ for (int partitionNum = 0; partitionNum < REALTIME_NUM_PARTITIONS;
partitionNum++) {
+ String selectedInstance = String.format("instance-%s", partitionNum %
NUM_SERVERS);
+ String additionalInstance = String.format("instance-%s", NUM_SERVERS +
(partitionNum % NUM_SERVERS));
+ final String segmentSuffixForPartition = String.format("-%d",
partitionNum);
+ List<String> segments =
Objects.requireNonNull(REALTIME_INSTANCE_ID_TO_SEGMENTS._realtimeTableSegmentsMap)
+ .get(selectedInstance).stream().filter(segment ->
segment.endsWith(segmentSuffixForPartition))
+ .collect(Collectors.toList());
+ infos[partitionNum] = new
TablePartitionInfo.PartitionInfo(Set.of(selectedInstance, additionalInstance),
+ segments);
+ }
+ return new
TablePartitionInfo(TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(TABLE_NAME),
+ PARTITION_COLUMN, PARTITION_FUNCTION, REALTIME_NUM_PARTITIONS, infos,
List.of());
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]