This is an automated email from the ASF dual-hosted git repository.
chrispeck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 5b151f77356 [multistage] Fix Bug in Physical Optimizer for SetOp Heterogenous Dists (#16627)
5b151f77356 is described below
commit 5b151f77356bb0bbb2e345bdcf0fa6ca817e853d
Author: Ankit Sultana <[email protected]>
AuthorDate: Tue Aug 19 21:38:35 2025 -0500
[multistage] Fix Bug in Physical Optimizer for SetOp Heterogenous Dists (#16627)
* set op fixes in physical opt
* [multistage] Fix Bug in Physical Optimizer for SetOp Heterogenous Dists
* minor precondition
---
.../v2/opt/rules/WorkerExchangeAssignmentRule.java | 76 +++++++++++++++++++---
.../src/test/resources/queries/SetOpsH2.json | 12 ++--
.../src/test/resources/queries/SetOpsNonH2.json | 9 +--
3 files changed, 75 insertions(+), 22 deletions(-)
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/WorkerExchangeAssignmentRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/WorkerExchangeAssignmentRule.java
index 15cd43c028f..605206a0e31 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/WorkerExchangeAssignmentRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/WorkerExchangeAssignmentRule.java
@@ -93,22 +93,25 @@ public class WorkerExchangeAssignmentRule implements PRelNodeTransformer {
return currentNode;
}
if (currentNode.getPRelInputs().isEmpty()) {
- return processCurrentNode(currentNode, parent);
+ return processCurrentNode(currentNode, parent, true);
}
if (currentNode.getPRelInputs().size() == 1) {
List<PRelNode> newInputs =
List.of(executeInternal(currentNode.getPRelInput(0), currentNode));
currentNode = currentNode.with(newInputs);
- return processCurrentNode(currentNode, parent);
+ return processCurrentNode(currentNode, parent, true);
}
// Process first input.
List<PRelNode> newInputs = new ArrayList<>();
newInputs.add(executeInternal(currentNode.getPRelInput(0), currentNode));
newInputs.addAll(currentNode.getPRelInputs().subList(1,
currentNode.getPRelInputs().size()));
currentNode = currentNode.with(newInputs);
- // Process current node.
- currentNode = processCurrentNode(currentNode, parent);
+ // Process current node. For SetOp, we don't meet dist/collation traits
immediately. This is because all inputs
+ // of a SetOp need to be processed before we can infer any Dist trait
(specifically Hash).
+ boolean meetConstraints = !(currentNode.unwrap() instanceof SetOp);
+ currentNode = processCurrentNode(currentNode, parent, meetConstraints);
// Process remaining inputs.
if (currentNode instanceof PhysicalExchange) {
+ Preconditions.checkState(meetConstraints, "PhysicalExchange should not
be created constraints were skipped");
PhysicalExchange exchange = (PhysicalExchange) currentNode;
currentNode = exchange.getPRelInput(0);
for (int index = 1; index < currentNode.getPRelInputs().size(); index++)
{
@@ -123,16 +126,24 @@ public class WorkerExchangeAssignmentRule implements PRelNodeTransformer {
}
currentNode = currentNode.with(newInputs);
currentNode = inheritDistDescFromInputs(currentNode);
+ if (!meetConstraints) {
+ currentNode = meetCurrentNodeConstraints(currentNode, parent);
+ }
return currentNode;
}
- PRelNode processCurrentNode(PRelNode currentNode, @Nullable PRelNode
parentNode) {
- // Step-1: Initialize variables.
- boolean isLeafStageBoundary = isLeafStageBoundary(currentNode, parentNode);
- // Step-2: Get current node's distribution. If the current node already
has a distribution attached, use that.
- // Otherwise, compute it using DistMappingGenerator.
+  PRelNode processCurrentNode(PRelNode currentNode, @Nullable PRelNode parentNode, boolean meetConstraints) {
     PinotDataDistribution currentNodeDistribution = computeCurrentNodeDistribution(currentNode, parentNode);
     currentNode = currentNode.with(currentNode.getPRelInputs(), currentNodeDistribution);
+    if (meetConstraints) {
+      return meetCurrentNodeConstraints(currentNode, parentNode);
+    }
+    return currentNode; // distribution is already attached above; re-wrapping would only create another copy
+  }
+
+ PRelNode meetCurrentNodeConstraints(PRelNode currentNode, @Nullable PRelNode
parentNode) {
+ boolean isLeafStageBoundary = isLeafStageBoundary(currentNode, parentNode);
+ PinotDataDistribution currentNodeDistribution =
currentNode.getPinotDataDistributionOrThrow();
// Step-3: Add an optional exchange to meet unmet distribution trait
constraint, if it exists. This also takes care
// of different workers when the parent already has workers
assigned to it (when parent is not a SingleRel).
PRelNode currentNodeExchange = meetDistributionConstraint(currentNode,
currentNodeDistribution, parentNode);
@@ -156,12 +167,27 @@ public class WorkerExchangeAssignmentRule implements PRelNodeTransformer {
return currentNode.with(currentNode.getPRelInputs(),
currentNodeDistribution);
}
+ /**
+ * For Hash Distributed nodes, for nodes like Join we can inherit multiple
distribution traits from inputs.
+ * For SetOp nodes, we may have to drop some Distribution traits that we
initially inferred from the left-most
+ * input. This is because SetOp nodes all fold into the same schema, and if
any of the inputs is not distributed
+ * by a column, the entire SetOp is not distributed by that column.
+ */
PRelNode inheritDistDescFromInputs(PRelNode currentNode) {
- // Inherit distribution trait from inputs (except left-most input, which
is already inherited).
if (currentNode.getPRelInputs().size() <= 1
|| currentNode.getPinotDataDistributionOrThrow().getType() !=
RelDistribution.Type.HASH_DISTRIBUTED) {
return currentNode;
}
+ if (currentNode instanceof Join) {
+ return inheritDistDescFromInputsForJoin(currentNode);
+ } else if (currentNode instanceof SetOp) {
+ return inheritDistDescFromInputsForSetOp(currentNode);
+ }
+ return currentNode;
+ }
+
+ PRelNode inheritDistDescFromInputsForJoin(PRelNode currentNode) {
+ // Inherit distribution trait from inputs (except left-most input, which
is already inherited).
PinotDataDistribution currentDistribution =
currentNode.getPinotDataDistributionOrThrow();
Set<HashDistributionDesc> newDistributionSet =
new
HashSet<>(currentNode.getPinotDataDistributionOrThrow().getHashDistributionDesc());
@@ -183,6 +209,36 @@ public class WorkerExchangeAssignmentRule implements PRelNodeTransformer {
return currentNode.with(currentNode.getPRelInputs(), finalDist);
}
+  PRelNode inheritDistDescFromInputsForSetOp(PRelNode currentNode) {
+    Preconditions.checkState(currentNode instanceof SetOp, "Expected SetOp. Found: %s", currentNode);
+    PinotDataDistribution currentDist = currentNode.getPinotDataDistributionOrThrow();
+    if (currentDist.getType() != RelDistribution.Type.HASH_DISTRIBUTED) {
+      return currentNode;
+    }
+    // Copy first, as the Join variant does: getHashDistributionDesc() may return a shared or immutable set.
+    Set<HashDistributionDesc> currentDescs = new HashSet<>(currentDist.getHashDistributionDesc());
+    // Keep only descriptors present in every input in inputs[1:n]; a non-hash input shares none of them.
+    for (PRelNode input : currentNode.getPRelInputs().subList(1, currentNode.getPRelInputs().size())) {
+      PinotDataDistribution inputDist = input.getPinotDataDistributionOrThrow();
+      if (inputDist.getType() != RelDistribution.Type.HASH_DISTRIBUTED) {
+        currentDescs.clear();
+      } else {
+        currentDescs.retainAll(inputDist.getHashDistributionDesc());
+      }
+      if (currentDescs.isEmpty()) {
+        break;
+      }
+    }
+    if (currentDescs.isEmpty()) {
+      // No hash descriptor holds for every input, so the SetOp output is treated as randomly distributed.
+      return currentNode.with(currentNode.getPRelInputs(), new PinotDataDistribution(
+          RelDistribution.Type.RANDOM_DISTRIBUTED, currentDist.getWorkers(), currentDist.getWorkerHash(), null, null));
+    }
+    return currentNode.with(currentNode.getPRelInputs(), new PinotDataDistribution(
+        RelDistribution.Type.HASH_DISTRIBUTED, currentDist.getWorkers(), currentDist.getWorkerHash(), currentDescs,
+        currentDist.getCollation()));
+  }
+
@Nullable
@VisibleForTesting
PRelNode meetDistributionConstraint(PRelNode currentNode,
PinotDataDistribution derivedDistribution,
diff --git a/pinot-query-runtime/src/test/resources/queries/SetOpsH2.json b/pinot-query-runtime/src/test/resources/queries/SetOpsH2.json
index 8b09a61a94b..bb32dbc5cff 100644
--- a/pinot-query-runtime/src/test/resources/queries/SetOpsH2.json
+++ b/pinot-query-runtime/src/test/resources/queries/SetOpsH2.json
@@ -46,9 +46,9 @@
{ "sql": "SELECT intCol FROM {tbl1} EXCEPT SELECT intCol FROM {tbl2}"},
{ "sql": "SELECT intCol FROM {tbl1} MINUS SELECT intCol FROM {tbl2}"},
{ "sql": "SELECT intCol FROM {tbl1} UNION ALL SELECT intCol FROM
{tbl2}"},
- { "sql": "SELECT intCol FROM {tbl1} INTERSECT SELECT intCol FROM {tbl2}
UNION SELECT intCol FROM {tbl1}", "ignoreV2Optimizer": true},
- { "sql": "SELECT intCol FROM {tbl1} UNION SELECT intCol FROM {tbl2}
UNION SELECT intCol FROM {tbl1}", "ignoreV2Optimizer": true},
- { "sql": "SELECT intCol FROM {tbl1} UNION SELECT intCol FROM {tbl2}
UNION (SELECT intCol FROM {tbl1} WHERE intCol > 2)", "ignoreV2Optimizer":
true},
+ { "sql": "SELECT intCol FROM {tbl1} INTERSECT SELECT intCol FROM {tbl2}
UNION SELECT intCol FROM {tbl1}"},
+ { "sql": "SELECT intCol FROM {tbl1} UNION SELECT intCol FROM {tbl2}
UNION SELECT intCol FROM {tbl1}"},
+ { "sql": "SELECT intCol FROM {tbl1} UNION SELECT intCol FROM {tbl2}
UNION (SELECT intCol FROM {tbl1} WHERE intCol > 2)"},
{ "sql": "SELECT intCol FROM {tbl1} UNION SELECT intCol FROM {tbl2}
UNION ALL SELECT intCol FROM {tbl1}"},
{ "sql": "SELECT intCol FROM {tbl1} UNION SELECT intCol FROM {tbl2}
INTERSECT SELECT intCol FROM {tbl1}"},
{ "sql": "SELECT intCol FROM {tbl1} UNION SELECT intCol FROM {tbl2}
EXCEPT SELECT intCol FROM {tbl1}"},
@@ -62,12 +62,12 @@
{ "sql": "SET skipPlannerRules='UnionToDistinct'; SELECT intCol FROM
{tbl1} UNION ALL SELECT intCol FROM {tbl2} UNION ALL SELECT intCol FROM
{tbl1}"},
{ "sql": "SELECT intCol FROM {tbl1} UNION ALL SELECT intCol FROM {tbl2}
INTERSECT SELECT intCol FROM {tbl1}"},
{ "sql": "SELECT intCol FROM {tbl1} UNION ALL SELECT intCol FROM {tbl2}
EXCEPT SELECT intCol FROM {tbl1}"},
- { "sql": "SELECT intCol FROM {tbl1} EXCEPT SELECT intCol FROM {tbl2}
UNION SELECT intCol FROM {tbl1}", "ignoreV2Optimizer": true},
+ { "sql": "SELECT intCol FROM {tbl1} EXCEPT SELECT intCol FROM {tbl2}
UNION SELECT intCol FROM {tbl1}"},
{ "sql": "SET skipPlannerRules='UnionToDistinct'; SELECT intCol FROM
{tbl1} EXCEPT SELECT intCol FROM {tbl2} UNION SELECT intCol FROM {tbl1}"},
{ "sql": "SELECT intCol FROM {tbl1} EXCEPT SELECT intCol FROM {tbl2}
UNION ALL SELECT intCol FROM {tbl1}"},
{ "sql": "SELECT intCol FROM {tbl1} MINUS SELECT intCol FROM {tbl2}
INTERSECT SELECT intCol FROM {tbl1}"},
{ "sql": "SELECT intCol FROM {tbl1} MINUS SELECT intCol FROM {tbl2}
EXCEPT SELECT intCol FROM {tbl1}"},
- { "sql": "SELECT intCol FROM {tbl1} INTERSECT SELECT intCol FROM {tbl2}
UNION SELECT intCol FROM {tbl1}", "ignoreV2Optimizer": true},
+ { "sql": "SELECT intCol FROM {tbl1} INTERSECT SELECT intCol FROM {tbl2}
UNION SELECT intCol FROM {tbl1}"},
{ "sql": "SET skipPlannerRules='UnionToDistinct'; SELECT intCol FROM
{tbl1} INTERSECT SELECT intCol FROM {tbl2} UNION SELECT intCol FROM {tbl1}"},
{ "sql": "SELECT intCol FROM {tbl1} INTERSECT SELECT intCol FROM {tbl2}
UNION ALL SELECT intCol FROM {tbl1}"},
{ "sql": "SELECT intCol FROM {tbl1} INTERSECT SELECT intCol FROM {tbl2}
INTERSECT SELECT intCol FROM {tbl1}"},
@@ -78,7 +78,7 @@
{ "sql": "SELECT intCol, longCol, doubleCol, strCol FROM {tbl1} WHERE
strCol = 'monster' UNION ALL SELECT intCol, longCol, doubleCol, strCol FROM
{tbl1} WHERE strCol = 'baby' "},
{ "sql": "SELECT * FROM {tbl2} UNION ALL SELECT * FROM {tbl2}"},
{ "sql": "SELECT intArrayCol, strArrayCol FROM {tbl3} UNION ALL SELECT
intArrayCol, strArrayCol FROM {tbl3}"},
- { "sql": "SELECT intCol FROM {tbl1} UNION SELECT intCol FROM {tbl2}
UNION SELECT intCol FROM {tbl1}", "ignoreV2Optimizer": true},
+ { "sql": "SELECT intCol FROM {tbl1} UNION SELECT intCol FROM {tbl2}
UNION SELECT intCol FROM {tbl1}"},
{ "sql": "SET skipPlannerRules='UnionToDistinct'; SELECT intCol FROM
{tbl1} UNION SELECT intCol FROM {tbl2} UNION SELECT intCol FROM {tbl1}"}
]
}
diff --git a/pinot-query-runtime/src/test/resources/queries/SetOpsNonH2.json b/pinot-query-runtime/src/test/resources/queries/SetOpsNonH2.json
index 70f79836e8e..2a0e8de5697 100644
--- a/pinot-query-runtime/src/test/resources/queries/SetOpsNonH2.json
+++ b/pinot-query-runtime/src/test/resources/queries/SetOpsNonH2.json
@@ -66,8 +66,7 @@
[2],
[3],
[4]
- ],
- "ignoreV2Optimizer": true
+ ]
},
{
"description": "INTERSECT ALL with UNION",
@@ -77,8 +76,7 @@
[2],
[3],
[4]
- ],
- "ignoreV2Optimizer": true
+ ]
},
{
"description": "INTERSECT ALL with UNION ALL",
@@ -117,8 +115,7 @@
[4],
[1],
[2]
- ],
- "ignoreV2Optimizer": true
+ ]
},
{
"description": "EXCEPT ALL with UNION",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]