This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a7622546633 Push limit to leaf stage by default for DISTINCT / no-aggregate GROUP BY (#18598)
a7622546633 is described below

commit a7622546633ca8868bb8125ebac1145ade1beae6
Author: Yash Mayya <[email protected]>
AuthorDate: Thu May 28 18:25:36 2026 -0700

    Push limit to leaf stage by default for DISTINCT / no-aggregate GROUP BY (#18598)
---
 .../tests/custom/GroupByOptionsTest.java           | 62 ++++++++++++++
 .../integration/tests/custom/JsonPathTest.java     |  6 +-
 .../PinotAggregateExchangeNodeInsertRule.java      | 22 ++++-
 .../src/test/resources/queries/GroupByPlans.json   | 94 ++++++++++++++++++++++
 .../resources/queries/PhysicalOptimizerPlans.json  | 17 ++++
 5 files changed, 193 insertions(+), 8 deletions(-)

diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GroupByOptionsTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GroupByOptionsTest.java
index 4a97cc014dd..4194e53ca9c 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GroupByOptionsTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GroupByOptionsTest.java
@@ -389,6 +389,68 @@ public class GroupByOptionsTest extends 
CustomDataQueryClusterIntegrationTest {
     }
   }
 
+  @Test
+  public void testDistinctWithLimitAndOffsetReturnsFullCardinality()
+      throws Exception {
+    // Default-on leaf-limit pushdown for no-aggregate DISTINCT must still 
honor OFFSET. The planner pushes
+    // offset + fetch down (the sort-exchange-copy folds offset into the inner 
sort's fetch), so a paginated DISTINCT
+    // returns the full requested page, not fetch - offset rows. 'j' has 10 
distinct values (0..9), well above n + m.
+    setUseMultiStageQueryEngine(true);
+    String table = getTableName();
+
+    // Ordered: the returned rows are the global ranks (m+1)..(m+n), i.e. the 
3rd, 4th, 5th smallest distinct
+    // values => 2, 3, 4.
+    Assert.assertEquals(
+        toResultStr(postV2Query("select distinct j from " + table + " order by 
j limit 3 offset 2")),
+        "\"j\"[\"LONG\"]\n2\n3\n4");
+    // Control with offset 0 => 0, 1, 2.
+    Assert.assertEquals(
+        toResultStr(postV2Query("select distinct j from " + table + " order by 
j limit 3")),
+        "\"j\"[\"LONG\"]\n0\n1\n2");
+
+    // Unordered: the result set is arbitrary, but the cardinality must be 
exactly the requested page size (3), and
+    // every value must be a valid distinct 'j'. Without accounting for the 
offset this would undercount.
+    JsonNode rows = postV2Query("select distinct j from " + table + " limit 3 
offset 2").get(RESULT_TABLE).get("rows");
+    Assert.assertEquals(rows.size(), 3, "DISTINCT with LIMIT 3 OFFSET 2 must 
return a full page of 3 rows");
+    for (JsonNode row : rows) {
+      long value = row.get(0).asLong();
+      Assert.assertTrue(value >= 0 && value <= 9, "unexpected distinct value: 
" + value);
+    }
+  }
+
+  @Test
+  public void testGroupByNoAggregateWithLimitOffsetAndTrimEquivalence()
+      throws Exception {
+    // Covers the no-aggregate GROUP BY (non-DISTINCT) path with the 
default-on leaf-limit pushdown, plus
+    // result-equivalence between default-on group trim and the explicit 
opt-out when trim is a no-op.
+    setUseMultiStageQueryEngine(true);
+    String table = getTableName();
+
+    // No-aggregate GROUP BY col with LIMIT/OFFSET must return the full 
requested page (same trim machinery as
+    // DISTINCT). 'j' has 10 distinct values; ordered page (m+1)..(m+n) => 2, 
3, 4.
+    Assert.assertEquals(
+        toResultStr(postV2Query("select j from " + table + " group by j order 
by j limit 3 offset 2")),
+        "\"j\"[\"LONG\"]\n2\n3\n4");
+
+    // Unordered no-aggregate GROUP BY: cardinality must be exactly the page 
size (3), every value a valid 'j'.
+    JsonNode rows = postV2Query("select j from " + table + " group by j limit 
3 offset 2")
+        .get(RESULT_TABLE).get("rows");
+    Assert.assertEquals(rows.size(), 3, "GROUP BY without aggregate with LIMIT 
3 OFFSET 2 must return a full page");
+    for (JsonNode row : rows) {
+      long value = row.get(0).asLong();
+      Assert.assertTrue(value >= 0 && value <= 9, "unexpected group key: " + 
value);
+    }
+
+    // When the total number of distinct values ('i' has 4) is below the 
limit, leaf/final trim is a no-op, so the
+    // default-on behavior must return exactly the same rows as the explicit 
opt-out. Order by the key for a
+    // deterministic comparison.
+    String defaultOn = toResultStr(postV2Query("select distinct i from " + 
table + " order by i limit 100"));
+    String optedOut = toResultStr(postV2Query(
+        "select /*+ aggOptions(is_enable_group_trim='false') */ distinct i 
from " + table + " order by i limit 100"));
+    Assert.assertEquals(defaultOn, optedOut, "default-on trim must match the 
opt-out when total distinct < limit");
+    Assert.assertEquals(defaultOn, "\"i\"[\"INT\"]\n0\n1\n2\n3");
+  }
+
   @Test
   public void testOrderByByKeysAndValuesIsPushedToFinalAggregationStage()
       throws Exception {
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/JsonPathTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/JsonPathTest.java
index bb90cf7313d..94af87e0bb7 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/JsonPathTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/JsonPathTest.java
@@ -929,11 +929,7 @@ public class JsonPathTest extends 
CustomDataQueryClusterIntegrationTest {
 
     assertEquals(extractOrderedDistinctValues(baselineResponse).size(), 5);
     assertEquals(extractOrderedDistinctValues(optimizedResponse).size(), 5);
-
-    // TODO: Fix LIMIT push down for MSE
-    if (!useMultiStageQueryEngine) {
-      
assertEquals(optimizedResponse.get("numEntriesScannedPostFilter").asInt(), 5 * 
getNumAvroFiles());
-    }
+    assertEquals(optimizedResponse.get("numEntriesScannedPostFilter").asInt(), 
5 * getNumAvroFiles());
   }
 
   /// Cross-path 5-arg form: filter on `$.k2`, extract `$.k1`. 
`getMatchingFlattenedDocsMap` applies the filter
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java
index 39e3988fe64..b6f7ea56257 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java
@@ -132,7 +132,7 @@ public class PinotAggregateExchangeNodeInsertRule {
       Map<String, String> hintOptions =
           PinotHintStrategyTable.getHintOptions(aggRel.getHints(), 
PinotHintOptions.AGGREGATE_HINT_OPTIONS);
 
-      if (!isGroupTrimmingEnabled(call, hintOptions)) {
+      if (!isGroupTrimmingEnabled(call, hintOptions, aggRel)) {
         return;
       } else if (hintOptions == null) {
         hintOptions = Collections.emptyMap();
@@ -186,7 +186,7 @@ public class PinotAggregateExchangeNodeInsertRule {
       Map<String, String> hintOptions =
           PinotHintStrategyTable.getHintOptions(aggRel.getHints(), 
PinotHintOptions.AGGREGATE_HINT_OPTIONS);
 
-      if (!isGroupTrimmingEnabled(call, hintOptions)) {
+      if (!isGroupTrimmingEnabled(call, hintOptions, aggRel)) {
         return;
       } else if (hintOptions == null) {
         hintOptions = Collections.emptyMap();
@@ -479,14 +479,30 @@ public class PinotAggregateExchangeNodeInsertRule {
     return null;
   }
 
-  private static boolean isGroupTrimmingEnabled(RelOptRuleCall call, 
Map<String, String> hintOptions) {
+  private static boolean isGroupTrimmingEnabled(RelOptRuleCall call, 
Map<String, String> hintOptions,
+      Aggregate aggRel) {
     if (hintOptions != null) {
       String option = 
hintOptions.get(PinotHintOptions.AggregateOptions.IS_ENABLE_GROUP_TRIM);
       if (option != null) {
+        // Explicit hint always wins (true or false), for aggregates with AND 
without aggregate functions.
         return Boolean.parseBoolean(option);
       }
     }
 
+    // Group-by WITHOUT aggregate functions (DISTINCT or `GROUP BY col` with 
no agg calls) can always push the
+    // limit/collations down to the leaf stage by default: ORDER BY can only 
reference group keys, which are fully
+    // materialized at the leaf, so leaf-level trim is exact (and a plain 
LIMIT without ORDER BY returns a valid
+    // subset). This mirrors PinotLogicalAggregateRule (the physical-optimizer 
path).
+    // TODO: Consider also enabling this by default for aggregation queries 
whose ORDER BY references only group keys
+    //       (not aggregate results). The same argument holds there - a 
group's rank by its key is identical at every
+    //       leaf, so keeping the per-leaf top-K never drops a group that 
belongs in the global top-N, and the kept
+    //       groups still get their aggregates fully merged at the final 
stage. It is NOT safe when ORDER BY is over an
+    //       aggregate (partial values rank differently per leaf) or for an 
unordered limit with aggregates (an
+    //       arbitrarily dropped group would be under-counted).
+    if (aggRel.getAggCallList().isEmpty()) {
+      return true;
+    }
+
     Context genericContext = call.getPlanner().getContext();
     if (genericContext != null) {
       QueryEnvironment.Config context = 
genericContext.unwrap(QueryEnvironment.Config.class);
diff --git a/pinot-query-planner/src/test/resources/queries/GroupByPlans.json 
b/pinot-query-planner/src/test/resources/queries/GroupByPlans.json
index 099782e5768..5977fb6ff01 100644
--- a/pinot-query-planner/src/test/resources/queries/GroupByPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/GroupByPlans.json
@@ -298,6 +298,100 @@
           "\n              PinotLogicalTableScan(table=[[default, a]])",
           "\n"
         ]
+      },
+      {
+        "description": "Distinct with limit pushes limit to leaf and final 
aggregate by default (no hint)",
+        "sql": "EXPLAIN PLAN FOR SELECT DISTINCT col1, col2 FROM a WHERE col3 
>= 0 LIMIT 10",
+        "output": [
+          "Execution Plan",
+          "\nLogicalSort(offset=[0], fetch=[10])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[]], 
isSortOnSender=[false], isSortOnReceiver=[false])",
+          "\n    LogicalSort(fetch=[10])",
+          "\n      PinotLogicalAggregate(group=[{0, 1}], aggType=[FINAL], 
collations=[[]], limit=[10])",
+          "\n        PinotLogicalExchange(distribution=[hash[0, 1]])",
+          "\n          PinotLogicalAggregate(group=[{0, 1}], aggType=[LEAF], 
collations=[[]], limit=[10])",
+          "\n            LogicalFilter(condition=[>=($2, 0)])",
+          "\n              PinotLogicalTableScan(table=[[default, a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "Group by without aggregate functions with limit pushes 
limit to leaf and final by default",
+        "sql": "EXPLAIN PLAN FOR SELECT col1 FROM a GROUP BY col1 LIMIT 10",
+        "output": [
+          "Execution Plan",
+          "\nLogicalSort(offset=[0], fetch=[10])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[]], 
isSortOnSender=[false], isSortOnReceiver=[false])",
+          "\n    LogicalSort(fetch=[10])",
+          "\n      PinotLogicalAggregate(group=[{0}], aggType=[FINAL], 
collations=[[]], limit=[10])",
+          "\n        PinotLogicalExchange(distribution=[hash[0]])",
+          "\n          PinotLogicalAggregate(group=[{0}], aggType=[LEAF], 
collations=[[]], limit=[10])",
+          "\n            PinotLogicalTableScan(table=[[default, a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "Distinct with order by on group key and limit pushes 
collations and limit to leaf and final by default",
+        "sql": "EXPLAIN PLAN FOR SELECT DISTINCT col1 FROM a ORDER BY col1 
LIMIT 10",
+        "output": [
+          "Execution Plan",
+          "\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0], fetch=[10])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[0]], 
isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n    LogicalSort(sort0=[$0], dir0=[ASC], fetch=[10])",
+          "\n      PinotLogicalAggregate(group=[{0}], aggType=[FINAL], 
collations=[[0]], limit=[10])",
+          "\n        PinotLogicalExchange(distribution=[hash[0]])",
+          "\n          PinotLogicalAggregate(group=[{0}], aggType=[LEAF], 
collations=[[0]], limit=[10])",
+          "\n            PinotLogicalTableScan(table=[[default, a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "Group by without aggregate functions with HAVING on 
the group key still pushes limit (filter applied at leaf before trim)",
+        "sql": "EXPLAIN PLAN FOR SELECT col3 FROM a GROUP BY col3 HAVING col3 
> 5 LIMIT 10",
+        "output": [
+          "Execution Plan",
+          "\nLogicalSort(offset=[0], fetch=[10])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[]], 
isSortOnSender=[false], isSortOnReceiver=[false])",
+          "\n    LogicalSort(fetch=[10])",
+          "\n      PinotLogicalAggregate(group=[{0}], aggType=[FINAL], 
collations=[[]], limit=[10])",
+          "\n        PinotLogicalExchange(distribution=[hash[0]])",
+          "\n          PinotLogicalAggregate(group=[{2}], aggType=[LEAF], 
collations=[[]], limit=[10])",
+          "\n            LogicalFilter(condition=[>($2, 5)])",
+          "\n              PinotLogicalTableScan(table=[[default, a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "Distinct with limit and offset pushes offset+fetch to 
leaf and final aggregate by default",
+        "sql": "EXPLAIN PLAN FOR SELECT DISTINCT col1, col2 FROM a WHERE col3 
>= 0 LIMIT 11 OFFSET 10",
+        "output": [
+          "Execution Plan",
+          "\nLogicalSort(offset=[10], fetch=[11])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[]], 
isSortOnSender=[false], isSortOnReceiver=[false])",
+          "\n    LogicalSort(fetch=[21])",
+          "\n      PinotLogicalAggregate(group=[{0, 1}], aggType=[FINAL], 
collations=[[]], limit=[21])",
+          "\n        PinotLogicalExchange(distribution=[hash[0, 1]])",
+          "\n          PinotLogicalAggregate(group=[{0, 1}], aggType=[LEAF], 
collations=[[]], limit=[21])",
+          "\n            LogicalFilter(condition=[>=($2, 0)])",
+          "\n              PinotLogicalTableScan(table=[[default, a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "Distinct with limit does not push limit to aggregate 
when group trim is explicitly disabled via hint",
+        "sql": "EXPLAIN PLAN FOR SELECT /*+ 
aggOptions(is_enable_group_trim='false') */ DISTINCT col1, col2 FROM a WHERE 
col3 >= 0 LIMIT 10",
+        "output": [
+          "Execution Plan",
+          "\nLogicalSort(offset=[0], fetch=[10])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[]], 
isSortOnSender=[false], isSortOnReceiver=[false])",
+          "\n    LogicalSort(fetch=[10])",
+          "\n      PinotLogicalAggregate(group=[{0, 1}], aggType=[FINAL])",
+          "\n        PinotLogicalExchange(distribution=[hash[0, 1]])",
+          "\n          PinotLogicalAggregate(group=[{0, 1}], aggType=[LEAF])",
+          "\n            LogicalFilter(condition=[>=($2, 0)])",
+          "\n              PinotLogicalTableScan(table=[[default, a]])",
+          "\n"
+        ]
       }
     ]
   }
diff --git 
a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json 
b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
index e2d5a7de71c..d9c69907af1 100644
--- a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
@@ -653,6 +653,23 @@
           "\n"
         ]
       },
+      {
+        "description": "Distinct with limit pushes limit to leaf and final 
aggregate by default (no hint)",
+        "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT 
DISTINCT col1, col2 FROM a WHERE col3 >= 0 LIMIT 10",
+        "output": [
+          "Execution Plan",
+          "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n  PhysicalSort(fetch=[10])",
+          "\n    PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n      PhysicalSort(fetch=[10])",
+          "\n        PhysicalAggregate(group=[{0, 1}], aggType=[FINAL], 
limit=[10])",
+          "\n          
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0, 1]])",
+          "\n            PhysicalAggregate(group=[{0, 1}], aggType=[LEAF], 
limit=[10])",
+          "\n              PhysicalFilter(condition=[>=($2, 0)])",
+          "\n                PhysicalTableScan(table=[[default, a]])",
+          "\n"
+        ]
+      },
       {
         "description": "SQL hint based group by optimization with group trim 
enabled and offset pushes down offset + fetch",
         "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT /*+ 
aggOptions(is_enable_group_trim='true') */ COUNT(DISTINCT col2) AS cnt FROM a 
WHERE a.col3 >= 0 GROUP BY col1 ORDER BY cnt DESC LIMIT 10 OFFSET 5",


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to