This is an automated email from the ASF dual-hosted git repository.
yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4c6e6f25807 Add 6 Calcite optimization rules to the multi-stage query
engine (5 default-on, 1 opt-in) (#18554)
4c6e6f25807 is described below
commit 4c6e6f25807850691469d5ddbcf8d75781d3d407
Author: Yash Mayya <[email protected]>
AuthorDate: Thu May 28 11:24:09 2026 -0700
Add 6 Calcite optimization rules to the multi-stage query engine (5
default-on, 1 opt-in) (#18554)
---
.../tests/OfflineClusterIntegrationTest.java | 3 +
.../calcite/rel/rules/PinotQueryRuleSets.java | 31 ++++++
.../pinot/query/QueryPlannerRuleOptionsTest.java | 124 +++++++++++++++++++++
.../src/test/resources/queries/GroupByPlans.json | 16 +--
.../resources/queries/PhysicalOptimizerPlans.json | 36 +++---
.../test/resources/queries/PinotHintablePlans.json | 8 +-
.../src/test/resources/queries/SetOpPlans.json | 12 +-
.../apache/pinot/spi/utils/CommonConstants.java | 14 ++-
8 files changed, 208 insertions(+), 36 deletions(-)
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index b5312fa72e3..be6f1c1931a 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -3594,6 +3594,9 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
assertEquals(response1Json.get("rows").get(0).get(2).asText(), "Rule
Execution Times\n"
+ "Rule: SortRemove -> Time:*\n"
+ "Rule: AggregateProjectMerge -> Time:*\n"
+ + "Rule: AggregateProjectPullUpConstants -> Time:*\n"
+ + "Rule: ProjectAggregateMerge -> Time:*\n"
+ + "Rule: SortRemoveConstantKeys -> Time:*\n"
+ "Rule: EvaluateProjectLiteral -> Time:*\n"
+ "Rule: AggregateRemove -> Time:*\n");
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java
b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java
index 19909597302..b0c22f672df 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java
@@ -23,6 +23,7 @@ import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.rel.rules.AggregateCaseToFilterRule;
import org.apache.calcite.rel.rules.AggregateJoinTransposeRule;
import org.apache.calcite.rel.rules.AggregateProjectMergeRule;
+import org.apache.calcite.rel.rules.AggregateProjectPullUpConstantsRule;
import org.apache.calcite.rel.rules.AggregateRemoveRule;
import org.apache.calcite.rel.rules.AggregateUnionAggregateRule;
import org.apache.calcite.rel.rules.CoreRules;
@@ -31,6 +32,7 @@ import org.apache.calcite.rel.rules.FilterMergeRule;
import org.apache.calcite.rel.rules.FilterProjectTransposeRule;
import org.apache.calcite.rel.rules.FilterSetOpTransposeRule;
import org.apache.calcite.rel.rules.JoinPushExpressionsRule;
+import org.apache.calcite.rel.rules.ProjectAggregateMergeRule;
import org.apache.calcite.rel.rules.ProjectFilterTransposeRule;
import org.apache.calcite.rel.rules.ProjectMergeRule;
import org.apache.calcite.rel.rules.ProjectRemoveRule;
@@ -41,7 +43,11 @@ import org.apache.calcite.rel.rules.PruneEmptyRules;
import org.apache.calcite.rel.rules.SemiJoinRule;
import org.apache.calcite.rel.rules.SortJoinCopyRule;
import org.apache.calcite.rel.rules.SortJoinTransposeRule;
+import org.apache.calcite.rel.rules.SortMergeRule;
+import org.apache.calcite.rel.rules.SortProjectTransposeRule;
+import org.apache.calcite.rel.rules.SortRemoveConstantKeysRule;
import org.apache.calcite.rel.rules.SortRemoveRule;
+import org.apache.calcite.rel.rules.UnionMergeRule;
import org.apache.calcite.rel.rules.UnionToDistinctRule;
import
org.apache.pinot.calcite.rel.rules.PinotFilterJoinRule.PinotFilterIntoJoinRule;
import
org.apache.pinot.calcite.rel.rules.PinotFilterJoinRule.PinotJoinConditionPushRule;
@@ -114,6 +120,11 @@ public class PinotQueryRuleSets {
SortJoinCopyRule.Config.DEFAULT
.withDescription(PlannerRuleNames.SORT_JOIN_COPY).toRule(),
+ // Push Sort below Project so LIMIT applies before projection
expressions are evaluated.
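+ // Default-off (opt-in via usePlannerRules): listed in CommonConstants' default-disabled rules
+ // because firing in BASIC_RULES disrupts ProjectToSemiJoinRule pattern matching on
+ // partition-hinted IN (SELECT) queries. Registered alongside the other transpose rules.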
+ SortProjectTransposeRule.Config.DEFAULT
+ .withDescription(PlannerRuleNames.SORT_PROJECT_TRANSPOSE).toRule(),
+
// join rules
JoinPushExpressionsRule.Config.DEFAULT
.withDescription(PlannerRuleNames.JOIN_PUSH_EXPRESSIONS).toRule(),
@@ -197,8 +208,28 @@ public class PinotQueryRuleSets {
.withDescription(PlannerRuleNames.FILTER_MERGE).toRule(),
AggregateRemoveRule.Config.DEFAULT
.withDescription(PlannerRuleNames.AGGREGATE_REMOVE).toRule(),
+ // Drop constant columns from GROUP BY keys when the Aggregate's input
can prove constancy
+ // (typically from an equality filter on the column). Reduces shuffle
key width on
+ // multi-tenant queries like `WHERE tenant_id = 'X' GROUP BY tenant_id,
...`. Default-on.
+ // Use Config.ANY (matches any RelNode below the Aggregate).
Config.DEFAULT requires a
+ // LogicalProject directly below the Aggregate, which never appears in
Pinot's pipeline
+ // because filter pushdown consumes the Project before PRUNE_RULES runs.
+ AggregateProjectPullUpConstantsRule.Config.ANY
+
.withDescription(PlannerRuleNames.AGGREGATE_PROJECT_PULL_UP_CONSTANTS).toRule(),
SortRemoveRule.Config.DEFAULT
.withDescription(PlannerRuleNames.SORT_REMOVE).toRule(),
+ // Collapse stacked Sort/LIMIT nodes (e.g. from sub-query flattening)
into a single Sort. Default-on.
+ SortMergeRule.Config.LIMIT_MERGE
+ .withDescription(PlannerRuleNames.LIMIT_MERGE).toRule(),
+ // Drop constant columns from ORDER BY (e.g. WHERE x='Y' ORDER BY x, ts
→ ORDER BY ts). Default-on.
+ SortRemoveConstantKeysRule.Config.DEFAULT
+
.withDescription(PlannerRuleNames.SORT_REMOVE_CONSTANT_KEYS).toRule(),
+ // Flatten nested UNION ALLs into a single n-ary union (eliminates
intermediate exchange stages). Default-on.
+ UnionMergeRule.Config.DEFAULT
+ .withDescription(PlannerRuleNames.UNION_MERGE).toRule(),
+ // Drop unused aggregate calls when a Project on top of the Aggregate
doesn't reference them. Default-on.
+ ProjectAggregateMergeRule.Config.DEFAULT
+ .withDescription(PlannerRuleNames.PROJECT_AGGREGATE_MERGE).toRule(),
PruneEmptyRules.CorrelateLeftEmptyRuleConfig.DEFAULT
.withDescription(PlannerRuleNames.PRUNE_EMPTY_CORRELATE_LEFT).toRule(),
PruneEmptyRules.CorrelateRightEmptyRuleConfig.DEFAULT
diff --git
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryPlannerRuleOptionsTest.java
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryPlannerRuleOptionsTest.java
index 619aadcd2ba..432f1ee3d00 100644
---
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryPlannerRuleOptionsTest.java
+++
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryPlannerRuleOptionsTest.java
@@ -478,6 +478,130 @@ public class QueryPlannerRuleOptionsTest extends
QueryEnvironmentTestBase {
//@formatter:on
}
+ //
---------------------------------------------------------------------------
+ // Tests for Calcite optimization rules added to the planner in this change.
+ // Four of the five default-on rules each have a paired enabled-by-default / disabled
+ // assertion to keep the contract explicit (ProjectAggregateMerge has no dedicated test;
+ // see the NOTE further down). The SortProjectTranspose opt-in rule has the same pair so a
+ // future flip to default-on is caught. ResourceBasedQueryPlansTest covers the broader
+ // plan-shape surface.
+ //
---------------------------------------------------------------------------
+
+ @Test
+ public void testAggregateProjectPullUpConstantsEnabledByDefault() {
+ // Default-on. `WHERE col1='US' GROUP BY col1, col2` drops col1 from the
group key and
+ // re-introduces it as a projected literal — shuffle key shrinks from
(col1, col2) to (col2).
+ String query = "EXPLAIN PLAN FOR SELECT col1, col2, COUNT(*) FROM a WHERE
col1 = 'US' GROUP BY col1, col2";
+ String explain = _queryEnvironment.explainQuery(query,
RANDOM_REQUEST_ID_GEN.nextLong());
+ assertFalse(explain.contains("group=[{0, 1}]"),
+ "AggregateProjectPullUpConstants should remove col1 from group keys.
Plan:\n" + explain);
+ assertTrue(explain.contains("col1=[_UTF-8'US'"),
+ "AggregateProjectPullUpConstants should re-project col1 as the literal
'US'. Plan:\n" + explain);
+ }
+
+ @Test
+ public void testDisableAggregateProjectPullUpConstants() {
+ // Disabling the rule must leave the full (col1, col2) group key intact.
+ String query = "EXPLAIN PLAN FOR SELECT col1, col2, COUNT(*) FROM a WHERE
col1 = 'US' GROUP BY col1, col2";
+ String explain = explainQueryWithRuleDisabled(query,
PlannerRuleNames.AGGREGATE_PROJECT_PULL_UP_CONSTANTS);
+ assertTrue(explain.contains("group=[{0, 1}]"),
+ "Without AggregateProjectPullUpConstants, both columns must remain in
GROUP BY. Plan:\n" + explain);
+ }
+
+ @Test
+ public void testLimitMergeEnabledByDefault() {
+ // Default-on. An outer LIMIT 5 over an inner LIMIT 10 collapses to the
tighter outer LIMIT.
+ String query = "EXPLAIN PLAN FOR SELECT col1 FROM (SELECT col1 FROM a
LIMIT 10) LIMIT 5";
+ String explain = _queryEnvironment.explainQuery(query,
RANDOM_REQUEST_ID_GEN.nextLong());
+ assertFalse(explain.contains("fetch=[10]"),
+ "LimitMerge should drop the wider inner LIMIT=10. Plan:\n" + explain);
+ assertTrue(explain.contains("fetch=[5]"),
+ "LimitMerge should keep the tighter outer LIMIT=5. Plan:\n" + explain);
+ }
+
+ @Test
+ public void testDisableLimitMerge() {
+ // Disabling the rule must keep both LIMIT nodes (the inner fetch=10
survives).
+ String query = "EXPLAIN PLAN FOR SELECT col1 FROM (SELECT col1 FROM a
LIMIT 10) LIMIT 5";
+ String explain = explainQueryWithRuleDisabled(query,
PlannerRuleNames.LIMIT_MERGE);
+ assertTrue(explain.contains("fetch=[10]"),
+ "Without LimitMerge, the inner fetch=[10] must remain. Plan:\n" +
explain);
+ }
+
+ @Test
+ public void testUnionMergeEnabledByDefault() {
+ // Default-on. A 3-way UNION ALL must be a single n-ary LogicalUnion, not
Union(Union(a,b), c).
+ String query = "EXPLAIN PLAN FOR "
+ + "SELECT col1, col2 FROM a UNION ALL "
+ + "SELECT col1, col2 FROM b UNION ALL "
+ + "SELECT col1, col2 FROM c";
+ String explain = _queryEnvironment.explainQuery(query,
RANDOM_REQUEST_ID_GEN.nextLong());
+ int unionCount = explain.split("LogicalUnion\\(all=\\[true]\\)",
-1).length - 1;
+ assertEquals(unionCount, 1,
+ "UnionMerge should collapse nested LogicalUnion to a single n-ary
Union. Plan:\n" + explain);
+ }
+
+ @Test
+ public void testDisableUnionMerge() {
+ // Disabling the rule preserves the nested Union(Union(a,b),c) shape.
+ String query = "EXPLAIN PLAN FOR "
+ + "SELECT col1, col2 FROM a UNION ALL "
+ + "SELECT col1, col2 FROM b UNION ALL "
+ + "SELECT col1, col2 FROM c";
+ String explain = explainQueryWithRuleDisabled(query,
PlannerRuleNames.UNION_MERGE);
+ int unionCount = explain.split("LogicalUnion\\(all=\\[true]\\)",
-1).length - 1;
+ assertEquals(unionCount, 2,
+ "Without UnionMerge, two LogicalUnion nodes must remain in a nested
3-way UNION ALL. Plan:\n" + explain);
+ }
+
+ @Test
+ public void testSortRemoveConstantKeysEnabledByDefault() {
+ // Default-on. ORDER BY pinning a filtered-to-constant column should drop
that column from the
+ // sort key (and from the resulting exchange's hash key).
+ String query = "EXPLAIN PLAN FOR SELECT col1, col2 FROM a WHERE col1 =
'US' ORDER BY col1, col2";
+ String explain = _queryEnvironment.explainQuery(query,
RANDOM_REQUEST_ID_GEN.nextLong());
+ assertFalse(explain.contains("sort0=[$0], sort1=[$1]"),
+ "SortRemoveConstantKeys should drop col1 from the multi-key ORDER BY.
Plan:\n" + explain);
+ }
+
+ @Test
+ public void testDisableSortRemoveConstantKeys() {
+ // Disabling the rule preserves the multi-key (col1, col2) ORDER BY.
+ String query = "EXPLAIN PLAN FOR SELECT col1, col2 FROM a WHERE col1 =
'US' ORDER BY col1, col2";
+ String explain = explainQueryWithRuleDisabled(query,
PlannerRuleNames.SORT_REMOVE_CONSTANT_KEYS);
+ assertTrue(explain.contains("sort0=[$0], sort1=[$1]"),
+ "Without SortRemoveConstantKeys, the multi-key ORDER BY must remain.
Plan:\n" + explain);
+ }
+
+ // NOTE: ProjectAggregateMergeRule has no dedicated unit test. On the query
shapes we tested
+ // (e.g. SELECT col1, total FROM (SELECT col1, SUM(col2) AS total, COUNT(*)
AS unused FROM a
+ // GROUP BY col1)), other Pinot rules already prune the unused aggregate
call before
+ // ProjectAggregateMergeRule gets a chance to fire. The rule is registered
defensively in case
+ // a future query shape evades the existing pruning, but its standalone
behavior is not
+ // observable in current test queries.
+
+ @Test
+ public void testSortProjectTransposeDisabledByDefault() {
+ // Default-OFF (opt-in). Plan must keep Sort above Project — without the rule the Sort
+ // wraps the projection, not the other way around.
+ String query = "EXPLAIN PLAN FOR SELECT col1 FROM a ORDER BY col1";
+ String explain = _queryEnvironment.explainQuery(query,
RANDOM_REQUEST_ID_GEN.nextLong());
+ int sortIdx = explain.indexOf("LogicalSort");
+ int projectIdx = explain.indexOf("LogicalProject");
+ assertTrue(sortIdx >= 0 && projectIdx > sortIdx,
+ "Default plan must place Sort above Project. Plan:\n" + explain);
+ }
+
+ @Test
+ public void testEnableSortProjectTranspose() {
+ // Opt-in. With the rule enabled the Project bubbles above the Sort so
LIMIT can apply before
+ // projection expressions are evaluated.
+ String query = "EXPLAIN PLAN FOR SELECT col1 FROM a ORDER BY col1";
+ String explain = explainQueryWithRuleEnabled(query,
PlannerRuleNames.SORT_PROJECT_TRANSPOSE);
+ int sortIdx = explain.indexOf("LogicalSort");
+ int projectIdx = explain.indexOf("LogicalProject");
+ assertTrue(projectIdx >= 0 && sortIdx > projectIdx,
+ "With SortProjectTranspose enabled, Project must be above Sort.
Plan:\n" + explain);
+ }
+
@Test
public void testAggregateUnionAggregateDisabledByDefault() {
// Verify that the AggregateUnionAggregateRule is disabled by default
diff --git a/pinot-query-planner/src/test/resources/queries/GroupByPlans.json
b/pinot-query-planner/src/test/resources/queries/GroupByPlans.json
index 064a686d6c6..099782e5768 100644
--- a/pinot-query-planner/src/test/resources/queries/GroupByPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/GroupByPlans.json
@@ -58,10 +58,10 @@
"sql": "EXPLAIN PLAN FOR SELECT a.col2, a.col1, SUM(a.col3) FROM a
WHERE a.col3 >= 0 AND a.col1 = 'a' GROUP BY a.col1, a.col2",
"output": [
"Execution Plan",
- "\nLogicalProject(col2=[$1], col1=[$0], EXPR$2=[$2])",
- "\n PinotLogicalAggregate(group=[{0, 1}], agg#0=[$SUM0($2)],
aggType=[FINAL])",
- "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
- "\n PinotLogicalAggregate(group=[{0, 1}], agg#0=[$SUM0($2)],
aggType=[LEAF])",
+ "\nLogicalProject(col2=[$0], col1=[_UTF-8'a':VARCHAR CHARACTER SET
\"UTF-8\"], EXPR$2=[$1])",
+ "\n PinotLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)],
aggType=[FINAL])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalAggregate(group=[{1}], agg#0=[$SUM0($2)],
aggType=[LEAF])",
"\n LogicalFilter(condition=[AND(>=($2, 0), =($0,
_UTF-8'a'))])",
"\n PinotLogicalTableScan(table=[[default, a]])",
"\n"
@@ -180,10 +180,10 @@
"sql": "EXPLAIN PLAN FOR SELECT /*+
aggOptions(is_skip_leaf_stage_group_by='true') */ a.col2, a.col1, SUM(a.col3)
FROM a WHERE a.col3 >= 0 AND a.col1 = 'a' GROUP BY a.col1, a.col2",
"output": [
"Execution Plan",
- "\nLogicalProject(col2=[$1], col1=[$0], EXPR$2=[$2])",
- "\n PinotLogicalAggregate(group=[{0, 1}], agg#0=[$SUM0($2)],
aggType=[DIRECT])",
- "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
- "\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
+ "\nLogicalProject(col2=[$0], col1=[_UTF-8'a':VARCHAR CHARACTER SET
\"UTF-8\"], EXPR$2=[$1])",
+ "\n PinotLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)],
aggType=[DIRECT])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
+ "\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalFilter(condition=[AND(>=($2, 0), =($0,
_UTF-8'a'))])",
"\n PinotLogicalTableScan(table=[[default, a]])",
"\n"
diff --git
a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
index 7d637c1d5e8..f4bfeff1c2a 100644
--- a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
@@ -13,7 +13,8 @@
"\n PhysicalTableScan(table=[[default, a]])",
"\n"
]
- },{
+ },
+ {
"description": "Verify that override for lite mode leaf stage fan-out
adjusted limit works.",
"sql": "SET usePhysicalOptimizer=true; SET useLiteMode=true; SET
liteModeLeafStageFanOutAdjustedLimit=1000; EXPLAIN PLAN FOR SELECT COUNT(*)
FROM a WHERE col1 = 'foo'",
"output": [
@@ -830,11 +831,12 @@
"\n PhysicalFilter(condition=[=($3, 1)])",
"\n PhysicalWindow(window#0=[window(partition {1} order by [2]
rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
"\n PhysicalSort(sort0=[$2], dir0=[ASC])",
- "\n PhysicalAggregate(group=[{0, 1, 2}], aggType=[FINAL])",
- "\n
PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
- "\n PhysicalAggregate(group=[{0, 1, 2}], aggType=[LEAF],
limit=[100000])",
- "\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
- "\n PhysicalTableScan(table=[[default, a]])",
+ "\n PhysicalProject(col1=[_UTF-8'foo':VARCHAR CHARACTER SET
\"UTF-8\"], col2=[$0], col3=[$1])",
+ "\n PhysicalAggregate(group=[{0, 1}], aggType=[FINAL])",
+ "\n
PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+ "\n PhysicalAggregate(group=[{1, 2}], aggType=[LEAF],
limit=[100000])",
+ "\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
+ "\n PhysicalTableScan(table=[[default, a]])",
"\n"
]
},
@@ -844,11 +846,12 @@
"output": [
"Execution Plan",
"\nPhysicalSort(offset=[100], fetch=[400])",
- "\n PhysicalAggregate(group=[{0, 1, 2}], agg#0=[COUNT($3)],
aggType=[FINAL])",
- "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
- "\n PhysicalAggregate(group=[{0, 1, 2}], agg#0=[COUNT()],
aggType=[LEAF], limit=[100000])",
- "\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
- "\n PhysicalTableScan(table=[[default, a]])",
+ "\n PhysicalProject(col1=[_UTF-8'foo':VARCHAR CHARACTER SET
\"UTF-8\"], col2=[$0], col3=[$1], EXPR$3=[$2])",
+ "\n PhysicalAggregate(group=[{0, 1}], agg#0=[COUNT($2)],
aggType=[FINAL])",
+ "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+ "\n PhysicalAggregate(group=[{1, 2}], agg#0=[COUNT()],
aggType=[LEAF], limit=[100000])",
+ "\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
+ "\n PhysicalTableScan(table=[[default, a]])",
"\n"
]
},
@@ -937,11 +940,12 @@
"Execution Plan",
"\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
"\n PhysicalSort(offset=[100], fetch=[400])",
- "\n PhysicalAggregate(group=[{0, 1, 2}], agg#0=[COUNT($3)],
aggType=[FINAL])",
- "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
- "\n PhysicalAggregate(group=[{0, 1, 2}], agg#0=[COUNT()],
aggType=[LEAF], limit=[100000])",
- "\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
- "\n PhysicalTableScan(table=[[default, a]])",
+ "\n PhysicalProject(col1=[_UTF-8'foo':VARCHAR CHARACTER SET
\"UTF-8\"], col2=[$0], col3=[$1], EXPR$3=[$2])",
+ "\n PhysicalAggregate(group=[{0, 1}], agg#0=[COUNT($2)],
aggType=[FINAL])",
+ "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+ "\n PhysicalAggregate(group=[{1, 2}], agg#0=[COUNT()],
aggType=[LEAF], limit=[100000])",
+ "\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
+ "\n PhysicalTableScan(table=[[default, a]])",
"\n"
]
},
diff --git
a/pinot-query-planner/src/test/resources/queries/PinotHintablePlans.json
b/pinot-query-planner/src/test/resources/queries/PinotHintablePlans.json
index 9d8afd51d0a..2a95cb1af7d 100644
--- a/pinot-query-planner/src/test/resources/queries/PinotHintablePlans.json
+++ b/pinot-query-planner/src/test/resources/queries/PinotHintablePlans.json
@@ -137,10 +137,10 @@
"sql": "EXPLAIN PLAN FOR SELECT /*+
aggOptions(is_skip_leaf_stage_group_by='true') */ a.col2, a.col1, SUM(a.col3)
FROM a WHERE a.col3 >= 0 AND a.col1 = 'a' GROUP BY a.col1, a.col2",
"output": [
"Execution Plan",
- "\nLogicalProject(col2=[$1], col1=[$0], EXPR$2=[$2])",
- "\n PinotLogicalAggregate(group=[{0, 1}], agg#0=[$SUM0($2)],
aggType=[DIRECT])",
- "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
- "\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
+ "\nLogicalProject(col2=[$0], col1=[_UTF-8'a':VARCHAR CHARACTER SET
\"UTF-8\"], EXPR$2=[$1])",
+ "\n PinotLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)],
aggType=[DIRECT])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
+ "\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalFilter(condition=[AND(>=($2, 0), =($0,
_UTF-8'a'))])",
"\n PinotLogicalTableScan(table=[[default, a]])",
"\n"
diff --git a/pinot-query-planner/src/test/resources/queries/SetOpPlans.json
b/pinot-query-planner/src/test/resources/queries/SetOpPlans.json
index 59eff321c57..01d05a5ea90 100644
--- a/pinot-query-planner/src/test/resources/queries/SetOpPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/SetOpPlans.json
@@ -23,13 +23,11 @@
"Execution Plan",
"\nLogicalUnion(all=[true])",
"\n PinotLogicalExchange(distribution=[hash[0, 1]])",
- "\n LogicalUnion(all=[true])",
- "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
- "\n LogicalProject(col1=[$0], col2=[$1])",
- "\n PinotLogicalTableScan(table=[[default, a]])",
- "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
- "\n LogicalProject(col1=[$0], col2=[$1])",
- "\n PinotLogicalTableScan(table=[[default, b]])",
+ "\n LogicalProject(col1=[$0], col2=[$1])",
+ "\n PinotLogicalTableScan(table=[[default, a]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
+ "\n LogicalProject(col1=[$0], col2=[$1])",
+ "\n PinotLogicalTableScan(table=[[default, b]])",
"\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalProject(col1=[$0], col2=[$1])",
"\n PinotLogicalTableScan(table=[[default, c]])",
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 840dd97db51..f4d735a6d77 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -1041,6 +1041,14 @@ public class CommonConstants {
public static final String PRUNE_EMPTY_JOIN_LEFT = "PruneEmptyJoinLeft";
public static final String PRUNE_EMPTY_JOIN_RIGHT =
"PruneEmptyJoinRight";
public static final String JOIN_TO_ENRICHED_JOIN = "JoinToEnrichedJoin";
+ public static final String AGGREGATE_PROJECT_PULL_UP_CONSTANTS =
"AggregateProjectPullUpConstants";
+ public static final String LIMIT_MERGE = "LimitMerge";
+ public static final String SORT_REMOVE_CONSTANT_KEYS =
"SortRemoveConstantKeys";
+ // Default-off (opt-in via usePlannerRules) — firing in BASIC_RULES disrupts
+ // ProjectToSemiJoinRule pattern matching on partition-hinted IN (SELECT) queries,
+ // breaking colocated broadcast semi-joins.
+ public static final String SORT_PROJECT_TRANSPOSE =
"SortProjectTranspose";
+ public static final String UNION_MERGE = "UnionMerge";
+ public static final String PROJECT_AGGREGATE_MERGE =
"ProjectAggregateMerge";
}
/**
@@ -1058,7 +1066,11 @@ public class CommonConstants {
PlannerRuleNames.AGGREGATE_UNION_AGGREGATE,
PlannerRuleNames.JOIN_TO_ENRICHED_JOIN,
PlannerRuleNames.AGGREGATE_FUNCTION_REWRITE,
- PlannerRuleNames.JOIN_PUSH_TRANSITIVE_PREDICATES
+ PlannerRuleNames.JOIN_PUSH_TRANSITIVE_PREDICATES,
+ // Stock Calcite rule kept opt-in via usePlannerRules — see the comment on
+ // PlannerRuleNames.SORT_PROJECT_TRANSPOSE above for the rationale (firing in BASIC_RULES
+ // disrupts ProjectToSemiJoinRule on partition-hinted IN(SELECT) queries, breaking
+ // colocated broadcast semi-joins).
+ PlannerRuleNames.SORT_PROJECT_TRANSPOSE
);
public static final String CONFIG_OF_BROKER_MSE_PLANNER_DISABLED_RULES =
"pinot.broker.mse.planner.disabled.rules";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]