This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c2561d4c63e Include OFFSET in physical optimizer group-trim limit pushdown (#18600)
c2561d4c63e is described below

commit c2561d4c63e7b94a8f2172ed71bad1a4d24624fe
Author: Yash Mayya <[email protected]>
AuthorDate: Wed May 27 17:55:05 2026 -0700

    Include OFFSET in physical optimizer group-trim limit pushdown (#18600)
---
 .../tests/custom/GroupByOptionsTest.java           | 48 ++++++++++++++++++++++
 .../rel/rules/PinotLogicalAggregateRule.java       | 32 +++++++++++----
 .../resources/queries/PhysicalOptimizerPlans.json  | 35 ++++++++++++++++
 3 files changed, 107 insertions(+), 8 deletions(-)

diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GroupByOptionsTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GroupByOptionsTest.java
index 8d3c8d2672e..4a97cc014dd 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GroupByOptionsTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GroupByOptionsTest.java
@@ -23,9 +23,11 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
@@ -341,6 +343,52 @@ public class GroupByOptionsTest extends CustomDataQueryClusterIntegrationTest {
     );
   }
 
+  @Test
+  public void testGroupTrimWithOffsetReturnsFullPageOnPhysicalOptimizer()
+      throws Exception {
+    setUseMultiStageQueryEngine(true);
+
+    // On the physical-optimizer path, group trim must push down 'offset + 
fetch' to the leaf/final aggregate.
+    // Without it, the aggregate keeps only 'fetch' groups (there are 40 
distinct (i, j) groups), so the outer
+    // OFFSET drops everything and the page comes back short. We query without 
ORDER BY so the runtime trims to
+    // exactly the pushed-down limit (see AggregateOperator), making the 
cardinality bug deterministic.
+    String physicalOpt = "SET usePhysicalOptimizer=true; ";
+
+    // No-aggregate DISTINCT path: group trim is on by default here.
+    JsonNode distinct = postV2Query(physicalOpt
+        + " select distinct i, j from " + getTableName() + " limit 5 offset 
10");
+    assertFullPageOfGroups(distinct, -1);
+
+    // Hinted aggregate path with group trim enabled. Every (i, j) group has 
exactly 2 rows in the test data.
+    JsonNode aggregated = postV2Query(physicalOpt
+        + " select /*+ aggOptions(is_enable_group_trim='true') */ i, j, 
count(*) as cnt from " + getTableName()
+        + " group by i, j limit 5 offset 10");
+    assertFullPageOfGroups(aggregated, 2);
+  }
+
+  /**
+   * Asserts the result is a full page of 5 distinct, in-domain (i, j) groups. 
The rows are not ordered (we query
+   * without ORDER BY for deterministic trimming), so we validate the page 
size and group validity rather than exact
+   * values. When {@code expectedCount >= 0}, also asserts each group's 
COUNT(*).
+   */
+  private static void assertFullPageOfGroups(JsonNode mainNode, int 
expectedCount) {
+    JsonNode resultTable = mainNode.get(RESULT_TABLE);
+    Assert.assertNotNull(resultTable, toResultStr(mainNode));
+    JsonNode rows = resultTable.get("rows");
+    Assert.assertEquals(rows.size(), 5, toResultStr(mainNode));
+    Set<String> seenGroups = new HashSet<>();
+    for (JsonNode row : rows) {
+      int i = row.get(0).intValue();
+      int j = row.get(1).intValue();
+      Assert.assertTrue(i >= 0 && i < FILES_NO, "i out of range: " + i);
+      Assert.assertTrue(j >= 0 && j < 10, "j out of range: " + j);
+      Assert.assertTrue(seenGroups.add(i + "," + j), "duplicate group: (" + i 
+ ", " + j + ")");
+      if (expectedCount >= 0) {
+        Assert.assertEquals(row.get(2).intValue(), expectedCount, 
toResultStr(mainNode));
+      }
+    }
+  }
+
   @Test
   public void testOrderByByKeysAndValuesIsPushedToFinalAggregationStage()
       throws Exception {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotLogicalAggregateRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotLogicalAggregateRule.java
index 5ae5e3a1c23..83299279a09 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotLogicalAggregateRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotLogicalAggregateRule.java
@@ -88,10 +88,7 @@ public class PinotLogicalAggregateRule {
           return;
         }
       }
-      int limit = 0;
-      if (sortRel.fetch != null) {
-        limit = RexLiteral.intValue(sortRel.fetch);
-      }
+      int limit = getGroupTrimLimit(sortRel);
       if (limit <= 0) {
         // Cannot enable group trim when there is no limit.
         return;
@@ -125,10 +122,7 @@ public class PinotLogicalAggregateRule {
 
       Sort sortRel = call.rel(0);
       List<RelFieldCollation> collations = 
sortRel.getCollation().getFieldCollations();
-      int limit = 0;
-      if (sortRel.fetch != null) {
-        limit = RexLiteral.intValue(sortRel.fetch);
-      }
+      int limit = getGroupTrimLimit(sortRel);
       if (limit <= 0) {
         // Cannot enable group trim when there is no limit.
         return;
@@ -162,6 +156,28 @@ public class PinotLogicalAggregateRule {
     return createPlan(aggRel, null, 0);
   }
 
+  /**
+   * Returns the limit to push down into the aggregate for group trim, or 0 if 
group trim should not be applied.
+   * The pushed-down limit is {@code offset + fetch} so that the 
leaf/intermediate aggregate retains enough groups to
+   * cover the outer {@code OFFSET ... FETCH} window. Adding only {@code 
fetch} would trim away rows that the offset
+   * window still needs, under-counting paginated queries.
+   */
+  private static int getGroupTrimLimit(Sort sortRel) {
+    if (sortRel.fetch == null) {
+      return 0;
+    }
+    int limit = RexLiteral.intValue(sortRel.fetch);
+    if (limit <= 0) {
+      return 0;
+    }
+    if (sortRel.offset != null) {
+      // Clamp to avoid int overflow. Integer.MAX_VALUE is safe downstream: 
GroupByUtils.getTableCapacity uses long
+      // arithmetic.
+      limit = (int) Math.min(Integer.MAX_VALUE, (long) limit + 
RexLiteral.intValue(sortRel.offset));
+    }
+    return limit;
+  }
+
   private static PinotLogicalAggregate createPlan(Aggregate aggRel, @Nullable 
List<RelFieldCollation> collations,
       int limit) {
     Map<String, String> hintOptions =
diff --git a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
index e7656c04108..7d637c1d5e8 100644
--- a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
@@ -634,6 +634,41 @@
           "\n                PhysicalTableScan(table=[[default, a]])",
           "\n"
         ]
+      },
+      {
+        "description": "SQL hint based distinct optimization with group trim 
enabled and offset pushes down offset + fetch",
+        "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT /*+ 
aggOptions(is_enable_group_trim='true') */ DISTINCT col1, col2 FROM a WHERE 
col3 >= 0 LIMIT 10 OFFSET 5",
+        "output": [
+          "Execution Plan",
+          "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n  PhysicalSort(offset=[5], fetch=[10])",
+          "\n    PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n      PhysicalSort(fetch=[15])",
+          "\n        PhysicalAggregate(group=[{0, 1}], aggType=[FINAL], 
limit=[15])",
+          "\n          
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0, 1]])",
+          "\n            PhysicalAggregate(group=[{0, 1}], aggType=[LEAF], 
limit=[15])",
+          "\n              PhysicalFilter(condition=[>=($2, 0)])",
+          "\n                PhysicalTableScan(table=[[default, a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "SQL hint based group by optimization with group trim 
enabled and offset pushes down offset + fetch",
+        "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT /*+ 
aggOptions(is_enable_group_trim='true') */ COUNT(DISTINCT col2) AS cnt FROM a 
WHERE a.col3 >= 0 GROUP BY col1 ORDER BY cnt DESC LIMIT 10 OFFSET 5",
+        "output": [
+          "Execution Plan",
+          "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n  PhysicalSort(sort0=[$0], dir0=[DESC], offset=[5], fetch=[10])",
+          "\n    PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n      PhysicalSort(sort0=[$0], dir0=[DESC], fetch=[15])",
+          "\n        PhysicalProject(cnt=[$1])",
+          "\n          PhysicalAggregate(group=[{0}], 
agg#0=[DISTINCTCOUNT($1)], aggType=[FINAL], limit=[15])",
+          "\n            
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
+          "\n              PhysicalAggregate(group=[{0}], 
agg#0=[DISTINCTCOUNT($1)], aggType=[LEAF], collations=[[1 DESC]], limit=[15])",
+          "\n                PhysicalFilter(condition=[>=($2, 0)])",
+          "\n                  PhysicalTableScan(table=[[default, a]])",
+          "\n"
+        ]
       }
     ]
   },


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to