This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 34a1fb71f4 [multistage] Make window operator more resilient (#13180)
34a1fb71f4 is described below

commit 34a1fb71f4d2e822c8a90948d32170289388744d
Author: Xiang Fu <[email protected]>
AuthorDate: Tue May 21 09:15:27 2024 +0800

    [multistage] Make window operator more resilient (#13180)
    
    * [multistage] Make window operator more resilient
    
    * Adding stats
---
 .../pinot/common/response/BrokerResponse.java      |   5 +
 .../response/broker/BrokerResponseNative.java      |   6 +
 .../response/broker/BrokerResponseNativeV2.java    |  30 +++--
 .../common/utils/config/QueryOptionsUtils.java     |  13 ++
 .../pinot/calcite/rel/hint/PinotHintOptions.java   |  14 +++
 .../planner/logical/RelToPlanNodeConverter.java    |   3 +-
 .../pinot/query/planner/plannode/WindowNode.java   |  10 +-
 .../query/runtime/operator/MultiStageOperator.java |   2 +
 .../runtime/operator/WindowAggregateOperator.java  |  87 +++++++++++--
 .../query/runtime/plan/PhysicalPlanVisitor.java    |   2 +-
 .../operator/WindowAggregateOperatorTest.java      | 135 +++++++++++++++++----
 .../apache/pinot/spi/utils/CommonConstants.java    |  14 +++
 12 files changed, 279 insertions(+), 42 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
index 0e4a7ba192..bab22143cf 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
@@ -90,6 +90,11 @@ public interface BrokerResponse {
    */
   boolean isMaxRowsInJoinReached();
 
+  /**
+   * Returns whether the limit for max rows in window has been reached.
+   */
+  boolean isMaxRowsInWindowReached();
+
   /**
    * Returns the total time used for query execution in milliseconds.
    */
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
index 8b46593d44..3de27596df 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
@@ -182,6 +182,12 @@ public class BrokerResponseNative implements 
BrokerResponse {
     return false;
   }
 
+  @JsonIgnore
+  @Override
+  public boolean isMaxRowsInWindowReached() {
+    return false;
+  }
+
   @Override
   public long getTimeUsedMs() {
     return _timeUsedMs;
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
index 232567b82a..0138b05385 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
@@ -36,16 +36,16 @@ import org.apache.pinot.common.response.ProcessingException;
  * TODO: Currently this class cannot be used to deserialize the JSON response.
  */
 @JsonPropertyOrder({
-    "resultTable", "partialResult", "exceptions", "numGroupsLimitReached", 
"maxRowsInJoinReached", "timeUsedMs",
-    "stageStats", "maxRowsInOperator", "requestId", "brokerId", 
"numDocsScanned", "totalDocs",
-    "numEntriesScannedInFilter", "numEntriesScannedPostFilter", 
"numServersQueried", "numServersResponded",
-    "numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched", 
"numConsumingSegmentsQueried",
-    "numConsumingSegmentsProcessed", "numConsumingSegmentsMatched", 
"minConsumingFreshnessTimeMs",
-    "numSegmentsPrunedByBroker", "numSegmentsPrunedByServer", 
"numSegmentsPrunedInvalid", "numSegmentsPrunedByLimit",
-    "numSegmentsPrunedByValue", "brokerReduceTimeMs", 
"offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs",
-    "offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs", 
"offlineResponseSerializationCpuTimeNs",
-    "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", 
"realtimeTotalCpuTimeNs",
-    "explainPlanNumEmptyFilterSegments", 
"explainPlanNumMatchAllFilterSegments", "traceInfo"
+    "resultTable", "partialResult", "exceptions", "numGroupsLimitReached", 
"maxRowsInJoinReached",
+    "maxRowsInWindowReached", "timeUsedMs", "stageStats", "maxRowsInOperator", 
"requestId", "brokerId",
+    "numDocsScanned", "totalDocs", "numEntriesScannedInFilter", 
"numEntriesScannedPostFilter", "numServersQueried",
+    "numServersResponded", "numSegmentsQueried", "numSegmentsProcessed", 
"numSegmentsMatched",
+    "numConsumingSegmentsQueried", "numConsumingSegmentsProcessed", 
"numConsumingSegmentsMatched",
+    "minConsumingFreshnessTimeMs", "numSegmentsPrunedByBroker", 
"numSegmentsPrunedByServer", "numSegmentsPrunedInvalid",
+    "numSegmentsPrunedByLimit", "numSegmentsPrunedByValue", 
"brokerReduceTimeMs", "offlineThreadCpuTimeNs",
+    "realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs", 
"realtimeSystemActivitiesCpuTimeNs",
+    "offlineResponseSerializationCpuTimeNs", 
"realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs",
+    "realtimeTotalCpuTimeNs", "explainPlanNumEmptyFilterSegments", 
"explainPlanNumMatchAllFilterSegments", "traceInfo"
 })
 public class BrokerResponseNativeV2 implements BrokerResponse {
   private final StatMap<StatKey> _brokerStats = new StatMap<>(StatKey.class);
@@ -53,6 +53,7 @@ public class BrokerResponseNativeV2 implements BrokerResponse 
{
 
   private ResultTable _resultTable;
   private boolean _maxRowsInJoinReached;
+  private boolean _maxRowsInWindowReached;
   private long _timeUsedMs;
   /**
    * Statistics for each stage of the query execution.
@@ -121,6 +122,15 @@ public class BrokerResponseNativeV2 implements 
BrokerResponse {
     _maxRowsInJoinReached |= maxRowsInJoinReached;
   }
 
+  @Override
+  public boolean isMaxRowsInWindowReached() {
+    return _maxRowsInWindowReached;
+  }
+
+  public void mergeMaxRowsInWindowReached(boolean maxRowsInWindowReached) {
+    _maxRowsInWindowReached |= maxRowsInWindowReached;
+  }
+
   /**
    * Returns the stage statistics.
    */
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index efdfdef303..aaa9db3f4f 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -32,6 +32,7 @@ import org.apache.pinot.spi.config.table.FieldConfig;
 import org.apache.pinot.spi.utils.CommonConstants;
 import 
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
 import 
org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.JoinOverFlowMode;
+import 
org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.WindowOverFlowMode;
 
 
 /**
@@ -280,4 +281,16 @@ public class QueryOptionsUtils {
     String joinOverflowModeStr = 
queryOptions.get(QueryOptionKey.JOIN_OVERFLOW_MODE);
     return joinOverflowModeStr != null ? 
JoinOverFlowMode.valueOf(joinOverflowModeStr) : null;
   }
+
+  @Nullable
+  public static Integer getMaxRowsInWindow(Map<String, String> queryOptions) {
+    String maxRowsInWindow = 
queryOptions.get(QueryOptionKey.MAX_ROWS_IN_WINDOW);
+    return maxRowsInWindow != null ? Integer.parseInt(maxRowsInWindow) : null;
+  }
+
+  @Nullable
+  public static WindowOverFlowMode getWindowOverflowMode(Map<String, String> 
queryOptions) {
+    String windowOverflowModeStr = 
queryOptions.get(QueryOptionKey.WINDOW_OVERFLOW_MODE);
+    return windowOverflowModeStr != null ? 
WindowOverFlowMode.valueOf(windowOverflowModeStr) : null;
+  }
 }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/hint/PinotHintOptions.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/hint/PinotHintOptions.java
index a45e92aba6..1d53a3184e 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/hint/PinotHintOptions.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/hint/PinotHintOptions.java
@@ -34,6 +34,7 @@ public class PinotHintOptions {
   public static final String AGGREGATE_HINT_OPTIONS = "aggOptions";
   public static final String JOIN_HINT_OPTIONS = "joinOptions";
   public static final String TABLE_HINT_OPTIONS = "tableOptions";
+  public static final String WINDOW_HINT_OPTIONS = "windowOptions";
 
   /**
    * Hint to denote that the aggregation node is the final aggregation stage 
which extracts the final result.
@@ -68,6 +69,19 @@ public class PinotHintOptions {
     public static final String MAX_INITIAL_RESULT_HOLDER_CAPACITY = 
"max_initial_result_holder_capacity";
   }
 
+  public static class WindowHintOptions {
+    /**
+     * Max rows allowed to cache the rows in window for further processing.
+     */
+    public static final String MAX_ROWS_IN_WINDOW = "max_rows_in_window";
+    /**
+     * Mode when window overflow happens, supported values: THROW or BREAK.
+     *   THROW(default): Break window cache build process, and throw 
exception, no further WINDOW operation performed.
+     *   BREAK: Break window cache build process, continue to perform WINDOW 
operation, results might be partial.
+     */
+    public static final String WINDOW_OVERFLOW_MODE = "window_overflow_mode";
+  }
+
   public static class JoinHintOptions {
     public static final String JOIN_STRATEGY = "join_strategy";
     public static final String DYNAMIC_BROADCAST_JOIN_STRATEGY = 
"dynamic_broadcast";
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
index 927863809e..20c44fe114 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
@@ -152,7 +152,8 @@ public final class RelToPlanNodeConverter {
   }
 
   private static PlanNode convertLogicalWindow(LogicalWindow node, int 
currentStageId) {
-    return new WindowNode(currentStageId, node.groups, node.constants, 
toDataSchema(node.getRowType()));
+    return new WindowNode(currentStageId, node.groups, node.constants, 
toDataSchema(node.getRowType()),
+        node.getHints());
   }
 
   private static PlanNode convertLogicalSort(LogicalSort node, int 
currentStageId) {
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/WindowNode.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/WindowNode.java
index 97a511a7d5..344adb69ab 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/WindowNode.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/WindowNode.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rel.core.Window;
+import org.apache.calcite.rel.hint.RelHint;
 import org.apache.calcite.rex.RexLiteral;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.planner.logical.RexExpression;
@@ -51,6 +52,8 @@ public class WindowNode extends AbstractPlanNode {
   private List<RexExpression> _constants;
   @ProtoProperties
   private WindowFrameType _windowFrameType;
+  @ProtoProperties
+  private NodeHint _windowHints;
 
   /**
    * Enum to denote the type of window frame
@@ -66,7 +69,7 @@ public class WindowNode extends AbstractPlanNode {
   }
 
   public WindowNode(int planFragmentId, List<Window.Group> windowGroups, 
List<RexLiteral> constants,
-      DataSchema dataSchema) {
+      DataSchema dataSchema, List<RelHint> windowHints) {
     super(planFragmentId, dataSchema);
     // Only a single Window Group should exist per WindowNode.
     Preconditions.checkState(windowGroups.size() == 1,
@@ -103,6 +106,7 @@ public class WindowNode extends AbstractPlanNode {
     for (RexLiteral constant : constants) {
       _constants.add(RexExpressionUtils.fromRexLiteral(constant));
     }
+    _windowHints = new NodeHint(windowHints);
   }
 
   @Override
@@ -150,4 +154,8 @@ public class WindowNode extends AbstractPlanNode {
   public List<RexExpression> getConstants() {
     return _constants;
   }
+
+  public NodeHint getWindowHints() {
+    return _windowHints;
+  }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
index 224923f6d5..6d7ea779cc 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
@@ -283,6 +283,8 @@ public abstract class MultiStageOperator
         @SuppressWarnings("unchecked")
         StatMap<WindowAggregateOperator.StatKey> stats = 
(StatMap<WindowAggregateOperator.StatKey>) map;
         
response.mergeMaxRowsInOperator(stats.getLong(WindowAggregateOperator.StatKey.EMITTED_ROWS));
+        response.mergeMaxRowsInWindowReached(
+            
stats.getBoolean(WindowAggregateOperator.StatKey.MAX_ROWS_IN_WINDOW_REACHED));
       }
     },;
 
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
index e2b989b76d..7d9de89b39 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
@@ -32,12 +32,17 @@ import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.response.ProcessingException;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 import org.apache.pinot.core.data.table.Key;
 import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.planner.plannode.AbstractPlanNode;
 import org.apache.pinot.query.planner.plannode.WindowNode;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
@@ -46,6 +51,7 @@ import 
org.apache.pinot.query.runtime.operator.utils.TypeUtils;
 import org.apache.pinot.query.runtime.operator.window.WindowFunction;
 import org.apache.pinot.query.runtime.operator.window.WindowFunctionFactory;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import 
org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.WindowOverFlowMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -82,6 +88,8 @@ import org.slf4j.LoggerFactory;
 public class WindowAggregateOperator extends MultiStageOperator {
   private static final String EXPLAIN_NAME = "WINDOW";
   private static final Logger LOGGER = 
LoggerFactory.getLogger(WindowAggregateOperator.class);
+  private static final int DEFAULT_MAX_ROWS_IN_WINDOW = 1024 * 1024; // 2^20, 
around 1MM rows
+  private static final WindowOverFlowMode DEFAULT_WINDOW_OVERFLOW_MODE = 
WindowOverFlowMode.THROW;
 
   // List of window functions which can only be applied as ROWS window frame 
type
   public static final Set<String> ROWS_ONLY_FUNCTION_NAMES = 
ImmutableSet.of("ROW_NUMBER");
@@ -99,6 +107,19 @@ public class WindowAggregateOperator extends 
MultiStageOperator {
   private final Map<Key, List<Object[]>> _partitionRows;
   private final boolean _isPartitionByOnly;
 
+  // Below are specific parameters to protect the window cache from growing 
too large.
+  // Once the window cache reaches the limit, we will throw exception or break 
the cache build process.
+  /**
+   * Max rows allowed to build the right table hash collection.
+   */
+  private final int _maxRowsInWindowCache;
+  /**
+   * Mode when window overflow happens, supported values: THROW or BREAK.
+   * THROW(default): Break window cache build process, and throw exception, no 
WINDOW operation performed.
+   * BREAK: Break window cache build process, continue to perform WINDOW 
operation, results might be partial or wrong.
+   */
+  private final WindowOverFlowMode _windowOverflowMode;
+
   private int _numRows;
   private boolean _hasReturnedWindowAggregateBlock;
   @Nullable
@@ -110,7 +131,7 @@ public class WindowAggregateOperator extends 
MultiStageOperator {
       List<RexExpression> groupSet, List<RexExpression> orderSet, 
List<RelFieldCollation.Direction> orderSetDirection,
       List<RelFieldCollation.NullDirection> orderSetNullDirection, 
List<RexExpression> aggCalls, int lowerBound,
       int upperBound, WindowNode.WindowFrameType windowFrameType, 
List<RexExpression> constants,
-      DataSchema resultSchema, DataSchema inputSchema) {
+      DataSchema resultSchema, DataSchema inputSchema, 
AbstractPlanNode.NodeHint hints) {
     super(context);
 
     _inputOperator = inputOperator;
@@ -142,6 +163,9 @@ public class WindowAggregateOperator extends 
MultiStageOperator {
 
     _numRows = 0;
     _hasReturnedWindowAggregateBlock = false;
+    Map<String, String> metadata = context.getOpChainMetadata();
+    _maxRowsInWindowCache = getMaxRowInWindow(metadata, hints);
+    _windowOverflowMode = getWindowOverflowMode(metadata, hints);
   }
 
   @Override
@@ -155,6 +179,36 @@ public class WindowAggregateOperator extends 
MultiStageOperator {
     return LOGGER;
   }
 
+  private int getMaxRowInWindow(Map<String, String> opChainMetadata, @Nullable 
AbstractPlanNode.NodeHint nodeHint) {
+    if (nodeHint != null) {
+      Map<String, String> windowOptions = 
nodeHint._hintOptions.get(PinotHintOptions.WINDOW_HINT_OPTIONS);
+      if (windowOptions != null) {
+        String maxRowsInWindowStr = 
windowOptions.get(PinotHintOptions.WindowHintOptions.MAX_ROWS_IN_WINDOW);
+        if (maxRowsInWindowStr != null) {
+          return Integer.parseInt(maxRowsInWindowStr);
+        }
+      }
+    }
+    Integer maxRowsInWindow = 
QueryOptionsUtils.getMaxRowsInWindow(opChainMetadata);
+    return maxRowsInWindow != null ? maxRowsInWindow : 
DEFAULT_MAX_ROWS_IN_WINDOW;
+  }
+
+  private WindowOverFlowMode getWindowOverflowMode(Map<String, String> 
contextMetadata,
+      @Nullable AbstractPlanNode.NodeHint nodeHint) {
+    if (nodeHint != null) {
+      Map<String, String> windowOptions = 
nodeHint._hintOptions.get(PinotHintOptions.WINDOW_HINT_OPTIONS);
+      if (windowOptions != null) {
+        String windowOverflowModeStr = 
windowOptions.get(PinotHintOptions.WindowHintOptions.WINDOW_OVERFLOW_MODE);
+        if (windowOverflowModeStr != null) {
+          return WindowOverFlowMode.valueOf(windowOverflowModeStr);
+        }
+      }
+    }
+    WindowOverFlowMode windowOverflowMode =
+        QueryOptionsUtils.getWindowOverflowMode(contextMetadata);
+    return windowOverflowMode != null ? windowOverflowMode : 
DEFAULT_WINDOW_OVERFLOW_MODE;
+  }
+
   @Override
   public List<MultiStageOperator> getChildOperators() {
     return ImmutableList.of(_inputOperator);
@@ -172,7 +226,8 @@ public class WindowAggregateOperator extends 
MultiStageOperator {
   }
 
   @Override
-  protected TransferableBlock getNextBlock() {
+  protected TransferableBlock getNextBlock()
+      throws ProcessingException {
     if (_hasReturnedWindowAggregateBlock) {
       return _eosBlock;
     }
@@ -213,16 +268,34 @@ public class WindowAggregateOperator extends 
MultiStageOperator {
   /**
    * @return the final block, which must be either an end of stream or an 
error.
    */
-  private TransferableBlock computeBlocks() {
+  private TransferableBlock computeBlocks() throws ProcessingException {
     TransferableBlock block = _inputOperator.nextBlock();
     while (!TransferableBlockUtils.isEndOfStream(block)) {
       List<Object[]> container = block.getContainer();
+      int containerSize = container.size();
+      if (_numRows + containerSize > _maxRowsInWindowCache) {
+        if (_windowOverflowMode == WindowOverFlowMode.THROW) {
+          ProcessingException resourceLimitExceededException =
+              new 
ProcessingException(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE);
+          resourceLimitExceededException.setMessage(
+              "Cannot build in memory window cache for WINDOW operator, reach 
number of rows limit: "
+                  + _maxRowsInWindowCache);
+          throw resourceLimitExceededException;
+        } else {
+          // Just fill up the buffer.
+          int remainingRows = _maxRowsInWindowCache - _numRows;
+          container = container.subList(0, remainingRows);
+          _statMap.merge(StatKey.MAX_ROWS_IN_WINDOW_REACHED, true);
+          // setting the inputOperator to be early terminated and awaits EOS 
block next.
+          _inputOperator.earlyTerminate();
+        }
+      }
       for (Object[] row : container) {
-        _numRows++;
         // TODO: Revisit null direction handling for all query types
         Key key = AggregationUtils.extractRowKey(row, _groupSet);
         _partitionRows.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
       }
+      _numRows += containerSize;
       block = _inputOperator.nextBlock();
     }
     // Early termination if the block is an error block
@@ -240,8 +313,7 @@ public class WindowAggregateOperator extends 
MultiStageOperator {
       List<List<Object>> windowFunctionResults = new ArrayList<>();
       for (WindowFunction windowFunction : _windowFunctions) {
         List<Object> processRows = windowFunction.processRows(rowList);
-        Preconditions.checkState(processRows.size() == rowList.size(),
-            "Number of rows in the result set must match the number of rows in 
the input set");
+        assert processRows.size() == rowList.size();
         windowFunctionResults.add(processRows);
       }
 
@@ -359,7 +431,8 @@ public class WindowAggregateOperator extends 
MultiStageOperator {
       public boolean includeDefaultInJson() {
         return true;
       }
-    };
+    },
+    MAX_ROWS_IN_WINDOW_REACHED(StatMap.Type.BOOLEAN);
     private final StatMap.Type _type;
 
     StatKey(StatMap.Type type) {
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
index ab318833cd..5006a92c74 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
@@ -113,7 +113,7 @@ public class PhysicalPlanVisitor implements 
PlanNodeVisitor<MultiStageOperator,
     return new WindowAggregateOperator(context, nextOperator, 
node.getGroupSet(), node.getOrderSet(),
         node.getOrderSetDirection(), node.getOrderSetNullDirection(), 
node.getAggCalls(), node.getLowerBound(),
         node.getUpperBound(), node.getWindowFrameType(), node.getConstants(), 
node.getDataSchema(),
-        node.getInputs().get(0).getDataSchema());
+        node.getInputs().get(0).getDataSchema(), node.getWindowHints());
   }
 
   @Override
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java
index 61df71d9ad..56761b2294 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.query.runtime.operator;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -26,10 +27,16 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.hint.RelHint;
 import org.apache.calcite.sql.SqlKind;
+import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
+import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.response.ProcessingException;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.planner.plannode.AbstractPlanNode;
 import org.apache.pinot.query.planner.plannode.WindowNode;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
@@ -85,7 +92,8 @@ public class WindowAggregateOperatorTest {
     WindowAggregateOperator operator =
         new WindowAggregateOperator(OperatorTestUtil.getTracingContext(), 
_input, group, Collections.emptyList(),
             Collections.emptyList(), Collections.emptyList(), calls, 
Integer.MIN_VALUE, Integer.MAX_VALUE,
-            WindowNode.WindowFrameType.RANGE, Collections.emptyList(), 
outSchema, inSchema);
+            WindowNode.WindowFrameType.RANGE, Collections.emptyList(), 
outSchema, inSchema,
+            getWindowHints(ImmutableMap.of()));
 
     // When:
     TransferableBlock block1 = operator.nextBlock(); // build
@@ -108,7 +116,8 @@ public class WindowAggregateOperatorTest {
     WindowAggregateOperator operator =
         new WindowAggregateOperator(OperatorTestUtil.getTracingContext(), 
_input, group, Collections.emptyList(),
             Collections.emptyList(), Collections.emptyList(), calls, 
Integer.MIN_VALUE, Integer.MAX_VALUE,
-            WindowNode.WindowFrameType.RANGE, Collections.emptyList(), 
outSchema, inSchema);
+            WindowNode.WindowFrameType.RANGE, Collections.emptyList(), 
outSchema, inSchema,
+            getWindowHints(ImmutableMap.of()));
 
     // When:
     TransferableBlock block = operator.nextBlock();
@@ -132,7 +141,8 @@ public class WindowAggregateOperatorTest {
     WindowAggregateOperator operator =
         new WindowAggregateOperator(OperatorTestUtil.getTracingContext(), 
_input, group, Collections.emptyList(),
             Collections.emptyList(), Collections.emptyList(), calls, 
Integer.MIN_VALUE, Integer.MAX_VALUE,
-            WindowNode.WindowFrameType.RANGE, Collections.emptyList(), 
outSchema, inSchema);
+            WindowNode.WindowFrameType.RANGE, Collections.emptyList(), 
outSchema, inSchema,
+            getWindowHints(ImmutableMap.of()));
 
     // When:
     TransferableBlock block1 = operator.nextBlock();
@@ -161,7 +171,7 @@ public class WindowAggregateOperatorTest {
         new WindowAggregateOperator(OperatorTestUtil.getTracingContext(), 
_input, group, order,
             Arrays.asList(RelFieldCollation.Direction.ASCENDING), 
Arrays.asList(RelFieldCollation.NullDirection.LAST),
             calls, Integer.MIN_VALUE, Integer.MAX_VALUE, 
WindowNode.WindowFrameType.RANGE, Collections.emptyList(),
-            outSchema, inSchema);
+            outSchema, inSchema, getWindowHints(ImmutableMap.of()));
 
     // When:
     TransferableBlock block1 = operator.nextBlock();
@@ -187,7 +197,8 @@ public class WindowAggregateOperatorTest {
     WindowAggregateOperator operator =
         new WindowAggregateOperator(OperatorTestUtil.getTracingContext(), 
_input, Collections.emptyList(),
             Collections.emptyList(), Collections.emptyList(), 
Collections.emptyList(), calls, Integer.MIN_VALUE,
-            Integer.MAX_VALUE, WindowNode.WindowFrameType.RANGE, 
Collections.emptyList(), outSchema, inSchema);
+            Integer.MAX_VALUE, WindowNode.WindowFrameType.RANGE, 
Collections.emptyList(), outSchema, inSchema,
+            getWindowHints(ImmutableMap.of()));
 
     // When:
     TransferableBlock block1 = operator.nextBlock();
@@ -214,7 +225,8 @@ public class WindowAggregateOperatorTest {
     WindowAggregateOperator operator =
         new WindowAggregateOperator(OperatorTestUtil.getTracingContext(), 
_input, group, Collections.emptyList(),
             Collections.emptyList(), Collections.emptyList(), calls, 
Integer.MIN_VALUE, Integer.MAX_VALUE,
-            WindowNode.WindowFrameType.RANGE, Collections.emptyList(), 
outSchema, inSchema);
+            WindowNode.WindowFrameType.RANGE, Collections.emptyList(), 
outSchema, inSchema,
+            getWindowHints(ImmutableMap.of()));
 
     // When:
     TransferableBlock block1 = operator.nextBlock();
@@ -229,7 +241,8 @@ public class WindowAggregateOperatorTest {
   }
 
   @Test
-  public void testPartitionByWindowAggregateWithHashCollision() {
+  public void testPartitionByWindowAggregateWithHashCollision()
+      throws ProcessingException {
     MultiStageOperator upstreamOperator = 
OperatorTestUtil.getOperator(OperatorTestUtil.OP_1);
     // Create an aggregation call with sum for first column and group by 
second column.
     List<RexExpression> calls = ImmutableList.of(getSum(new 
RexExpression.InputRef(0)));
@@ -240,7 +253,8 @@ public class WindowAggregateOperatorTest {
     WindowAggregateOperator sum0PartitionBy1 =
         new WindowAggregateOperator(OperatorTestUtil.getTracingContext(), 
upstreamOperator, group,
             Collections.emptyList(), Collections.emptyList(), 
Collections.emptyList(), calls, Integer.MIN_VALUE,
-            Integer.MAX_VALUE, WindowNode.WindowFrameType.RANGE, 
Collections.emptyList(), outSchema, inSchema);
+            Integer.MAX_VALUE, WindowNode.WindowFrameType.RANGE, 
Collections.emptyList(), outSchema, inSchema,
+            getWindowHints(ImmutableMap.of()));
 
     TransferableBlock result = sum0PartitionBy1.getNextBlock();
     List<Object[]> resultRows = result.getContainer();
@@ -266,7 +280,8 @@ public class WindowAggregateOperatorTest {
     WindowAggregateOperator operator =
         new WindowAggregateOperator(OperatorTestUtil.getTracingContext(), 
_input, group, Collections.emptyList(),
             Collections.emptyList(), Collections.emptyList(), calls, 
Integer.MIN_VALUE, Integer.MAX_VALUE,
-            WindowNode.WindowFrameType.RANGE, Collections.emptyList(), 
outSchema, inSchema);
+            WindowNode.WindowFrameType.RANGE, Collections.emptyList(), 
outSchema, inSchema,
+            getWindowHints(ImmutableMap.of()));
   }
 
   @Test(expectedExceptions = RuntimeException.class, 
expectedExceptionsMessageRegExp = ".*Failed to instantiate "
@@ -284,11 +299,13 @@ public class WindowAggregateOperatorTest {
     WindowAggregateOperator operator =
         new WindowAggregateOperator(OperatorTestUtil.getTracingContext(), 
_input, group, Collections.emptyList(),
             Collections.emptyList(), Collections.emptyList(), calls, 
Integer.MIN_VALUE, Integer.MAX_VALUE,
-            WindowNode.WindowFrameType.RANGE, Collections.emptyList(), 
outSchema, inSchema);
+            WindowNode.WindowFrameType.RANGE, Collections.emptyList(), 
outSchema, inSchema,
+            getWindowHints(ImmutableMap.of()));
   }
 
   @Test
-  public void testRankDenseRankRankingFunctions() {
+  public void testRankDenseRankRankingFunctions()
+      throws ProcessingException {
     // Given:
     List<RexExpression> calls =
         ImmutableList.of(new RexExpression.FunctionCall(SqlKind.RANK, 
ColumnDataType.INT, "RANK", ImmutableList.of()),
@@ -312,7 +329,7 @@ public class WindowAggregateOperatorTest {
     WindowAggregateOperator operator =
         new WindowAggregateOperator(OperatorTestUtil.getTracingContext(), 
_input, group, order, Collections.emptyList(),
             Collections.emptyList(), calls, Integer.MIN_VALUE, 0, 
WindowNode.WindowFrameType.RANGE,
-            Collections.emptyList(), outSchema, inSchema);
+            Collections.emptyList(), outSchema, inSchema, 
getWindowHints(ImmutableMap.of()));
 
     TransferableBlock result = operator.getNextBlock();
     TransferableBlock eosBlock = operator.getNextBlock();
@@ -347,7 +364,8 @@ public class WindowAggregateOperatorTest {
   }
 
   @Test
-  public void testRowNumberRankingFunction() {
+  public void testRowNumberRankingFunction()
+      throws ProcessingException {
     // Given:
     List<RexExpression> calls = ImmutableList.of(
         new RexExpression.FunctionCall(SqlKind.ROW_NUMBER, ColumnDataType.INT, 
"ROW_NUMBER", ImmutableList.of()));
@@ -369,7 +387,7 @@ public class WindowAggregateOperatorTest {
     WindowAggregateOperator operator =
         new WindowAggregateOperator(OperatorTestUtil.getTracingContext(), 
_input, group, order, Collections.emptyList(),
             Collections.emptyList(), calls, Integer.MIN_VALUE, 0, 
WindowNode.WindowFrameType.ROWS,
-            Collections.emptyList(), outSchema, inSchema);
+            Collections.emptyList(), outSchema, inSchema, 
getWindowHints(ImmutableMap.of()));
 
     TransferableBlock result = operator.getNextBlock();
     TransferableBlock eosBlock = operator.getNextBlock();
@@ -403,7 +421,8 @@ public class WindowAggregateOperatorTest {
   }
 
   @Test
-  public void testNonEmptyOrderByKeysNotMatchingPartitionByKeys() {
+  public void testNonEmptyOrderByKeysNotMatchingPartitionByKeys()
+      throws ProcessingException {
     // Given:
     List<RexExpression> calls = ImmutableList.of(getSum(new 
RexExpression.InputRef(0)));
     List<RexExpression> group = ImmutableList.of(new 
RexExpression.InputRef(0));
@@ -423,7 +442,7 @@ public class WindowAggregateOperatorTest {
         new WindowAggregateOperator(OperatorTestUtil.getTracingContext(), 
_input, group, order,
             Arrays.asList(RelFieldCollation.Direction.ASCENDING), 
Arrays.asList(RelFieldCollation.NullDirection.LAST),
             calls, Integer.MIN_VALUE, Integer.MAX_VALUE, 
WindowNode.WindowFrameType.RANGE, Collections.emptyList(),
-            outSchema, inSchema);
+            outSchema, inSchema, getWindowHints(ImmutableMap.of()));
 
     TransferableBlock result = operator.getNextBlock();
     TransferableBlock eosBlock = operator.getNextBlock();
@@ -463,7 +482,7 @@ public class WindowAggregateOperatorTest {
         new WindowAggregateOperator(OperatorTestUtil.getTracingContext(), 
_input, group, order,
             Arrays.asList(RelFieldCollation.Direction.DESCENDING), 
Arrays.asList(RelFieldCollation.NullDirection.LAST),
             calls, Integer.MIN_VALUE, Integer.MAX_VALUE, 
WindowNode.WindowFrameType.RANGE, Collections.emptyList(),
-            outSchema, inSchema);
+            outSchema, inSchema, getWindowHints(ImmutableMap.of()));
 
     // When:
     TransferableBlock resultBlock = operator.nextBlock(); // (output result)
@@ -494,7 +513,8 @@ public class WindowAggregateOperatorTest {
     WindowAggregateOperator operator =
         new WindowAggregateOperator(OperatorTestUtil.getTracingContext(), 
_input, group, Collections.emptyList(),
             Collections.emptyList(), Collections.emptyList(), calls, 
Integer.MIN_VALUE, Integer.MAX_VALUE,
-            WindowNode.WindowFrameType.ROWS, Collections.emptyList(), 
outSchema, inSchema);
+            WindowNode.WindowFrameType.ROWS, Collections.emptyList(), 
outSchema, inSchema,
+            getWindowHints(ImmutableMap.of()));
   }
 
   @Test
@@ -514,7 +534,7 @@ public class WindowAggregateOperatorTest {
         new WindowAggregateOperator(OperatorTestUtil.getTracingContext(), 
_input, group, order,
             Arrays.asList(RelFieldCollation.Direction.ASCENDING), 
Arrays.asList(RelFieldCollation.NullDirection.LAST),
             calls, Integer.MIN_VALUE, 0, WindowNode.WindowFrameType.RANGE, 
Collections.emptyList(), outSchema,
-            inSchema);
+            inSchema, getWindowHints(ImmutableMap.of()));
 
     // When:
     TransferableBlock block1 = operator.nextBlock();
@@ -544,7 +564,8 @@ public class WindowAggregateOperatorTest {
     WindowAggregateOperator operator =
         new WindowAggregateOperator(OperatorTestUtil.getTracingContext(), 
_input, group, Collections.emptyList(),
             Collections.emptyList(), Collections.emptyList(), calls, 5, 
Integer.MAX_VALUE,
-            WindowNode.WindowFrameType.RANGE, Collections.emptyList(), 
outSchema, inSchema);
+            WindowNode.WindowFrameType.RANGE, Collections.emptyList(), 
outSchema, inSchema,
+            getWindowHints(ImmutableMap.of()));
   }
 
   @Test(expectedExceptions = IllegalStateException.class, 
expectedExceptionsMessageRegExp = "Only default frame is "
@@ -564,7 +585,8 @@ public class WindowAggregateOperatorTest {
     WindowAggregateOperator operator =
         new WindowAggregateOperator(OperatorTestUtil.getTracingContext(), 
_input, group, Collections.emptyList(),
             Collections.emptyList(), Collections.emptyList(), calls, 
Integer.MIN_VALUE, 5,
-            WindowNode.WindowFrameType.RANGE, Collections.emptyList(), 
outSchema, inSchema);
+            WindowNode.WindowFrameType.RANGE, Collections.emptyList(), 
outSchema, inSchema,
+            getWindowHints(ImmutableMap.of()));
   }
 
   @Test
@@ -585,7 +607,8 @@ public class WindowAggregateOperatorTest {
     WindowAggregateOperator operator =
         new WindowAggregateOperator(OperatorTestUtil.getTracingContext(), 
_input, group, Collections.emptyList(),
             Collections.emptyList(), Collections.emptyList(), calls, 
Integer.MIN_VALUE, Integer.MAX_VALUE,
-            WindowNode.WindowFrameType.RANGE, Collections.emptyList(), 
outSchema, inSchema);
+            WindowNode.WindowFrameType.RANGE, Collections.emptyList(), 
outSchema, inSchema,
+            getWindowHints(ImmutableMap.of()));
 
     // When:
     TransferableBlock block = operator.nextBlock();
@@ -596,7 +619,75 @@ public class WindowAggregateOperatorTest {
         "expected it to fail with class cast exception");
   }
 
+  @Test
+  public void testShouldPropagateWindowLimitError() {
+    // Given:
+    List<RexExpression> calls = ImmutableList.of(getSum(new 
RexExpression.InputRef(1)));
+    List<RexExpression> group = ImmutableList.of(new 
RexExpression.InputRef(0));
+    Map<String, String> hintsMap = 
ImmutableMap.of(PinotHintOptions.WindowHintOptions.WINDOW_OVERFLOW_MODE, 
"THROW",
+        PinotHintOptions.WindowHintOptions.MAX_ROWS_IN_WINDOW, "1");
+
+    DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new 
ColumnDataType[]{INT, INT});
+    Mockito.when(_input.nextBlock())
+        .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, 1}, new 
Object[]{3, 4}))
+        
.thenReturn(TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0));
+
+    DataSchema outSchema = new DataSchema(new String[]{"group", "arg", "sum"}, 
new ColumnDataType[]{INT, INT, DOUBLE});
+    WindowAggregateOperator operator =
+        new WindowAggregateOperator(OperatorTestUtil.getTracingContext(), 
_input, group, Collections.emptyList(),
+            Collections.emptyList(), Collections.emptyList(), calls, 
Integer.MIN_VALUE, Integer.MAX_VALUE,
+            WindowNode.WindowFrameType.RANGE, Collections.emptyList(), 
outSchema, inSchema,
+            getWindowHints(hintsMap));
+
+    // When:
+    TransferableBlock block = operator.nextBlock();
+
+    // Then:
+    Assert.assertTrue(block.isErrorBlock(), "expected ERROR block from window 
overflow");
+    
Assert.assertTrue(block.getExceptions().get(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE)
+        .contains("reach number of rows limit"));
+  }
+
+  @Test
+  public void testShouldHandleWindowWithPartialResultsWhenHitDataRowsLimit() {
+    // Given:
+    List<RexExpression> calls = ImmutableList.of(getSum(new 
RexExpression.InputRef(1)));
+    List<RexExpression> group = ImmutableList.of(new 
RexExpression.InputRef(0));
+    Map<String, String> hintsMap = 
ImmutableMap.of(PinotHintOptions.WindowHintOptions.WINDOW_OVERFLOW_MODE, 
"BREAK",
+        PinotHintOptions.WindowHintOptions.MAX_ROWS_IN_WINDOW, "1");
+
+    DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new 
ColumnDataType[]{INT, INT});
+    Mockito.when(_input.nextBlock())
+        .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, 1}, new 
Object[]{3, 4}))
+        
.thenReturn(TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0));
+
+    DataSchema outSchema = new DataSchema(new String[]{"group", "arg", "sum"}, 
new ColumnDataType[]{INT, INT, DOUBLE});
+    WindowAggregateOperator operator =
+        new WindowAggregateOperator(OperatorTestUtil.getTracingContext(), 
_input, group, Collections.emptyList(),
+            Collections.emptyList(), Collections.emptyList(), calls, 
Integer.MIN_VALUE, Integer.MAX_VALUE,
+            WindowNode.WindowFrameType.RANGE, Collections.emptyList(), 
outSchema, inSchema,
+            getWindowHints(hintsMap));
+
+    // When:
+    TransferableBlock firstBlock = operator.nextBlock();
+    Mockito.verify(_input).earlyTerminate();
+    Assert.assertTrue(firstBlock.isDataBlock(), "First block should be a data 
block but is " + firstBlock.getClass());
+    Assert.assertEquals(firstBlock.getNumRows(), 1);
+
+    TransferableBlock secondBlock = operator.nextBlock();
+    StatMap<WindowAggregateOperator.StatKey> windowStats =
+        OperatorTestUtil.getStatMap(WindowAggregateOperator.StatKey.class, 
secondBlock);
+    
Assert.assertTrue(windowStats.getBoolean(WindowAggregateOperator.StatKey.MAX_ROWS_IN_WINDOW_REACHED),
+        "Max rows in window should be reached");
+  }
+
   private static RexExpression.FunctionCall getSum(RexExpression arg) {
     return new RexExpression.FunctionCall(SqlKind.SUM, ColumnDataType.INT, 
"SUM", ImmutableList.of(arg));
   }
+
+  private static AbstractPlanNode.NodeHint getWindowHints(Map<String, String> 
hintsMap) {
+    RelHint.Builder relHintBuilder = 
RelHint.builder(PinotHintOptions.WINDOW_HINT_OPTIONS);
+    hintsMap.forEach(relHintBuilder::hintOption);
+    return new 
AbstractPlanNode.NodeHint(ImmutableList.of(relHintBuilder.build()));
+  }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 7a05d488da..c02423d4b6 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -390,6 +390,10 @@ public class CommonConstants {
         public static final String MAX_ROWS_IN_JOIN = "maxRowsInJoin";
         public static final String JOIN_OVERFLOW_MODE = "joinOverflowMode";
 
+        // Handle WINDOW Overflow
+        public static final String MAX_ROWS_IN_WINDOW = "maxRowsInWindow";
+        public static final String WINDOW_OVERFLOW_MODE = "windowOverflowMode";
+
         // Indicates the maximum length of the serialized response per server 
for a query.
         public static final String MAX_SERVER_RESPONSE_SIZE_BYTES = 
"maxServerResponseSizeBytes";
 
@@ -1120,6 +1124,16 @@ public class CommonConstants {
     public enum JoinOverFlowMode {
       THROW, BREAK
     }
+
+    /**
+     * Configuration for window overflow.
+     */
+    public static final String KEY_OF_MAX_ROWS_IN_WINDOW = 
"pinot.query.window.max.rows";
+    public static final String KEY_OF_WINDOW_OVERFLOW_MODE = 
"pinot.query.window.overflow.mode";
+
+    public enum WindowOverFlowMode {
+      THROW, BREAK
+    }
   }
 
   public static class NullValuePlaceHolder {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to