This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 3c6d5b11b7c [multistage] Push Collation from Exchange to Lite Mode Sort (#16551)
3c6d5b11b7c is described below
commit 3c6d5b11b7cf8aefb0e6251bb672b3325148e311
Author: Ankit Sultana <[email protected]>
AuthorDate: Mon Aug 11 15:05:11 2025 -0500
[multistage] Push Collation from Exchange to Lite Mode Sort (#16551)
---
.../physical/v2/opt/rules/LiteModeSortInsertRule.java | 15 ++++++++++++++-
.../test/resources/queries/PhysicalOptimizerPlans.json | 15 +++++++++++++++
2 files changed, 29 insertions(+), 1 deletion(-)
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeSortInsertRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeSortInsertRule.java
index b4d102a45dd..f356abcbb67 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeSortInsertRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeSortInsertRule.java
@@ -21,6 +21,7 @@ package org.apache.pinot.query.planner.physical.v2.opt.rules;
import com.google.common.base.Preconditions;
import java.util.List;
import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
@@ -29,6 +30,7 @@ import org.apache.pinot.query.context.PhysicalPlannerContext;
import org.apache.pinot.query.planner.logical.RexExpressionUtils;
import org.apache.pinot.query.planner.physical.v2.PRelNode;
import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalAggregate;
+import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalExchange;
import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalSort;
import org.apache.pinot.query.planner.physical.v2.opt.PRelOptRule;
import org.apache.pinot.query.planner.physical.v2.opt.PRelOptRuleCall;
@@ -87,9 +89,20 @@ public class LiteModeSortInsertRule extends PRelOptRule {
int limit = aggregate.getLimit() > 0 ? aggregate.getLimit() : serverStageLimit;
return aggregate.withLimit(limit);
}
+ RelCollation relCollation = RelCollations.EMPTY;
+ if (!call._parents.isEmpty()) {
+ // Pass collation from the Exchange above if it exists.
+ PRelNode parent = call._parents.getLast();
+ if (parent.unwrap() instanceof PhysicalExchange) {
+ PhysicalExchange physicalExchange = (PhysicalExchange) parent.unwrap();
+ if (physicalExchange.getRelCollation() != null) {
+ relCollation = physicalExchange.getRelCollation();
+ }
+ }
+ }
PRelNode input = call._currentNode;
return new PhysicalSort(input.unwrap().getCluster(),
RelTraitSet.createEmpty(), List.of(),
- RelCollations.EMPTY, null /* offset */, newFetch, input, nodeId(), input.getPinotDataDistributionOrThrow(),
+ relCollation, null /* offset */, newFetch, input, nodeId(), input.getPinotDataDistributionOrThrow(),
true);
}
diff --git a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
index 106d0e7f9a9..e716a543bb0 100644
--- a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
@@ -805,6 +805,21 @@
"\n PhysicalTableScan(table=[[default, a]])",
"\n"
]
+ },
+ {
+ "description": "Tests collation push down for Lite Mode. Collation from the exchange above should get pushed down to the sort below.",
+ "sql": "SET usePhysicalOptimizer=true; SET useLiteMode=true; EXPLAIN PLAN FOR WITH tmp AS (SELECT col1, ROW_NUMBER() OVER (ORDER BY ts DESC) as row_num FROM a) SELECT COUNT(*) FROM tmp WHERE row_num <= 100",
+ "output": [
+ "Execution Plan",
+ "\nPhysicalAggregate(group=[{}], agg#0=[COUNT()], aggType=[DIRECT])",
+ "\n PhysicalFilter(condition=[<=($1, 100)])",
+ "\n PhysicalWindow(window#0=[window(order by [0 DESC] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
+ "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE], collation=[[0 DESC]])",
+ "\n PhysicalSort(sort0=[$0], dir0=[DESC], fetch=[100000])",
+ "\n PhysicalProject(ts=[$7])",
+ "\n PhysicalTableScan(table=[[default, a]])",
+ "\n"
+ ]
}
]
},
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]