This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d304909d8a [multistage] add maxRowsInJoin, maxRowsInWindow, numGroups to query response (#17784)
6d304909d8a is described below

commit 6d304909d8a7794006792d8f918d389bfd07abc7
Author: dang-stripe <[email protected]>
AuthorDate: Sat Mar 14 01:09:53 2026 -0700

    [multistage] add maxRowsInJoin, maxRowsInWindow, numGroups to query response (#17784)
---
 .../pinot/common/response/BrokerResponse.java      |   2 +
 .../response/broker/BrokerResponseNative.java      |   2 +
 .../response/broker/BrokerResponseNativeV2.java    |  37 +++++++-
 .../query/runtime/operator/AggregateOperator.java  |   9 ++
 .../query/runtime/operator/BaseJoinOperator.java   |  15 +++
 .../query/runtime/operator/MultiStageOperator.java |   3 +
 .../runtime/operator/WindowAggregateOperator.java  |  12 +++
 .../runtime/operator/AggregateOperatorTest.java    |  39 ++++++++
 .../runtime/operator/HashJoinOperatorTest.java     | 104 +++++++++++++++++++++
 .../operator/WindowAggregateOperatorTest.java      |  68 ++++++++++++++
 .../runtime/plan/MultiStageQueryStatsTest.java     |  48 +++++++++-
 11 files changed, 336 insertions(+), 3 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
index fc800024719..603b0071d59 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
@@ -160,11 +160,13 @@ public interface BrokerResponse {
    */
   boolean isMaxRowsInJoinReached();
 
+
   /**
    * Returns whether the limit for max rows in window has been reached.
    */
   boolean isMaxRowsInWindowReached();
 
+
   /**
    * Returns the total time used for query execution in milliseconds.
    */
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
index 0e2eaeb77c6..8031395c911 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
@@ -238,12 +238,14 @@ public class BrokerResponseNative implements BrokerResponse {
     return false;
   }
 
+
   @JsonIgnore
   @Override
   public boolean isMaxRowsInWindowReached() {
     return false;
   }
 
+
   @Override
   public long getTimeUsedMs() {
     return _timeUsedMs;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
index 84cb82183b6..e485912feea 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
@@ -38,7 +38,8 @@ import org.apache.pinot.common.response.ProcessingException;
  */
 @JsonPropertyOrder({
     "resultTable", "numRowsResultSet", "partialResult", "exceptions", 
"numGroupsLimitReached",
-    "numGroupsWarningLimitReached", "maxRowsInJoinReached", 
"maxRowsInWindowReached", "timeUsedMs", "stageStats",
+    "numGroupsWarningLimitReached", "numGroups", "maxRowsInJoinReached", 
"maxRowsInJoin",
+    "maxRowsInWindowReached", "maxRowsInWindow", "timeUsedMs", "stageStats",
     "maxRowsInOperator", "requestId", "clientRequestId", "brokerId", 
"numDocsScanned", "totalDocs",
     "numEntriesScannedInFilter", "numEntriesScannedPostFilter", 
"numServersQueried", "numServersResponded",
     "numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched", 
"numConsumingSegmentsQueried",
@@ -59,7 +60,9 @@ public class BrokerResponseNativeV2 implements BrokerResponse {
   private ResultTable _resultTable;
   private int _numRowsResultSet;
   private boolean _maxRowsInJoinReached;
+  private long _maxRowsInJoin;
   private boolean _maxRowsInWindowReached;
+  private long _maxRowsInWindow;
   private long _timeUsedMs;
   /**
    * Statistics for each stage of the query execution.
@@ -143,6 +146,14 @@ public class BrokerResponseNativeV2 implements BrokerResponse {
     _brokerStats.merge(StatKey.NUM_GROUPS_LIMIT_REACHED, 
numGroupsLimitReached);
   }
 
+  public long getNumGroups() {
+    return _brokerStats.getLong(StatKey.NUM_GROUPS);
+  }
+
+  public void mergeNumGroups(long numGroups) {
+    _brokerStats.merge(StatKey.NUM_GROUPS, numGroups);
+  }
+
   @Override
   public boolean isNumGroupsWarningLimitReached() {
     return _brokerStats.getBoolean(StatKey.NUM_GROUPS_WARNING_LIMIT_REACHED);
@@ -161,6 +172,14 @@ public class BrokerResponseNativeV2 implements BrokerResponse {
     _maxRowsInJoinReached |= maxRowsInJoinReached;
   }
 
+  public long getMaxRowsInJoin() {
+    return _maxRowsInJoin;
+  }
+
+  public void mergeMaxRowsInJoin(long maxRowsInJoin) {
+    _maxRowsInJoin = Math.max(_maxRowsInJoin, maxRowsInJoin);
+  }
+
   @Override
   public boolean isMaxRowsInWindowReached() {
     return _maxRowsInWindowReached;
@@ -170,6 +189,14 @@ public class BrokerResponseNativeV2 implements BrokerResponse {
     _maxRowsInWindowReached |= maxRowsInWindowReached;
   }
 
+  public long getMaxRowsInWindow() {
+    return _maxRowsInWindow;
+  }
+
+  public void mergeMaxRowsInWindow(long maxRowsInWindow) {
+    _maxRowsInWindow = Math.max(_maxRowsInWindow, maxRowsInWindow);
+  }
+
   /**
    * Returns the stage statistics.
    */
@@ -453,7 +480,13 @@ public class BrokerResponseNativeV2 implements BrokerResponse {
     NUM_SEGMENTS_PRUNED_BY_VALUE(StatMap.Type.INT),
     GROUPS_TRIMMED(StatMap.Type.BOOLEAN),
     NUM_GROUPS_LIMIT_REACHED(StatMap.Type.BOOLEAN),
-    NUM_GROUPS_WARNING_LIMIT_REACHED(StatMap.Type.BOOLEAN);
+    NUM_GROUPS_WARNING_LIMIT_REACHED(StatMap.Type.BOOLEAN),
+    NUM_GROUPS(StatMap.Type.LONG) {
+      @Override
+      public long merge(long value1, long value2) {
+        return Math.max(value1, value2);
+      }
+    };
 
     private final StatMap.Type _type;
 
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
index 0a792c41120..f7f2da22666 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
@@ -228,6 +228,9 @@ public class AggregateOperator extends MultiStageOperator {
         rows = _groupByExecutor.getResult(_groupTrimSize);
       }
 
+      // Record stat before we check for limit so we can propagate to query 
response
+      _statMap.merge(StatKey.NUM_GROUPS, _groupByExecutor.getNumGroups());
+
       if (rows.isEmpty()) {
         return _eosBlock;
       } else {
@@ -472,6 +475,12 @@ public class AggregateOperator extends MultiStageOperator {
     GROUPS_TRIMMED(StatMap.Type.BOOLEAN),
     NUM_GROUPS_LIMIT_REACHED(StatMap.Type.BOOLEAN),
     NUM_GROUPS_WARNING_LIMIT_REACHED(StatMap.Type.BOOLEAN),
+    NUM_GROUPS(StatMap.Type.LONG) {
+      @Override
+      public long merge(long value1, long value2) {
+        return Math.max(value1, value2);
+      }
+    },
     /**
      * Allocated memory in bytes for this operator or its children in the same 
stage.
      */
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java
index fe46b175469..8ec374ca4b5 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java
@@ -212,6 +212,8 @@ public abstract class BaseJoinOperator extends MultiStageOperator {
       // Row based overflow check.
       if (rows.size() + numRows > _maxRowsInJoin) {
         if (_joinOverflowMode == JoinOverFlowMode.THROW) {
+          // Record stat before we throw so it propagates to query response
+          _statMap.merge(StatKey.MAX_ROWS_IN_JOIN, numRows + rows.size());
           throwForJoinRowLimitExceeded(
               "Cannot build in memory hash table for join operator, reached 
number of rows limit: " + _maxRowsInJoin);
         } else {
@@ -236,6 +238,7 @@ public abstract class BaseJoinOperator extends MultiStageOperator {
     } else {
       _isRightTableBuilt = true;
       finishBuildingRightTable();
+      _statMap.merge(StatKey.MAX_ROWS_IN_JOIN, numRows);
     }
 
     _statMap.merge(StatKey.TIME_BUILDING_HASH_TABLE_MS, 
System.currentTimeMillis() - startTime);
@@ -343,6 +346,7 @@ public abstract class BaseJoinOperator extends MultiStageOperator {
   protected boolean isMaxRowsLimitReached(int numJoinedRows) {
     if (numJoinedRows == _maxRowsInJoin) {
       if (_joinOverflowMode == JoinOverFlowMode.THROW) {
+        _statMap.merge(StatKey.MAX_ROWS_IN_JOIN, numJoinedRows);
         throwForJoinRowLimitExceeded(
             "Cannot process join, reached number of rows limit: " + 
_maxRowsInJoin);
       } else {
@@ -350,6 +354,7 @@ public abstract class BaseJoinOperator extends MultiStageOperator {
         logger().info("Terminating join operator early as the maximum number 
of rows limit was reached: {}",
             _maxRowsInJoin);
         earlyTerminateLeftInput();
+        _statMap.merge(StatKey.MAX_ROWS_IN_JOIN, numJoinedRows);
         _statMap.merge(StatKey.MAX_ROWS_IN_JOIN_REACHED, true);
         return true;
       }
@@ -390,6 +395,16 @@ public abstract class BaseJoinOperator extends MultiStageOperator {
      * How long (CPU time) has been spent on building the hash table.
      */
     TIME_BUILDING_HASH_TABLE_MS(StatMap.Type.LONG),
+    /**
+     * The max number of rows seen in the join. Recorded during right table 
build (normal and overflow paths)
+     * and at the joined-output limit check in {@link #isMaxRowsLimitReached}.
+     */
+    MAX_ROWS_IN_JOIN(StatMap.Type.LONG) {
+      @Override
+      public long merge(long value1, long value2) {
+        return Math.max(value1, value2);
+      }
+    },
     /**
      * Allocated memory in bytes for this operator or its children in the same 
stage.
      */
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
index c6227918cfa..b33391d1284 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
@@ -252,6 +252,7 @@ public abstract class MultiStageOperator implements Operator<MseBlock>, AutoClos
         
response.mergeNumGroupsLimitReached(stats.getBoolean(AggregateOperator.StatKey.NUM_GROUPS_LIMIT_REACHED));
         response.mergeNumGroupsWarningLimitReached(
             
stats.getBoolean(AggregateOperator.StatKey.NUM_GROUPS_WARNING_LIMIT_REACHED));
+        
response.mergeNumGroups(stats.getLong(AggregateOperator.StatKey.NUM_GROUPS));
         
response.mergeMaxRowsInOperator(stats.getLong(AggregateOperator.StatKey.EMITTED_ROWS));
       }
 
@@ -275,6 +276,7 @@ public abstract class MultiStageOperator implements Operator<MseBlock>, AutoClos
         StatMap<HashJoinOperator.StatKey> stats = 
(StatMap<HashJoinOperator.StatKey>) map;
         
response.mergeMaxRowsInOperator(stats.getLong(HashJoinOperator.StatKey.EMITTED_ROWS));
         
response.mergeMaxRowsInJoinReached(stats.getBoolean(HashJoinOperator.StatKey.MAX_ROWS_IN_JOIN_REACHED));
+        
response.mergeMaxRowsInJoin(stats.getLong(HashJoinOperator.StatKey.MAX_ROWS_IN_JOIN));
       }
 
       @Override
@@ -411,6 +413,7 @@ public abstract class MultiStageOperator implements Operator<MseBlock>, AutoClos
         
response.mergeMaxRowsInOperator(stats.getLong(WindowAggregateOperator.StatKey.EMITTED_ROWS));
         response.mergeMaxRowsInWindowReached(
             
stats.getBoolean(WindowAggregateOperator.StatKey.MAX_ROWS_IN_WINDOW_REACHED));
+        
response.mergeMaxRowsInWindow(stats.getLong(WindowAggregateOperator.StatKey.MAX_ROWS_IN_WINDOW));
       }
 
       @Override
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
index 40e077751db..f7583d1b426 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
@@ -217,6 +217,8 @@ public class WindowAggregateOperator extends MultiStageOperator {
       int containerSize = container.size();
       if (_numRows + containerSize > _maxRowsInWindowCache) {
         if (_windowOverflowMode == WindowOverFlowMode.THROW) {
+          // Record stat before we throw so it propagates to query response
+          _statMap.merge(StatKey.MAX_ROWS_IN_WINDOW, _numRows + containerSize);
           throw QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.asException(
               "Cannot build in memory window cache for WINDOW operator, reach 
number of rows limit: "
                   + _maxRowsInWindowCache);
@@ -224,6 +226,9 @@ public class WindowAggregateOperator extends MultiStageOperator {
           // Just fill up the buffer.
           int remainingRows = _maxRowsInWindowCache - _numRows;
           container = container.subList(0, remainingRows);
+          // Update container size here since MAX_ROWS_IN_WINDOW is recorded 
after the loop exits
+          // via _numRows once EOS is received from the early-terminated input.
+          containerSize = remainingRows;
           _statMap.merge(StatKey.MAX_ROWS_IN_WINDOW_REACHED, true);
           // setting the inputOperator to be early terminated and awaits EOS 
block next.
           _input.earlyTerminate();
@@ -239,6 +244,7 @@ public class WindowAggregateOperator extends MultiStageOperator {
       checkTerminationAndSampleUsage();
       block = _input.nextBlock();
     }
+    _statMap.merge(StatKey.MAX_ROWS_IN_WINDOW, _numRows);
     MseBlock.Eos eosBlock = (MseBlock.Eos) block;
     _eosBlock = eosBlock;
     // Early termination if the block is an error block
@@ -301,6 +307,12 @@ public class WindowAggregateOperator extends MultiStageOperator {
       }
     },
     MAX_ROWS_IN_WINDOW_REACHED(StatMap.Type.BOOLEAN),
+    MAX_ROWS_IN_WINDOW(StatMap.Type.LONG) {
+      @Override
+      public long merge(long value1, long value2) {
+        return Math.max(value1, value2);
+      }
+    },
     /**
      * Allocated memory in bytes for this operator or its children in the same 
stage.
      */
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
index c96805a1a15..60ab3e50356 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
@@ -55,6 +55,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.mockito.MockitoAnnotations.openMocks;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 
 
@@ -145,6 +146,7 @@ public class AggregateOperatorTest {
     when(_input.nextBlock()).thenReturn(OperatorTestUtil.block(inSchema, new 
Object[]{2, 1.0}, new Object[]{2, 2.0}))
         .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, 3.0}))
         .thenReturn(SuccessMseBlock.INSTANCE);
+    
when(_input.calculateStats()).thenReturn(MultiStageQueryStats.emptyStats(0));
     DataSchema resultSchema = new DataSchema(new String[]{"group", "sum"}, new 
ColumnDataType[]{INT, DOUBLE});
     AggregateOperator operator = getOperator(resultSchema, aggCalls, 
filterArgs, groupKeys);
 
@@ -156,6 +158,10 @@ public class AggregateOperatorTest {
     assertEquals(resultRows.get(0), new Object[]{2, 6.0},
         "Expected two columns (group by key, agg value), agg value is final 
result");
     assertTrue(operator.nextBlock().isSuccess(), "Second block is EOS (done 
processing)");
+    MultiStageQueryStats stats = operator.calculateStats();
+    StatMap<AggregateOperator.StatKey> statMap = 
OperatorTestUtil.getStatMap(AggregateOperator.StatKey.class, stats);
+    assertEquals(statMap.getLong(AggregateOperator.StatKey.NUM_GROUPS), 1,
+        "Num groups should equal the number of distinct group keys");
   }
 
   @Test
@@ -312,6 +318,8 @@ public class AggregateOperatorTest {
         "num groups limit should be reached");
     
assertTrue(statMap.getBoolean(AggregateOperator.StatKey.NUM_GROUPS_WARNING_LIMIT_REACHED),
         "num groups warning limit should be reached");
+    assertEquals(statMap.getLong(AggregateOperator.StatKey.NUM_GROUPS), 1,
+        "Num groups should equal the limit since only one group was accepted");
   }
 
   @Test
@@ -354,6 +362,37 @@ public class AggregateOperatorTest {
             collations, limit));
   }
 
+  @Test
+  public void shouldRecordNumGroupsBelowLimit() {
+    // Given: 1 distinct group key, limit = 2 — below limit, no overflow
+    List<RexExpression.FunctionCall> aggCalls = List.of(getSum(new 
RexExpression.InputRef(1)));
+    List<Integer> filterArgs = List.of(-1);
+    List<Integer> groupKeys = List.of(0);
+    PlanNode.NodeHint nodeHint = new 
PlanNode.NodeHint(Map.of(PinotHintOptions.AGGREGATE_HINT_OPTIONS,
+        Map.of(PinotHintOptions.AggregateOptions.NUM_GROUPS_LIMIT, "2")));
+    DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new 
ColumnDataType[]{INT, DOUBLE});
+
+    _input = new BlockListMultiStageOperator.Builder(inSchema)
+        .addRow(2, 1.0)
+        .addRow(2, 2.0)
+        .buildWithEos();
+    DataSchema resultSchema = new DataSchema(new String[]{"group", "sum"}, new 
ColumnDataType[]{INT, DOUBLE});
+    AggregateOperator operator = getOperator(resultSchema, aggCalls, 
filterArgs, groupKeys, nodeHint, Map.of());
+
+    // When:
+    List<Object[]> resultRows = ((MseBlock.Data) 
operator.nextBlock()).asRowHeap().getRows();
+
+    // Then:
+    assertEquals(resultRows.size(), 1);
+    assertTrue(operator.nextBlock().isEos());
+    MultiStageQueryStats stats = operator.calculateStats();
+    StatMap<AggregateOperator.StatKey> statMap = 
OperatorTestUtil.getStatMap(AggregateOperator.StatKey.class, stats);
+    
assertFalse(statMap.getBoolean(AggregateOperator.StatKey.NUM_GROUPS_LIMIT_REACHED),
+        "Num groups limit should not be reached when groups are below limit");
+    assertEquals(statMap.getLong(AggregateOperator.StatKey.NUM_GROUPS), 1,
+        "Num groups should equal 1");
+  }
+
   private static RexExpression.FunctionCall getSum(RexExpression arg) {
     return new RexExpression.FunctionCall(ColumnDataType.INT, 
SqlKind.SUM.name(), List.of(arg));
   }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
index 4c101b17343..7ba9c8c64f4 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
@@ -41,6 +41,7 @@ import org.testng.annotations.Test;
 import static org.mockito.Mockito.when;
 import static org.mockito.MockitoAnnotations.openMocks;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
 
@@ -107,6 +108,10 @@ public class HashJoinOperatorTest {
     assertEquals(resultRows.size(), 2);
     assertEquals(resultRows.get(0), new Object[]{2, "BB", 2, "Aa"});
     assertEquals(resultRows.get(1), new Object[]{2, "BB", 2, "BB"});
+    StatMap<HashJoinOperator.StatKey> statMap =
+        OperatorTestUtil.getStatMap(HashJoinOperator.StatKey.class, 
operator.calculateStats());
+    assertEquals(statMap.getLong(HashJoinOperator.StatKey.MAX_ROWS_IN_JOIN), 3,
+        "Max rows in join should equal right table size");
   }
 
   @Test
@@ -341,6 +346,10 @@ public class HashJoinOperatorTest {
         .contains("reached number of rows limit"));
     assertTrue(((ErrorMseBlock) 
block).getErrorMessages().get(QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED)
         .contains("Cannot build in memory hash table"));
+    StatMap<HashJoinOperator.StatKey> statMap =
+        OperatorTestUtil.getStatMap(HashJoinOperator.StatKey.class, 
operator.calculateStats());
+    assertEquals(statMap.getLong(HashJoinOperator.StatKey.MAX_ROWS_IN_JOIN), 3,
+        "Max rows in join should be recorded even on THROW");
   }
 
   @Test
@@ -373,6 +382,8 @@ public class HashJoinOperatorTest {
         OperatorTestUtil.getStatMap(HashJoinOperator.StatKey.class, 
operator.calculateStats());
     
assertTrue(statMap.getBoolean(HashJoinOperator.StatKey.MAX_ROWS_IN_JOIN_REACHED),
         "Max rows in join should be reached");
+    assertEquals(statMap.getLong(HashJoinOperator.StatKey.MAX_ROWS_IN_JOIN), 1,
+        "Max rows in join should equal the truncated right table size");
   }
 
   @Test
@@ -399,6 +410,10 @@ public class HashJoinOperatorTest {
         .contains("reached number of rows limit"));
     assertTrue(((ErrorMseBlock) 
block).getErrorMessages().get(QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED)
         .contains("Cannot process join"));
+    StatMap<HashJoinOperator.StatKey> statMap =
+        OperatorTestUtil.getStatMap(HashJoinOperator.StatKey.class, 
operator.calculateStats());
+    assertEquals(statMap.getLong(HashJoinOperator.StatKey.MAX_ROWS_IN_JOIN), 2,
+        "Max rows in join should be recorded even on THROW during output 
phase");
   }
 
   @Test
@@ -441,6 +456,8 @@ public class HashJoinOperatorTest {
         OperatorTestUtil.getStatMap(HashJoinOperator.StatKey.class, 
operator.calculateStats());
     
assertTrue(statMap.getBoolean(HashJoinOperator.StatKey.MAX_ROWS_IN_JOIN_REACHED),
         "Max rows in join should be reached");
+    assertEquals(statMap.getLong(HashJoinOperator.StatKey.MAX_ROWS_IN_JOIN), 2,
+        "Max rows in join should equal the truncated right table size");
   }
 
   @Test
@@ -765,6 +782,93 @@ public class HashJoinOperatorTest {
     assertTrue(containsRow(resultRows, new Object[]{3, "Cc", 3.0}));  // 
Unmatched preserved
   }
 
+  @Test
+  public void shouldRecordMaxRowsInJoinWhenRightTableFitsExactlyAtLimit() {
+    _leftInput = new BlockListMultiStageOperator.Builder(DEFAULT_CHILD_SCHEMA)
+        .addRow(1, "Aa")
+        .addRow(2, "BB")
+        .buildWithEos();
+    _rightInput = new BlockListMultiStageOperator.Builder(DEFAULT_CHILD_SCHEMA)
+        .addRow(2, "Aa")
+        .addRow(3, "BB")
+        .buildWithEos();
+    DataSchema resultSchema = new DataSchema(new String[]{"int_col1", 
"string_col1", "int_col2", "string_col2"},
+        new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.STRING, 
ColumnDataType.INT, ColumnDataType.STRING});
+    PlanNode.NodeHint nodeHint = new 
PlanNode.NodeHint(Map.of(PinotHintOptions.JOIN_HINT_OPTIONS,
+        Map.of(PinotHintOptions.JoinHintOptions.JOIN_OVERFLOW_MODE, "BREAK",
+            PinotHintOptions.JoinHintOptions.MAX_ROWS_IN_JOIN, "2")));
+    HashJoinOperator operator =
+        getOperator(resultSchema, JoinRelType.INNER, List.of(0), List.of(0), 
List.of(), nodeHint);
+    operator.nextBlock(); // data block
+    operator.nextBlock(); // eos
+
+    StatMap<HashJoinOperator.StatKey> statMap =
+        OperatorTestUtil.getStatMap(HashJoinOperator.StatKey.class, 
operator.calculateStats());
+    
assertFalse(statMap.getBoolean(HashJoinOperator.StatKey.MAX_ROWS_IN_JOIN_REACHED),
+        "Max rows in join should not be reached when right table fits exactly 
at limit");
+    assertEquals(statMap.getLong(HashJoinOperator.StatKey.MAX_ROWS_IN_JOIN), 2,
+        "Max rows in join should equal right table size");
+  }
+
+  @Test
+  public void shouldRecordZeroMaxRowsInJoinWhenRightTableIsEmpty() {
+    _leftInput = new BlockListMultiStageOperator.Builder(DEFAULT_CHILD_SCHEMA)
+        .addRow(1, "Aa")
+        .buildWithEos();
+    _rightInput = new BlockListMultiStageOperator.Builder(DEFAULT_CHILD_SCHEMA)
+        .buildWithEos();
+    DataSchema resultSchema = new DataSchema(new String[]{"int_col1", 
"string_col1", "int_col2", "string_col2"},
+        new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.STRING, 
ColumnDataType.INT, ColumnDataType.STRING});
+    HashJoinOperator operator =
+        getOperator(resultSchema, JoinRelType.INNER, List.of(0), List.of(0), 
List.of());
+    operator.nextBlock(); // eos (no matches)
+
+    StatMap<HashJoinOperator.StatKey> statMap =
+        OperatorTestUtil.getStatMap(HashJoinOperator.StatKey.class, 
operator.calculateStats());
+    assertEquals(statMap.getLong(HashJoinOperator.StatKey.MAX_ROWS_IN_JOIN), 0,
+        "Max rows in join should be 0 when right table is empty");
+  }
+
+  @Test
+  public void 
shouldRecordJoinedOutputSizeWhenRightTableFitsButJoinedOutputExceedsLimit() {
+    // Right table has 2 rows (fits under limit of 5), but each left row 
matches both right rows,
+    // producing 4 x 2 = 8 potential joined rows which exceeds the limit.
+    // MAX_ROWS_IN_JOIN should reflect the joined output size (5), not the 
right table size (2).
+    _leftInput = new BlockListMultiStageOperator.Builder(DEFAULT_CHILD_SCHEMA)
+        .spied()
+        .addRow(1, "Aa")
+        .addRow(2, "Aa")
+        .addRow(3, "Aa")
+        .addRow(4, "Aa")
+        .buildWithEos();
+    _rightInput = new BlockListMultiStageOperator.Builder(DEFAULT_CHILD_SCHEMA)
+        .addRow(10, "Aa")
+        .addRow(20, "Aa")
+        .buildWithEos();
+    DataSchema resultSchema = new DataSchema(new String[]{"int_col1", 
"string_col1", "int_col2", "string_col2"},
+        new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.STRING, 
ColumnDataType.INT, ColumnDataType.STRING});
+    PlanNode.NodeHint nodeHint = new 
PlanNode.NodeHint(Map.of(PinotHintOptions.JOIN_HINT_OPTIONS,
+        Map.of(PinotHintOptions.JoinHintOptions.JOIN_OVERFLOW_MODE, "BREAK",
+            PinotHintOptions.JoinHintOptions.MAX_ROWS_IN_JOIN, "5")));
+    HashJoinOperator operator =
+        getOperator(resultSchema, JoinRelType.INNER, List.of(1), List.of(1), 
List.of(), nodeHint);
+
+    // When
+    List<Object[]> resultRows = ((MseBlock.Data) 
operator.nextBlock()).asRowHeap().getRows();
+
+    // Then
+    Mockito.verify(_leftInput).earlyTerminate();
+    assertEquals(resultRows.size(), 5, "Should have exactly 5 joined rows 
(truncated from potential 8)");
+    MseBlock block2 = operator.nextBlock();
+    assertTrue(block2.isSuccess());
+    StatMap<HashJoinOperator.StatKey> statMap =
+        OperatorTestUtil.getStatMap(HashJoinOperator.StatKey.class, 
operator.calculateStats());
+    
assertTrue(statMap.getBoolean(HashJoinOperator.StatKey.MAX_ROWS_IN_JOIN_REACHED),
+        "Max rows in join should be reached");
+    assertEquals(statMap.getLong(HashJoinOperator.StatKey.MAX_ROWS_IN_JOIN), 5,
+        "Max rows in join should reflect the joined output size, not the right 
table size");
+  }
+
   private HashJoinOperator getOperator(DataSchema leftSchema, DataSchema 
resultSchema, JoinRelType joinType,
       List<Integer> leftKeys, List<Integer> rightKeys, List<RexExpression> 
nonEquiConditions,
       PlanNode.NodeHint nodeHint) {
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java
index db57f40a068..0ab3cab16ae 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java
@@ -50,6 +50,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.mockito.MockitoAnnotations.openMocks;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 
@@ -137,6 +138,10 @@ public class WindowAggregateOperatorTest {
     assertEquals(resultRows.size(), 1);
     assertEquals(resultRows.get(0), new Object[]{2, 1, 1.0});
     assertTrue(operator.nextBlock().isSuccess(), "Second block is EOS (done 
processing)");
+    StatMap<WindowAggregateOperator.StatKey> windowStats =
+        OperatorTestUtil.getStatMap(WindowAggregateOperator.StatKey.class, 
operator.calculateStats());
+    
assertEquals(windowStats.getLong(WindowAggregateOperator.StatKey.MAX_ROWS_IN_WINDOW),
 1,
+        "Max rows in window should equal number of input rows");
   }
 
   @Test
@@ -498,6 +503,10 @@ public class WindowAggregateOperatorTest {
     assertTrue(block.isError(), "expected ERROR block from window overflow");
     assertTrue(((ErrorMseBlock) 
block).getErrorMessages().get(QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED)
         .contains("reach number of rows limit"));
+    StatMap<WindowAggregateOperator.StatKey> windowStats =
+        OperatorTestUtil.getStatMap(WindowAggregateOperator.StatKey.class, 
operator.calculateStats());
+    
assertEquals(windowStats.getLong(WindowAggregateOperator.StatKey.MAX_ROWS_IN_WINDOW),
 2,
+        "Max rows in window should be recorded even on THROW");
   }
 
   @Test
@@ -533,6 +542,8 @@ public class WindowAggregateOperatorTest {
         OperatorTestUtil.getStatMap(WindowAggregateOperator.StatKey.class, 
operator.calculateStats());
     
assertTrue(windowStats.getBoolean(WindowAggregateOperator.StatKey.MAX_ROWS_IN_WINDOW_REACHED),
         "Max rows in window should be reached");
+    
assertEquals(windowStats.getLong(WindowAggregateOperator.StatKey.MAX_ROWS_IN_WINDOW),
 1,
+        "Max rows in window value should match the number of cached rows");
   }
 
   @Test
@@ -2987,6 +2998,63 @@ public class WindowAggregateOperatorTest {
     assertEquals(e.getMessage(), "RANGE window frame with offset PRECEDING / 
FOLLOWING is not supported");
   }
 
+  /**
+   * Boundary case: the input row count equals the MAX_ROWS_IN_WINDOW hint (1 row, limit "1", overflow mode
+   * "BREAK"). Expects the single row to be emitted, the following block to be a success EOS,
+   * MAX_ROWS_IN_WINDOW_REACHED to stay false, and the MAX_ROWS_IN_WINDOW stat to record the 1 row seen.
+   */
+  @Test
+  public void testShouldRecordMaxRowsInWindowWhenInputFitsExactlyAtLimit() {
+    // Given: 1 input row, limit = 1 — fits exactly, no overflow
+    DataSchema inputSchema = new DataSchema(new String[]{"group", "arg"}, new 
ColumnDataType[]{INT, INT});
+    MultiStageOperator input = new 
BlockListMultiStageOperator.Builder(inputSchema)
+        .addBlock(new Object[]{2, 1})
+        .buildWithEos();
+    DataSchema resultSchema =
+        new DataSchema(new String[]{"group", "arg", "sum"}, new 
ColumnDataType[]{INT, INT, DOUBLE});
+    List<Integer> keys = List.of(0);
+    List<RexExpression.FunctionCall> aggCalls = List.of(getSum(new 
RexExpression.InputRef(1)));
+    // Window hints: cap the window at exactly the number of input rows (1), with BREAK overflow mode.
+    PlanNode.NodeHint nodeHint = new 
PlanNode.NodeHint(Map.of(PinotHintOptions.WINDOW_HINT_OPTIONS,
+        Map.of(PinotHintOptions.WindowHintOptions.WINDOW_OVERFLOW_MODE, 
"BREAK",
+            PinotHintOptions.WindowHintOptions.MAX_ROWS_IN_WINDOW, "1")));
+    WindowAggregateOperator operator =
+        getOperator(inputSchema, resultSchema, keys, List.of(), aggCalls, 
WindowNode.WindowFrameType.RANGE,
+            Integer.MIN_VALUE, Integer.MAX_VALUE, nodeHint, input);
+
+    // When:
+    List<Object[]> resultRows = ((MseBlock.Data) 
operator.nextBlock()).asRowHeap().getRows();
+
+    // Then:
+    assertEquals(resultRows.size(), 1);
+    assertTrue(operator.nextBlock().isSuccess());
+    StatMap<WindowAggregateOperator.StatKey> windowStats =
+        OperatorTestUtil.getStatMap(WindowAggregateOperator.StatKey.class, 
operator.calculateStats());
+    // Filling the window exactly up to the limit must not be flagged as reaching (overflowing) it.
+    
assertFalse(windowStats.getBoolean(WindowAggregateOperator.StatKey.MAX_ROWS_IN_WINDOW_REACHED),
+        "Max rows in window should not be reached when input fits exactly at 
limit");
+    
assertEquals(windowStats.getLong(WindowAggregateOperator.StatKey.MAX_ROWS_IN_WINDOW),
 1,
+        "Max rows in window should equal number of input rows");
+  }
+
+  /**
+   * Empty-input case: the upstream operator produces only EOS (no data blocks). Expects the first block from
+   * the window operator to be EOS and the MAX_ROWS_IN_WINDOW stat to be 0. Uses the getOperator overload that
+   * takes no NodeHint (presumably default window hints; confirm against that helper).
+   */
+  @Test
+  public void testShouldRecordZeroMaxRowsInWindowWhenInputIsEmpty() {
+    // Given: 0 input rows (just EOS)
+    DataSchema inputSchema = new DataSchema(new String[]{"group", "arg"}, new 
ColumnDataType[]{INT, INT});
+    MultiStageOperator input = new 
BlockListMultiStageOperator.Builder(inputSchema)
+        .buildWithEos();
+    DataSchema resultSchema =
+        new DataSchema(new String[]{"group", "arg", "sum"}, new 
ColumnDataType[]{INT, INT, DOUBLE});
+    List<Integer> keys = List.of(0);
+    List<RexExpression.FunctionCall> aggCalls = List.of(getSum(new 
RexExpression.InputRef(1)));
+    WindowAggregateOperator operator =
+        getOperator(inputSchema, resultSchema, keys, List.of(), aggCalls, 
WindowNode.WindowFrameType.RANGE,
+            Integer.MIN_VALUE, Integer.MAX_VALUE, input);
+
+    // When:
+    MseBlock block = operator.nextBlock();
+
+    // Then:
+    assertTrue(block.isEos());
+    StatMap<WindowAggregateOperator.StatKey> windowStats =
+        OperatorTestUtil.getStatMap(WindowAggregateOperator.StatKey.class, 
operator.calculateStats());
+    
assertEquals(windowStats.getLong(WindowAggregateOperator.StatKey.MAX_ROWS_IN_WINDOW),
 0,
+        "Max rows in window should be 0 when input is empty");
+  }
+
   private WindowAggregateOperator getOperator(DataSchema inputSchema, 
DataSchema resultSchema, List<Integer> keys,
       List<RelFieldCollation> collations, List<RexExpression.FunctionCall> 
aggCalls,
       WindowNode.WindowFrameType windowFrameType, int lowerBound, int 
upperBound, PlanNode.NodeHint nodeHint,
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/MultiStageQueryStatsTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/MultiStageQueryStatsTest.java
index fee7dfdfd7a..c54e2496269 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/MultiStageQueryStatsTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/MultiStageQueryStatsTest.java
@@ -21,11 +21,14 @@ package org.apache.pinot.query.runtime.plan;
 import java.io.IOException;
 import java.util.List;
 import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.query.runtime.operator.AggregateOperator;
 import org.apache.pinot.query.runtime.operator.BaseMailboxReceiveOperator;
+import org.apache.pinot.query.runtime.operator.HashJoinOperator;
 import org.apache.pinot.query.runtime.operator.LeafOperator;
 import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
 import org.apache.pinot.query.runtime.operator.MultiStageOperator;
 import org.apache.pinot.query.runtime.operator.SortOperator;
+import org.apache.pinot.query.runtime.operator.WindowAggregateOperator;
 import org.apache.pinot.segment.spi.memory.DataBuffer;
 import org.testng.Assert;
 import org.testng.annotations.DataProvider;
@@ -57,7 +60,8 @@ public class MultiStageQueryStatsTest {
+  /**
+   * Supplies each {@link MultiStageQueryStats} fixture built in this class to tests that declare
+   * {@code dataProvider = "stats"}.
+   */
   @DataProvider(name = "stats")
   public static MultiStageQueryStats[] stats() {
     return new MultiStageQueryStats[] {
-        stats1()
+        stats1(),
+        stats2()
     };
   }
 
@@ -97,4 +101,46 @@ public class MultiStageQueryStatsTest {
                 .close())
         .build();
   }
+
+  /**
+   * Builds a second stats fixture that populates the operator stat keys behind the maxRowsInJoin,
+   * maxRowsInWindow and numGroups response fields: {@code HashJoinOperator.StatKey.MAX_ROWS_IN_JOIN} (100),
+   * {@code WindowAggregateOperator.StatKey.MAX_ROWS_IN_WINDOW} (50) and
+   * {@code AggregateOperator.StatKey.NUM_GROUPS} (5).
+   *
+   * @return stats for an open stage (mailbox send tagged STAGE 1) plus one added stage (mailbox send tagged
+   *     STAGE 2)
+   */
+  public static MultiStageQueryStats stats2() {
+    return new MultiStageQueryStats.Builder(1)
+        // Open stage, with operators listed in order: mailbox receive, hash join, window, mailbox send (STAGE 1).
+        .customizeOpen(open ->
+            open.addLastOperator(MultiStageOperator.Type.MAILBOX_RECEIVE,
+                    new StatMap<>(BaseMailboxReceiveOperator.StatKey.class)
+                        
.merge(BaseMailboxReceiveOperator.StatKey.EXECUTION_TIME_MS, 50)
+                        
.merge(BaseMailboxReceiveOperator.StatKey.EMITTED_ROWS, 20))
+                .addLastOperator(MultiStageOperator.Type.HASH_JOIN,
+                    new StatMap<>(HashJoinOperator.StatKey.class)
+                        .merge(HashJoinOperator.StatKey.EXECUTION_TIME_MS, 30)
+                        .merge(HashJoinOperator.StatKey.EMITTED_ROWS, 15)
+                        .merge(HashJoinOperator.StatKey.MAX_ROWS_IN_JOIN, 
100L))
+                .addLastOperator(MultiStageOperator.Type.WINDOW,
+                    new StatMap<>(WindowAggregateOperator.StatKey.class)
+                        
.merge(WindowAggregateOperator.StatKey.EXECUTION_TIME_MS, 20)
+                        .merge(WindowAggregateOperator.StatKey.EMITTED_ROWS, 
15)
+                        
.merge(WindowAggregateOperator.StatKey.MAX_ROWS_IN_WINDOW, 50L))
+                .addLastOperator(MultiStageOperator.Type.MAILBOX_SEND,
+                    new StatMap<>(MailboxSendOperator.StatKey.class)
+                        .merge(MailboxSendOperator.StatKey.STAGE, 1)
+                        .merge(MailboxSendOperator.StatKey.EXECUTION_TIME_MS, 
40)
+                        .merge(MailboxSendOperator.StatKey.EMITTED_ROWS, 15))
+        )
+        // Added stage, with operators listed in order: leaf, aggregate, mailbox send (STAGE 2); then closed.
+        .addLast(stageStats ->
+            stageStats.addLastOperator(MultiStageOperator.Type.LEAF,
+                    new StatMap<>(LeafOperator.StatKey.class)
+                        .merge(LeafOperator.StatKey.EXECUTION_TIME_MS, 80)
+                        .merge(LeafOperator.StatKey.EMITTED_ROWS, 30))
+                .addLastOperator(MultiStageOperator.Type.AGGREGATE,
+                    new StatMap<>(AggregateOperator.StatKey.class)
+                        .merge(AggregateOperator.StatKey.EXECUTION_TIME_MS, 25)
+                        .merge(AggregateOperator.StatKey.EMITTED_ROWS, 10)
+                        .merge(AggregateOperator.StatKey.NUM_GROUPS, 5L))
+                .addLastOperator(MultiStageOperator.Type.MAILBOX_SEND,
+                    new StatMap<>(MailboxSendOperator.StatKey.class)
+                        .merge(MailboxSendOperator.StatKey.STAGE, 2)
+                        .merge(MailboxSendOperator.StatKey.EXECUTION_TIME_MS, 
60)
+                        .merge(MailboxSendOperator.StatKey.EMITTED_ROWS, 10))
+                .close())
+        .build();
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to