This is an automated email from the ASF dual-hosted git repository.

chrispeck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b151f77356 [multistage] Fix Bug in Physical Optimizer for SetOp 
Heterogenous Dists (#16627)
5b151f77356 is described below

commit 5b151f77356bb0bbb2e345bdcf0fa6ca817e853d
Author: Ankit Sultana <[email protected]>
AuthorDate: Tue Aug 19 21:38:35 2025 -0500

    [multistage] Fix Bug in Physical Optimizer for SetOp Heterogenous Dists 
(#16627)
    
    * set op fixes in physical opt
    
    * [multistage] Fix Bug in Physical Optimizer for SetOp Heterogenous Dists
    
    * minor precondition
---
 .../v2/opt/rules/WorkerExchangeAssignmentRule.java | 76 +++++++++++++++++++---
 .../src/test/resources/queries/SetOpsH2.json       | 12 ++--
 .../src/test/resources/queries/SetOpsNonH2.json    |  9 +--
 3 files changed, 75 insertions(+), 22 deletions(-)

diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/WorkerExchangeAssignmentRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/WorkerExchangeAssignmentRule.java
index 15cd43c028f..605206a0e31 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/WorkerExchangeAssignmentRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/WorkerExchangeAssignmentRule.java
@@ -93,22 +93,25 @@ public class WorkerExchangeAssignmentRule implements PRelNodeTransformer {
       return currentNode;
     }
     if (currentNode.getPRelInputs().isEmpty()) {
-      return processCurrentNode(currentNode, parent);
+      return processCurrentNode(currentNode, parent, true);
     }
     if (currentNode.getPRelInputs().size() == 1) {
       List<PRelNode> newInputs = 
List.of(executeInternal(currentNode.getPRelInput(0), currentNode));
       currentNode = currentNode.with(newInputs);
-      return processCurrentNode(currentNode, parent);
+      return processCurrentNode(currentNode, parent, true);
     }
     // Process first input.
     List<PRelNode> newInputs = new ArrayList<>();
     newInputs.add(executeInternal(currentNode.getPRelInput(0), currentNode));
     newInputs.addAll(currentNode.getPRelInputs().subList(1, 
currentNode.getPRelInputs().size()));
     currentNode = currentNode.with(newInputs);
-    // Process current node.
-    currentNode = processCurrentNode(currentNode, parent);
+    // Process current node. For SetOp, we don't meet dist/collation traits immediately. This is because all inputs
+    // of a SetOp need to be processed before we can infer any Dist trait (specifically Hash).
+    boolean meetConstraints = !(currentNode.unwrap() instanceof SetOp);
+    currentNode = processCurrentNode(currentNode, parent, meetConstraints);
     // Process remaining inputs.
     if (currentNode instanceof PhysicalExchange) {
+      Preconditions.checkState(meetConstraints, "PhysicalExchange should not 
be created constraints were skipped");
       PhysicalExchange exchange = (PhysicalExchange) currentNode;
       currentNode = exchange.getPRelInput(0);
       for (int index = 1; index < currentNode.getPRelInputs().size(); index++) 
{
@@ -123,16 +126,24 @@ public class WorkerExchangeAssignmentRule implements 
PRelNodeTransformer {
     }
     currentNode = currentNode.with(newInputs);
     currentNode = inheritDistDescFromInputs(currentNode);
+    if (!meetConstraints) {
+      currentNode = meetCurrentNodeConstraints(currentNode, parent);
+    }
     return currentNode;
   }
 
-  PRelNode processCurrentNode(PRelNode currentNode, @Nullable PRelNode 
parentNode) {
-    // Step-1: Initialize variables.
-    boolean isLeafStageBoundary = isLeafStageBoundary(currentNode, parentNode);
-    // Step-2: Get current node's distribution. If the current node already 
has a distribution attached, use that.
-    //         Otherwise, compute it using DistMappingGenerator.
+  PRelNode processCurrentNode(PRelNode currentNode, @Nullable PRelNode 
parentNode, boolean meetConstraints) {
     PinotDataDistribution currentNodeDistribution = 
computeCurrentNodeDistribution(currentNode, parentNode);
     currentNode = currentNode.with(currentNode.getPRelInputs(), 
currentNodeDistribution);
+    if (meetConstraints) {
+      return meetCurrentNodeConstraints(currentNode, parentNode);
+    }
+    return currentNode.with(currentNode.getPRelInputs(), 
currentNodeDistribution);
+  }
+
+  PRelNode meetCurrentNodeConstraints(PRelNode currentNode, @Nullable PRelNode 
parentNode) {
+    boolean isLeafStageBoundary = isLeafStageBoundary(currentNode, parentNode);
+    PinotDataDistribution currentNodeDistribution = 
currentNode.getPinotDataDistributionOrThrow();
     // Step-3: Add an optional exchange to meet unmet distribution trait 
constraint, if it exists. This also takes care
     //         of different workers when the parent already has workers 
assigned to it (when parent is not a SingleRel).
     PRelNode currentNodeExchange = meetDistributionConstraint(currentNode, 
currentNodeDistribution, parentNode);
@@ -156,12 +167,27 @@ public class WorkerExchangeAssignmentRule implements 
PRelNodeTransformer {
     return currentNode.with(currentNode.getPRelInputs(), 
currentNodeDistribution);
   }
 
+  /**
+   * Applies only to Hash Distributed nodes with more than one input. For nodes like Join, we can inherit
+   * additional distribution traits from the inputs. For SetOp nodes, we may have to drop some of the
+   * distribution traits that we initially inferred from the left-most input. This is because all inputs of a
+   * SetOp fold into the same schema, so if any one input is not distributed by a column, the SetOp as a whole
+   * is not distributed by that column either.
+   */
+   */
   PRelNode inheritDistDescFromInputs(PRelNode currentNode) {
-    // Inherit distribution trait from inputs (except left-most input, which 
is already inherited).
     if (currentNode.getPRelInputs().size() <= 1
         || currentNode.getPinotDataDistributionOrThrow().getType() != 
RelDistribution.Type.HASH_DISTRIBUTED) {
       return currentNode;
     }
+    if (currentNode instanceof Join) {
+      return inheritDistDescFromInputsForJoin(currentNode);
+    } else if (currentNode instanceof SetOp) {
+      return inheritDistDescFromInputsForSetOp(currentNode);
+    }
+    return currentNode;
+  }
+
+  PRelNode inheritDistDescFromInputsForJoin(PRelNode currentNode) {
+    // Inherit distribution trait from inputs (except left-most input, which 
is already inherited).
     PinotDataDistribution currentDistribution = 
currentNode.getPinotDataDistributionOrThrow();
     Set<HashDistributionDesc> newDistributionSet =
         new 
HashSet<>(currentNode.getPinotDataDistributionOrThrow().getHashDistributionDesc());
@@ -183,6 +209,36 @@ public class WorkerExchangeAssignmentRule implements 
PRelNodeTransformer {
     return currentNode.with(currentNode.getPRelInputs(), finalDist);
   }
 
+  PRelNode inheritDistDescFromInputsForSetOp(PRelNode currentNode) {
+    Preconditions.checkState(currentNode instanceof SetOp, "Expected SetOp. 
Found: %s", currentNode);
+    if (currentNode.getPinotDataDistributionOrThrow().getType() != 
RelDistribution.Type.HASH_DISTRIBUTED) {
+      return currentNode;
+    }
+    Set<HashDistributionDesc> currentDescs = 
currentNode.getPinotDataDistributionOrThrow().getHashDistributionDesc();
+    // Drop every descriptor that is missing from at least one of inputs[1:n].
+    for (PRelNode input : currentNode.getPRelInputs().subList(1, 
currentNode.getPRelInputs().size())) {
+      if (input.getPinotDataDistributionOrThrow().getType() != 
RelDistribution.Type.HASH_DISTRIBUTED) {
+        return currentNode.with(currentNode.getPRelInputs(), new 
PinotDataDistribution(
+            RelDistribution.Type.RANDOM_DISTRIBUTED, 
currentNode.getPinotDataDistributionOrThrow().getWorkers(),
+            currentNode.getPinotDataDistributionOrThrow().getWorkerHash(), 
null, null));
+      }
+      Set<HashDistributionDesc> inputDescs = 
input.getPinotDataDistributionOrThrow().getHashDistributionDesc();
+      currentDescs.removeIf(desc -> !inputDescs.contains(desc));
+      if (currentDescs.isEmpty()) {
+        break;
+      }
+    }
+    if (currentDescs.isEmpty()) {
+      return currentNode.with(currentNode.getPRelInputs(), new 
PinotDataDistribution(
+          RelDistribution.Type.RANDOM_DISTRIBUTED, 
currentNode.getPinotDataDistributionOrThrow().getWorkers(),
+          currentNode.getPinotDataDistributionOrThrow().getWorkerHash(), null, 
null));
+    }
+    return currentNode.with(currentNode.getPRelInputs(), new 
PinotDataDistribution(
+        RelDistribution.Type.HASH_DISTRIBUTED, 
currentNode.getPinotDataDistributionOrThrow().getWorkers(),
+        currentNode.getPinotDataDistributionOrThrow().getWorkerHash(), 
currentDescs,
+        currentNode.getPinotDataDistributionOrThrow().getCollation()));
+  }
+
   @Nullable
   @VisibleForTesting
   PRelNode meetDistributionConstraint(PRelNode currentNode, 
PinotDataDistribution derivedDistribution,
diff --git a/pinot-query-runtime/src/test/resources/queries/SetOpsH2.json 
b/pinot-query-runtime/src/test/resources/queries/SetOpsH2.json
index 8b09a61a94b..bb32dbc5cff 100644
--- a/pinot-query-runtime/src/test/resources/queries/SetOpsH2.json
+++ b/pinot-query-runtime/src/test/resources/queries/SetOpsH2.json
@@ -46,9 +46,9 @@
       { "sql": "SELECT intCol FROM {tbl1} EXCEPT SELECT intCol FROM {tbl2}"},
       { "sql": "SELECT intCol FROM {tbl1} MINUS SELECT intCol FROM {tbl2}"},
       { "sql": "SELECT intCol FROM {tbl1} UNION ALL SELECT intCol FROM 
{tbl2}"},
-      { "sql": "SELECT intCol FROM {tbl1} INTERSECT SELECT intCol FROM {tbl2} 
UNION SELECT intCol FROM {tbl1}", "ignoreV2Optimizer":  true},
-      { "sql": "SELECT intCol FROM {tbl1} UNION SELECT intCol FROM {tbl2} 
UNION SELECT intCol FROM {tbl1}", "ignoreV2Optimizer":  true},
-      { "sql": "SELECT intCol FROM {tbl1} UNION SELECT intCol FROM {tbl2} 
UNION (SELECT intCol FROM {tbl1} WHERE intCol > 2)", "ignoreV2Optimizer":  
true},
+      { "sql": "SELECT intCol FROM {tbl1} INTERSECT SELECT intCol FROM {tbl2} 
UNION SELECT intCol FROM {tbl1}"},
+      { "sql": "SELECT intCol FROM {tbl1} UNION SELECT intCol FROM {tbl2} 
UNION SELECT intCol FROM {tbl1}"},
+      { "sql": "SELECT intCol FROM {tbl1} UNION SELECT intCol FROM {tbl2} 
UNION (SELECT intCol FROM {tbl1} WHERE intCol > 2)"},
       { "sql": "SELECT intCol FROM {tbl1} UNION SELECT intCol FROM {tbl2} 
UNION ALL SELECT intCol FROM {tbl1}"},
       { "sql": "SELECT intCol FROM {tbl1} UNION SELECT intCol FROM {tbl2} 
INTERSECT SELECT intCol FROM {tbl1}"},
       { "sql": "SELECT intCol FROM {tbl1} UNION SELECT intCol FROM {tbl2} 
EXCEPT SELECT intCol FROM {tbl1}"},
@@ -62,12 +62,12 @@
       { "sql": "SET skipPlannerRules='UnionToDistinct'; SELECT intCol FROM 
{tbl1} UNION ALL SELECT intCol FROM {tbl2} UNION ALL SELECT intCol FROM 
{tbl1}"},
       { "sql": "SELECT intCol FROM {tbl1} UNION ALL SELECT intCol FROM {tbl2} 
INTERSECT SELECT intCol FROM {tbl1}"},
       { "sql": "SELECT intCol FROM {tbl1} UNION ALL SELECT intCol FROM {tbl2} 
EXCEPT SELECT intCol FROM {tbl1}"},
-      { "sql": "SELECT intCol FROM {tbl1} EXCEPT SELECT intCol FROM {tbl2} 
UNION SELECT intCol FROM {tbl1}", "ignoreV2Optimizer": true},
+      { "sql": "SELECT intCol FROM {tbl1} EXCEPT SELECT intCol FROM {tbl2} 
UNION SELECT intCol FROM {tbl1}"},
       { "sql": "SET skipPlannerRules='UnionToDistinct'; SELECT intCol FROM 
{tbl1} EXCEPT SELECT intCol FROM {tbl2} UNION SELECT intCol FROM {tbl1}"},
       { "sql": "SELECT intCol FROM {tbl1} EXCEPT SELECT intCol FROM {tbl2} 
UNION ALL SELECT intCol FROM {tbl1}"},
       { "sql": "SELECT intCol FROM {tbl1} MINUS SELECT intCol FROM {tbl2} 
INTERSECT SELECT intCol FROM {tbl1}"},
       { "sql": "SELECT intCol FROM {tbl1} MINUS SELECT intCol FROM {tbl2} 
EXCEPT SELECT intCol FROM {tbl1}"},
-      { "sql": "SELECT intCol FROM {tbl1} INTERSECT SELECT intCol FROM {tbl2} 
UNION SELECT intCol FROM {tbl1}", "ignoreV2Optimizer":  true},
+      { "sql": "SELECT intCol FROM {tbl1} INTERSECT SELECT intCol FROM {tbl2} 
UNION SELECT intCol FROM {tbl1}"},
       { "sql": "SET skipPlannerRules='UnionToDistinct'; SELECT intCol FROM 
{tbl1} INTERSECT SELECT intCol FROM {tbl2} UNION SELECT intCol FROM {tbl1}"},
       { "sql": "SELECT intCol FROM {tbl1} INTERSECT SELECT intCol FROM {tbl2} 
UNION ALL SELECT intCol FROM {tbl1}"},
       { "sql": "SELECT intCol FROM {tbl1} INTERSECT SELECT intCol FROM {tbl2} 
INTERSECT SELECT intCol FROM {tbl1}"},
@@ -78,7 +78,7 @@
       { "sql": "SELECT intCol, longCol, doubleCol, strCol FROM {tbl1} WHERE 
strCol = 'monster' UNION ALL SELECT intCol, longCol, doubleCol, strCol FROM 
{tbl1} WHERE strCol = 'baby' "},
       { "sql": "SELECT * FROM {tbl2} UNION ALL SELECT * FROM {tbl2}"},
       { "sql": "SELECT intArrayCol, strArrayCol FROM {tbl3} UNION ALL SELECT 
intArrayCol, strArrayCol FROM {tbl3}"},
-      { "sql": "SELECT intCol FROM {tbl1} UNION SELECT intCol FROM {tbl2} 
UNION SELECT intCol FROM {tbl1}", "ignoreV2Optimizer":  true},
+      { "sql": "SELECT intCol FROM {tbl1} UNION SELECT intCol FROM {tbl2} 
UNION SELECT intCol FROM {tbl1}"},
       { "sql": "SET skipPlannerRules='UnionToDistinct'; SELECT intCol FROM 
{tbl1} UNION SELECT intCol FROM {tbl2} UNION SELECT intCol FROM {tbl1}"}
     ]
   }
diff --git a/pinot-query-runtime/src/test/resources/queries/SetOpsNonH2.json 
b/pinot-query-runtime/src/test/resources/queries/SetOpsNonH2.json
index 70f79836e8e..2a0e8de5697 100644
--- a/pinot-query-runtime/src/test/resources/queries/SetOpsNonH2.json
+++ b/pinot-query-runtime/src/test/resources/queries/SetOpsNonH2.json
@@ -66,8 +66,7 @@
           [2],
           [3],
           [4]
-        ],
-        "ignoreV2Optimizer":  true
+        ]
       },
       {
         "description": "INTERSECT ALL with UNION",
@@ -77,8 +76,7 @@
           [2],
           [3],
           [4]
-        ],
-        "ignoreV2Optimizer":  true
+        ]
       },
       {
         "description": "INTERSECT ALL with UNION ALL",
@@ -117,8 +115,7 @@
           [4],
           [1],
           [2]
-        ],
-        "ignoreV2Optimizer":  true
+        ]
       },
       {
         "description": "EXCEPT ALL with UNION",


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to