This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 34a1fb71f4 [multistage] Make window operator more resilient (#13180)
34a1fb71f4 is described below
commit 34a1fb71f4d2e822c8a90948d32170289388744d
Author: Xiang Fu <[email protected]>
AuthorDate: Tue May 21 09:15:27 2024 +0800
[multistage] Make window operator more resilient (#13180)
* [multistage] Make window operator more resilient
* Adding stats
---
.../pinot/common/response/BrokerResponse.java | 5 +
.../response/broker/BrokerResponseNative.java | 6 +
.../response/broker/BrokerResponseNativeV2.java | 30 +++--
.../common/utils/config/QueryOptionsUtils.java | 13 ++
.../pinot/calcite/rel/hint/PinotHintOptions.java | 14 +++
.../planner/logical/RelToPlanNodeConverter.java | 3 +-
.../pinot/query/planner/plannode/WindowNode.java | 10 +-
.../query/runtime/operator/MultiStageOperator.java | 2 +
.../runtime/operator/WindowAggregateOperator.java | 87 +++++++++++--
.../query/runtime/plan/PhysicalPlanVisitor.java | 2 +-
.../operator/WindowAggregateOperatorTest.java | 135 +++++++++++++++++----
.../apache/pinot/spi/utils/CommonConstants.java | 14 +++
12 files changed, 279 insertions(+), 42 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
index 0e4a7ba192..bab22143cf 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
@@ -90,6 +90,11 @@ public interface BrokerResponse {
*/
boolean isMaxRowsInJoinReached();
+ /**
+ * Returns whether the limit for max rows in window has been reached.
+ */
+ boolean isMaxRowsInWindowReached();
+
/**
* Returns the total time used for query execution in milliseconds.
*/
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
index 8b46593d44..3de27596df 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
@@ -182,6 +182,12 @@ public class BrokerResponseNative implements
BrokerResponse {
return false;
}
+ @JsonIgnore
+ @Override
+ public boolean isMaxRowsInWindowReached() {
+ return false;
+ }
+
@Override
public long getTimeUsedMs() {
return _timeUsedMs;
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
index 232567b82a..0138b05385 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
@@ -36,16 +36,16 @@ import org.apache.pinot.common.response.ProcessingException;
* TODO: Currently this class cannot be used to deserialize the JSON response.
*/
@JsonPropertyOrder({
- "resultTable", "partialResult", "exceptions", "numGroupsLimitReached",
"maxRowsInJoinReached", "timeUsedMs",
- "stageStats", "maxRowsInOperator", "requestId", "brokerId",
"numDocsScanned", "totalDocs",
- "numEntriesScannedInFilter", "numEntriesScannedPostFilter",
"numServersQueried", "numServersResponded",
- "numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched",
"numConsumingSegmentsQueried",
- "numConsumingSegmentsProcessed", "numConsumingSegmentsMatched",
"minConsumingFreshnessTimeMs",
- "numSegmentsPrunedByBroker", "numSegmentsPrunedByServer",
"numSegmentsPrunedInvalid", "numSegmentsPrunedByLimit",
- "numSegmentsPrunedByValue", "brokerReduceTimeMs",
"offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs",
- "offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs",
"offlineResponseSerializationCpuTimeNs",
- "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs",
"realtimeTotalCpuTimeNs",
- "explainPlanNumEmptyFilterSegments",
"explainPlanNumMatchAllFilterSegments", "traceInfo"
+ "resultTable", "partialResult", "exceptions", "numGroupsLimitReached",
"maxRowsInJoinReached",
+ "maxRowsInWindowReached", "timeUsedMs", "stageStats", "maxRowsInOperator",
"requestId", "brokerId",
+ "numDocsScanned", "totalDocs", "numEntriesScannedInFilter",
"numEntriesScannedPostFilter", "numServersQueried",
+ "numServersResponded", "numSegmentsQueried", "numSegmentsProcessed",
"numSegmentsMatched",
+ "numConsumingSegmentsQueried", "numConsumingSegmentsProcessed",
"numConsumingSegmentsMatched",
+ "minConsumingFreshnessTimeMs", "numSegmentsPrunedByBroker",
"numSegmentsPrunedByServer", "numSegmentsPrunedInvalid",
+ "numSegmentsPrunedByLimit", "numSegmentsPrunedByValue",
"brokerReduceTimeMs", "offlineThreadCpuTimeNs",
+ "realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs",
"realtimeSystemActivitiesCpuTimeNs",
+ "offlineResponseSerializationCpuTimeNs",
"realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs",
+ "realtimeTotalCpuTimeNs", "explainPlanNumEmptyFilterSegments",
"explainPlanNumMatchAllFilterSegments", "traceInfo"
})
public class BrokerResponseNativeV2 implements BrokerResponse {
private final StatMap<StatKey> _brokerStats = new StatMap<>(StatKey.class);
@@ -53,6 +53,7 @@ public class BrokerResponseNativeV2 implements BrokerResponse
{
private ResultTable _resultTable;
private boolean _maxRowsInJoinReached;
+ private boolean _maxRowsInWindowReached;
private long _timeUsedMs;
/**
* Statistics for each stage of the query execution.
@@ -121,6 +122,15 @@ public class BrokerResponseNativeV2 implements
BrokerResponse {
_maxRowsInJoinReached |= maxRowsInJoinReached;
}
+ @Override
+ public boolean isMaxRowsInWindowReached() {
+ return _maxRowsInWindowReached;
+ }
+
+ public void mergeMaxRowsInWindowReached(boolean maxRowsInWindowReached) {
+ _maxRowsInWindowReached |= maxRowsInWindowReached;
+ }
+
/**
* Returns the stage statistics.
*/
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index efdfdef303..aaa9db3f4f 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -32,6 +32,7 @@ import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.utils.CommonConstants;
import
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
import
org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.JoinOverFlowMode;
+import
org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.WindowOverFlowMode;
/**
@@ -280,4 +281,16 @@ public class QueryOptionsUtils {
String joinOverflowModeStr =
queryOptions.get(QueryOptionKey.JOIN_OVERFLOW_MODE);
return joinOverflowModeStr != null ?
JoinOverFlowMode.valueOf(joinOverflowModeStr) : null;
}
+
+ @Nullable
+ public static Integer getMaxRowsInWindow(Map<String, String> queryOptions) {
+ String maxRowsInWindow =
queryOptions.get(QueryOptionKey.MAX_ROWS_IN_WINDOW);
+ return maxRowsInWindow != null ? Integer.parseInt(maxRowsInWindow) : null;
+ }
+
+ @Nullable
+ public static WindowOverFlowMode getWindowOverflowMode(Map<String, String>
queryOptions) {
+ String windowOverflowModeStr =
queryOptions.get(QueryOptionKey.WINDOW_OVERFLOW_MODE);
+ return windowOverflowModeStr != null ?
WindowOverFlowMode.valueOf(windowOverflowModeStr) : null;
+ }
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/hint/PinotHintOptions.java
b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/hint/PinotHintOptions.java
index a45e92aba6..1d53a3184e 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/hint/PinotHintOptions.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/hint/PinotHintOptions.java
@@ -34,6 +34,7 @@ public class PinotHintOptions {
public static final String AGGREGATE_HINT_OPTIONS = "aggOptions";
public static final String JOIN_HINT_OPTIONS = "joinOptions";
public static final String TABLE_HINT_OPTIONS = "tableOptions";
+ public static final String WINDOW_HINT_OPTIONS = "windowOptions";
/**
* Hint to denote that the aggregation node is the final aggregation stage
which extracts the final result.
@@ -68,6 +69,19 @@ public class PinotHintOptions {
public static final String MAX_INITIAL_RESULT_HOLDER_CAPACITY =
"max_initial_result_holder_capacity";
}
+ public static class WindowHintOptions {
+ /**
+ * Max rows allowed to cache the rows in window for further processing.
+ */
+ public static final String MAX_ROWS_IN_WINDOW = "max_rows_in_window";
+ /**
+ * Mode when window overflow happens, supported values: THROW or BREAK.
+ * THROW(default): Break window cache build process, and throw
exception, no further WINDOW operation performed.
+ * BREAK: Break window cache build process, continue to perform WINDOW
operation, results might be partial.
+ */
+ public static final String WINDOW_OVERFLOW_MODE = "window_overflow_mode";
+ }
+
public static class JoinHintOptions {
public static final String JOIN_STRATEGY = "join_strategy";
public static final String DYNAMIC_BROADCAST_JOIN_STRATEGY =
"dynamic_broadcast";
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
index 927863809e..20c44fe114 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
@@ -152,7 +152,8 @@ public final class RelToPlanNodeConverter {
}
private static PlanNode convertLogicalWindow(LogicalWindow node, int
currentStageId) {
- return new WindowNode(currentStageId, node.groups, node.constants,
toDataSchema(node.getRowType()));
+ return new WindowNode(currentStageId, node.groups, node.constants,
toDataSchema(node.getRowType()),
+ node.getHints());
}
private static PlanNode convertLogicalSort(LogicalSort node, int
currentStageId) {
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/WindowNode.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/WindowNode.java
index 97a511a7d5..344adb69ab 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/WindowNode.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/WindowNode.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.stream.Collectors;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.core.Window;
+import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.rex.RexLiteral;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.planner.logical.RexExpression;
@@ -51,6 +52,8 @@ public class WindowNode extends AbstractPlanNode {
private List<RexExpression> _constants;
@ProtoProperties
private WindowFrameType _windowFrameType;
+ @ProtoProperties
+ private NodeHint _windowHints;
/**
* Enum to denote the type of window frame
@@ -66,7 +69,7 @@ public class WindowNode extends AbstractPlanNode {
}
public WindowNode(int planFragmentId, List<Window.Group> windowGroups,
List<RexLiteral> constants,
- DataSchema dataSchema) {
+ DataSchema dataSchema, List<RelHint> windowHints) {
super(planFragmentId, dataSchema);
// Only a single Window Group should exist per WindowNode.
Preconditions.checkState(windowGroups.size() == 1,
@@ -103,6 +106,7 @@ public class WindowNode extends AbstractPlanNode {
for (RexLiteral constant : constants) {
_constants.add(RexExpressionUtils.fromRexLiteral(constant));
}
+ _windowHints = new NodeHint(windowHints);
}
@Override
@@ -150,4 +154,8 @@ public class WindowNode extends AbstractPlanNode {
public List<RexExpression> getConstants() {
return _constants;
}
+
+ public NodeHint getWindowHints() {
+ return _windowHints;
+ }
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
index 224923f6d5..6d7ea779cc 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
@@ -283,6 +283,8 @@ public abstract class MultiStageOperator
@SuppressWarnings("unchecked")
StatMap<WindowAggregateOperator.StatKey> stats =
(StatMap<WindowAggregateOperator.StatKey>) map;
response.mergeMaxRowsInOperator(stats.getLong(WindowAggregateOperator.StatKey.EMITTED_ROWS));
+ response.mergeMaxRowsInWindowReached(
+
stats.getBoolean(WindowAggregateOperator.StatKey.MAX_ROWS_IN_WINDOW_REACHED));
}
},;
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
index e2b989b76d..7d9de89b39 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
@@ -32,12 +32,17 @@ import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.data.table.Key;
import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.planner.plannode.AbstractPlanNode;
import org.apache.pinot.query.planner.plannode.WindowNode;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
@@ -46,6 +51,7 @@ import
org.apache.pinot.query.runtime.operator.utils.TypeUtils;
import org.apache.pinot.query.runtime.operator.window.WindowFunction;
import org.apache.pinot.query.runtime.operator.window.WindowFunctionFactory;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import
org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.WindowOverFlowMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -82,6 +88,8 @@ import org.slf4j.LoggerFactory;
public class WindowAggregateOperator extends MultiStageOperator {
private static final String EXPLAIN_NAME = "WINDOW";
private static final Logger LOGGER =
LoggerFactory.getLogger(WindowAggregateOperator.class);
+ private static final int DEFAULT_MAX_ROWS_IN_WINDOW = 1024 * 1024; // 2^20,
around 1MM rows
+ private static final WindowOverFlowMode DEFAULT_WINDOW_OVERFLOW_MODE =
WindowOverFlowMode.THROW;
// List of window functions which can only be applied as ROWS window frame
type
public static final Set<String> ROWS_ONLY_FUNCTION_NAMES =
ImmutableSet.of("ROW_NUMBER");
@@ -99,6 +107,19 @@ public class WindowAggregateOperator extends
MultiStageOperator {
private final Map<Key, List<Object[]>> _partitionRows;
private final boolean _isPartitionByOnly;
+ // Below are specific parameters to protect the window cache from growing
too large.
+ // Once the window cache reaches the limit, we will throw exception or break
the cache build process.
+ /**
+ * Max rows allowed to be cached in the window cache for further processing.
+ */
+ private final int _maxRowsInWindowCache;
+ /**
+ * Mode when window overflow happens, supported values: THROW or BREAK.
+ * THROW(default): Break window cache build process, and throw exception, no
WINDOW operation performed.
+ * BREAK: Break window cache build process, continue to perform WINDOW
operation, results might be partial or wrong.
+ */
+ private final WindowOverFlowMode _windowOverflowMode;
+
private int _numRows;
private boolean _hasReturnedWindowAggregateBlock;
@Nullable
@@ -110,7 +131,7 @@ public class WindowAggregateOperator extends
MultiStageOperator {
List<RexExpression> groupSet, List<RexExpression> orderSet,
List<RelFieldCollation.Direction> orderSetDirection,
List<RelFieldCollation.NullDirection> orderSetNullDirection,
List<RexExpression> aggCalls, int lowerBound,
int upperBound, WindowNode.WindowFrameType windowFrameType,
List<RexExpression> constants,
- DataSchema resultSchema, DataSchema inputSchema) {
+ DataSchema resultSchema, DataSchema inputSchema,
AbstractPlanNode.NodeHint hints) {
super(context);
_inputOperator = inputOperator;
@@ -142,6 +163,9 @@ public class WindowAggregateOperator extends
MultiStageOperator {
_numRows = 0;
_hasReturnedWindowAggregateBlock = false;
+ Map<String, String> metadata = context.getOpChainMetadata();
+ _maxRowsInWindowCache = getMaxRowInWindow(metadata, hints);
+ _windowOverflowMode = getWindowOverflowMode(metadata, hints);
}
@Override
@@ -155,6 +179,36 @@ public class WindowAggregateOperator extends
MultiStageOperator {
return LOGGER;
}
+ private int getMaxRowInWindow(Map<String, String> opChainMetadata, @Nullable
AbstractPlanNode.NodeHint nodeHint) {
+ if (nodeHint != null) {
+ Map<String, String> windowOptions =
nodeHint._hintOptions.get(PinotHintOptions.WINDOW_HINT_OPTIONS);
+ if (windowOptions != null) {
+ String maxRowsInWindowStr =
windowOptions.get(PinotHintOptions.WindowHintOptions.MAX_ROWS_IN_WINDOW);
+ if (maxRowsInWindowStr != null) {
+ return Integer.parseInt(maxRowsInWindowStr);
+ }
+ }
+ }
+ Integer maxRowsInWindow =
QueryOptionsUtils.getMaxRowsInWindow(opChainMetadata);
+ return maxRowsInWindow != null ? maxRowsInWindow :
DEFAULT_MAX_ROWS_IN_WINDOW;
+ }
+
+ private WindowOverFlowMode getWindowOverflowMode(Map<String, String>
contextMetadata,
+ @Nullable AbstractPlanNode.NodeHint nodeHint) {
+ if (nodeHint != null) {
+ Map<String, String> windowOptions =
nodeHint._hintOptions.get(PinotHintOptions.WINDOW_HINT_OPTIONS);
+ if (windowOptions != null) {
+ String windowOverflowModeStr =
windowOptions.get(PinotHintOptions.WindowHintOptions.WINDOW_OVERFLOW_MODE);
+ if (windowOverflowModeStr != null) {
+ return WindowOverFlowMode.valueOf(windowOverflowModeStr);
+ }
+ }
+ }
+ WindowOverFlowMode windowOverflowMode =
+ QueryOptionsUtils.getWindowOverflowMode(contextMetadata);
+ return windowOverflowMode != null ? windowOverflowMode :
DEFAULT_WINDOW_OVERFLOW_MODE;
+ }
+
@Override
public List<MultiStageOperator> getChildOperators() {
return ImmutableList.of(_inputOperator);
@@ -172,7 +226,8 @@ public class WindowAggregateOperator extends
MultiStageOperator {
}
@Override
- protected TransferableBlock getNextBlock() {
+ protected TransferableBlock getNextBlock()
+ throws ProcessingException {
if (_hasReturnedWindowAggregateBlock) {
return _eosBlock;
}
@@ -213,16 +268,34 @@ public class WindowAggregateOperator extends
MultiStageOperator {
/**
* @return the final block, which must be either an end of stream or an
error.
*/
- private TransferableBlock computeBlocks() {
+ private TransferableBlock computeBlocks() throws ProcessingException {
TransferableBlock block = _inputOperator.nextBlock();
while (!TransferableBlockUtils.isEndOfStream(block)) {
List<Object[]> container = block.getContainer();
+ int containerSize = container.size();
+ if (_numRows + containerSize > _maxRowsInWindowCache) {
+ if (_windowOverflowMode == WindowOverFlowMode.THROW) {
+ ProcessingException resourceLimitExceededException =
+ new
ProcessingException(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE);
+ resourceLimitExceededException.setMessage(
+ "Cannot build in memory window cache for WINDOW operator, reach
number of rows limit: "
+ + _maxRowsInWindowCache);
+ throw resourceLimitExceededException;
+ } else {
+ // Just fill up the buffer.
+ int remainingRows = _maxRowsInWindowCache - _numRows;
+ container = container.subList(0, remainingRows);
+ _statMap.merge(StatKey.MAX_ROWS_IN_WINDOW_REACHED, true);
+ // Set the inputOperator to terminate early, then await the EOS
block next.
+ _inputOperator.earlyTerminate();
+ }
+ }
for (Object[] row : container) {
- _numRows++;
// TODO: Revisit null direction handling for all query types
Key key = AggregationUtils.extractRowKey(row, _groupSet);
_partitionRows.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
}
+ _numRows += containerSize;
block = _inputOperator.nextBlock();
}
// Early termination if the block is an error block
@@ -240,8 +313,7 @@ public class WindowAggregateOperator extends
MultiStageOperator {
List<List<Object>> windowFunctionResults = new ArrayList<>();
for (WindowFunction windowFunction : _windowFunctions) {
List<Object> processRows = windowFunction.processRows(rowList);
- Preconditions.checkState(processRows.size() == rowList.size(),
- "Number of rows in the result set must match the number of rows in
the input set");
+ assert processRows.size() == rowList.size();
windowFunctionResults.add(processRows);
}
@@ -359,7 +431,8 @@ public class WindowAggregateOperator extends
MultiStageOperator {
public boolean includeDefaultInJson() {
return true;
}
- };
+ },
+ MAX_ROWS_IN_WINDOW_REACHED(StatMap.Type.BOOLEAN);
private final StatMap.Type _type;
StatKey(StatMap.Type type) {
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
index ab318833cd..5006a92c74 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
@@ -113,7 +113,7 @@ public class PhysicalPlanVisitor implements
PlanNodeVisitor<MultiStageOperator,
return new WindowAggregateOperator(context, nextOperator,
node.getGroupSet(), node.getOrderSet(),
node.getOrderSetDirection(), node.getOrderSetNullDirection(),
node.getAggCalls(), node.getLowerBound(),
node.getUpperBound(), node.getWindowFrameType(), node.getConstants(),
node.getDataSchema(),
- node.getInputs().get(0).getDataSchema());
+ node.getInputs().get(0).getDataSchema(), node.getWindowHints());
}
@Override
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java
index 61df71d9ad..56761b2294 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java
@@ -19,6 +19,7 @@
package org.apache.pinot.query.runtime.operator;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -26,10 +27,16 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.sql.SqlKind;
+import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
+import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.planner.plannode.AbstractPlanNode;
import org.apache.pinot.query.planner.plannode.WindowNode;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
@@ -85,7 +92,8 @@ public class WindowAggregateOperatorTest {
WindowAggregateOperator operator =
new WindowAggregateOperator(OperatorTestUtil.getTracingContext(),
_input, group, Collections.emptyList(),
Collections.emptyList(), Collections.emptyList(), calls,
Integer.MIN_VALUE, Integer.MAX_VALUE,
- WindowNode.WindowFrameType.RANGE, Collections.emptyList(),
outSchema, inSchema);
+ WindowNode.WindowFrameType.RANGE, Collections.emptyList(),
outSchema, inSchema,
+ getWindowHints(ImmutableMap.of()));
// When:
TransferableBlock block1 = operator.nextBlock(); // build
@@ -108,7 +116,8 @@ public class WindowAggregateOperatorTest {
WindowAggregateOperator operator =
new WindowAggregateOperator(OperatorTestUtil.getTracingContext(),
_input, group, Collections.emptyList(),
Collections.emptyList(), Collections.emptyList(), calls,
Integer.MIN_VALUE, Integer.MAX_VALUE,
- WindowNode.WindowFrameType.RANGE, Collections.emptyList(),
outSchema, inSchema);
+ WindowNode.WindowFrameType.RANGE, Collections.emptyList(),
outSchema, inSchema,
+ getWindowHints(ImmutableMap.of()));
// When:
TransferableBlock block = operator.nextBlock();
@@ -132,7 +141,8 @@ public class WindowAggregateOperatorTest {
WindowAggregateOperator operator =
new WindowAggregateOperator(OperatorTestUtil.getTracingContext(),
_input, group, Collections.emptyList(),
Collections.emptyList(), Collections.emptyList(), calls,
Integer.MIN_VALUE, Integer.MAX_VALUE,
- WindowNode.WindowFrameType.RANGE, Collections.emptyList(),
outSchema, inSchema);
+ WindowNode.WindowFrameType.RANGE, Collections.emptyList(),
outSchema, inSchema,
+ getWindowHints(ImmutableMap.of()));
// When:
TransferableBlock block1 = operator.nextBlock();
@@ -161,7 +171,7 @@ public class WindowAggregateOperatorTest {
new WindowAggregateOperator(OperatorTestUtil.getTracingContext(),
_input, group, order,
Arrays.asList(RelFieldCollation.Direction.ASCENDING),
Arrays.asList(RelFieldCollation.NullDirection.LAST),
calls, Integer.MIN_VALUE, Integer.MAX_VALUE,
WindowNode.WindowFrameType.RANGE, Collections.emptyList(),
- outSchema, inSchema);
+ outSchema, inSchema, getWindowHints(ImmutableMap.of()));
// When:
TransferableBlock block1 = operator.nextBlock();
@@ -187,7 +197,8 @@ public class WindowAggregateOperatorTest {
WindowAggregateOperator operator =
new WindowAggregateOperator(OperatorTestUtil.getTracingContext(),
_input, Collections.emptyList(),
Collections.emptyList(), Collections.emptyList(),
Collections.emptyList(), calls, Integer.MIN_VALUE,
- Integer.MAX_VALUE, WindowNode.WindowFrameType.RANGE,
Collections.emptyList(), outSchema, inSchema);
+ Integer.MAX_VALUE, WindowNode.WindowFrameType.RANGE,
Collections.emptyList(), outSchema, inSchema,
+ getWindowHints(ImmutableMap.of()));
// When:
TransferableBlock block1 = operator.nextBlock();
@@ -214,7 +225,8 @@ public class WindowAggregateOperatorTest {
WindowAggregateOperator operator =
new WindowAggregateOperator(OperatorTestUtil.getTracingContext(),
_input, group, Collections.emptyList(),
Collections.emptyList(), Collections.emptyList(), calls,
Integer.MIN_VALUE, Integer.MAX_VALUE,
- WindowNode.WindowFrameType.RANGE, Collections.emptyList(),
outSchema, inSchema);
+ WindowNode.WindowFrameType.RANGE, Collections.emptyList(),
outSchema, inSchema,
+ getWindowHints(ImmutableMap.of()));
// When:
TransferableBlock block1 = operator.nextBlock();
@@ -229,7 +241,8 @@ public class WindowAggregateOperatorTest {
}
@Test
- public void testPartitionByWindowAggregateWithHashCollision() {
+ public void testPartitionByWindowAggregateWithHashCollision()
+ throws ProcessingException {
MultiStageOperator upstreamOperator =
OperatorTestUtil.getOperator(OperatorTestUtil.OP_1);
// Create an aggregation call with sum for first column and group by
second column.
List<RexExpression> calls = ImmutableList.of(getSum(new
RexExpression.InputRef(0)));
@@ -240,7 +253,8 @@ public class WindowAggregateOperatorTest {
WindowAggregateOperator sum0PartitionBy1 =
new WindowAggregateOperator(OperatorTestUtil.getTracingContext(),
upstreamOperator, group,
Collections.emptyList(), Collections.emptyList(),
Collections.emptyList(), calls, Integer.MIN_VALUE,
- Integer.MAX_VALUE, WindowNode.WindowFrameType.RANGE,
Collections.emptyList(), outSchema, inSchema);
+ Integer.MAX_VALUE, WindowNode.WindowFrameType.RANGE,
Collections.emptyList(), outSchema, inSchema,
+ getWindowHints(ImmutableMap.of()));
TransferableBlock result = sum0PartitionBy1.getNextBlock();
List<Object[]> resultRows = result.getContainer();
@@ -266,7 +280,8 @@ public class WindowAggregateOperatorTest {
WindowAggregateOperator operator =
new WindowAggregateOperator(OperatorTestUtil.getTracingContext(),
_input, group, Collections.emptyList(),
Collections.emptyList(), Collections.emptyList(), calls,
Integer.MIN_VALUE, Integer.MAX_VALUE,
- WindowNode.WindowFrameType.RANGE, Collections.emptyList(),
outSchema, inSchema);
+ WindowNode.WindowFrameType.RANGE, Collections.emptyList(),
outSchema, inSchema,
+ getWindowHints(ImmutableMap.of()));
}
@Test(expectedExceptions = RuntimeException.class,
expectedExceptionsMessageRegExp = ".*Failed to instantiate "
@@ -284,11 +299,13 @@ public class WindowAggregateOperatorTest {
WindowAggregateOperator operator =
new WindowAggregateOperator(OperatorTestUtil.getTracingContext(),
_input, group, Collections.emptyList(),
Collections.emptyList(), Collections.emptyList(), calls,
Integer.MIN_VALUE, Integer.MAX_VALUE,
- WindowNode.WindowFrameType.RANGE, Collections.emptyList(),
outSchema, inSchema);
+ WindowNode.WindowFrameType.RANGE, Collections.emptyList(),
outSchema, inSchema,
+ getWindowHints(ImmutableMap.of()));
}
@Test
- public void testRankDenseRankRankingFunctions() {
+ public void testRankDenseRankRankingFunctions()
+ throws ProcessingException {
// Given:
List<RexExpression> calls =
ImmutableList.of(new RexExpression.FunctionCall(SqlKind.RANK,
ColumnDataType.INT, "RANK", ImmutableList.of()),
@@ -312,7 +329,7 @@ public class WindowAggregateOperatorTest {
WindowAggregateOperator operator =
new WindowAggregateOperator(OperatorTestUtil.getTracingContext(),
_input, group, order, Collections.emptyList(),
Collections.emptyList(), calls, Integer.MIN_VALUE, 0,
WindowNode.WindowFrameType.RANGE,
- Collections.emptyList(), outSchema, inSchema);
+ Collections.emptyList(), outSchema, inSchema,
getWindowHints(ImmutableMap.of()));
TransferableBlock result = operator.getNextBlock();
TransferableBlock eosBlock = operator.getNextBlock();
@@ -347,7 +364,8 @@ public class WindowAggregateOperatorTest {
}
@Test
- public void testRowNumberRankingFunction() {
+ public void testRowNumberRankingFunction()
+ throws ProcessingException {
// Given:
List<RexExpression> calls = ImmutableList.of(
new RexExpression.FunctionCall(SqlKind.ROW_NUMBER, ColumnDataType.INT,
"ROW_NUMBER", ImmutableList.of()));
@@ -369,7 +387,7 @@ public class WindowAggregateOperatorTest {
WindowAggregateOperator operator =
new WindowAggregateOperator(OperatorTestUtil.getTracingContext(),
_input, group, order, Collections.emptyList(),
Collections.emptyList(), calls, Integer.MIN_VALUE, 0,
WindowNode.WindowFrameType.ROWS,
- Collections.emptyList(), outSchema, inSchema);
+ Collections.emptyList(), outSchema, inSchema,
getWindowHints(ImmutableMap.of()));
TransferableBlock result = operator.getNextBlock();
TransferableBlock eosBlock = operator.getNextBlock();
@@ -403,7 +421,8 @@ public class WindowAggregateOperatorTest {
}
@Test
- public void testNonEmptyOrderByKeysNotMatchingPartitionByKeys() {
+ public void testNonEmptyOrderByKeysNotMatchingPartitionByKeys()
+ throws ProcessingException {
// Given:
List<RexExpression> calls = ImmutableList.of(getSum(new
RexExpression.InputRef(0)));
List<RexExpression> group = ImmutableList.of(new
RexExpression.InputRef(0));
@@ -423,7 +442,7 @@ public class WindowAggregateOperatorTest {
new WindowAggregateOperator(OperatorTestUtil.getTracingContext(),
_input, group, order,
Arrays.asList(RelFieldCollation.Direction.ASCENDING),
Arrays.asList(RelFieldCollation.NullDirection.LAST),
calls, Integer.MIN_VALUE, Integer.MAX_VALUE,
WindowNode.WindowFrameType.RANGE, Collections.emptyList(),
- outSchema, inSchema);
+ outSchema, inSchema, getWindowHints(ImmutableMap.of()));
TransferableBlock result = operator.getNextBlock();
TransferableBlock eosBlock = operator.getNextBlock();
@@ -463,7 +482,7 @@ public class WindowAggregateOperatorTest {
new WindowAggregateOperator(OperatorTestUtil.getTracingContext(),
_input, group, order,
Arrays.asList(RelFieldCollation.Direction.DESCENDING),
Arrays.asList(RelFieldCollation.NullDirection.LAST),
calls, Integer.MIN_VALUE, Integer.MAX_VALUE,
WindowNode.WindowFrameType.RANGE, Collections.emptyList(),
- outSchema, inSchema);
+ outSchema, inSchema, getWindowHints(ImmutableMap.of()));
// When:
TransferableBlock resultBlock = operator.nextBlock(); // (output result)
@@ -494,7 +513,8 @@ public class WindowAggregateOperatorTest {
WindowAggregateOperator operator =
new WindowAggregateOperator(OperatorTestUtil.getTracingContext(),
_input, group, Collections.emptyList(),
Collections.emptyList(), Collections.emptyList(), calls,
Integer.MIN_VALUE, Integer.MAX_VALUE,
- WindowNode.WindowFrameType.ROWS, Collections.emptyList(),
outSchema, inSchema);
+ WindowNode.WindowFrameType.ROWS, Collections.emptyList(),
outSchema, inSchema,
+ getWindowHints(ImmutableMap.of()));
}
@Test
@@ -514,7 +534,7 @@ public class WindowAggregateOperatorTest {
new WindowAggregateOperator(OperatorTestUtil.getTracingContext(),
_input, group, order,
Arrays.asList(RelFieldCollation.Direction.ASCENDING),
Arrays.asList(RelFieldCollation.NullDirection.LAST),
calls, Integer.MIN_VALUE, 0, WindowNode.WindowFrameType.RANGE,
Collections.emptyList(), outSchema,
- inSchema);
+ inSchema, getWindowHints(ImmutableMap.of()));
// When:
TransferableBlock block1 = operator.nextBlock();
@@ -544,7 +564,8 @@ public class WindowAggregateOperatorTest {
WindowAggregateOperator operator =
new WindowAggregateOperator(OperatorTestUtil.getTracingContext(),
_input, group, Collections.emptyList(),
Collections.emptyList(), Collections.emptyList(), calls, 5,
Integer.MAX_VALUE,
- WindowNode.WindowFrameType.RANGE, Collections.emptyList(),
outSchema, inSchema);
+ WindowNode.WindowFrameType.RANGE, Collections.emptyList(),
outSchema, inSchema,
+ getWindowHints(ImmutableMap.of()));
}
@Test(expectedExceptions = IllegalStateException.class,
expectedExceptionsMessageRegExp = "Only default frame is "
@@ -564,7 +585,8 @@ public class WindowAggregateOperatorTest {
WindowAggregateOperator operator =
new WindowAggregateOperator(OperatorTestUtil.getTracingContext(),
_input, group, Collections.emptyList(),
Collections.emptyList(), Collections.emptyList(), calls,
Integer.MIN_VALUE, 5,
- WindowNode.WindowFrameType.RANGE, Collections.emptyList(),
outSchema, inSchema);
+ WindowNode.WindowFrameType.RANGE, Collections.emptyList(),
outSchema, inSchema,
+ getWindowHints(ImmutableMap.of()));
}
@Test
@@ -585,7 +607,8 @@ public class WindowAggregateOperatorTest {
WindowAggregateOperator operator =
new WindowAggregateOperator(OperatorTestUtil.getTracingContext(),
_input, group, Collections.emptyList(),
Collections.emptyList(), Collections.emptyList(), calls,
Integer.MIN_VALUE, Integer.MAX_VALUE,
- WindowNode.WindowFrameType.RANGE, Collections.emptyList(),
outSchema, inSchema);
+ WindowNode.WindowFrameType.RANGE, Collections.emptyList(),
outSchema, inSchema,
+ getWindowHints(ImmutableMap.of()));
// When:
TransferableBlock block = operator.nextBlock();
@@ -596,7 +619,75 @@ public class WindowAggregateOperatorTest {
"expected it to fail with class cast exception");
}
+ @Test
+ public void testShouldPropagateWindowLimitError() {
+ // Given:
+ List<RexExpression> calls = ImmutableList.of(getSum(new
RexExpression.InputRef(1)));
+ List<RexExpression> group = ImmutableList.of(new
RexExpression.InputRef(0));
+ Map<String, String> hintsMap =
ImmutableMap.of(PinotHintOptions.WindowHintOptions.WINDOW_OVERFLOW_MODE,
"THROW",
+ PinotHintOptions.WindowHintOptions.MAX_ROWS_IN_WINDOW, "1");
+
+ DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new
ColumnDataType[]{INT, INT});
+ Mockito.when(_input.nextBlock())
+ .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, 1}, new
Object[]{3, 4}))
+
.thenReturn(TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0));
+
+ DataSchema outSchema = new DataSchema(new String[]{"group", "arg", "sum"},
new ColumnDataType[]{INT, INT, DOUBLE});
+ WindowAggregateOperator operator =
+ new WindowAggregateOperator(OperatorTestUtil.getTracingContext(),
_input, group, Collections.emptyList(),
+ Collections.emptyList(), Collections.emptyList(), calls,
Integer.MIN_VALUE, Integer.MAX_VALUE,
+ WindowNode.WindowFrameType.RANGE, Collections.emptyList(),
outSchema, inSchema,
+ getWindowHints(hintsMap));
+
+ // When:
+ TransferableBlock block = operator.nextBlock();
+
+ // Then:
+ Assert.assertTrue(block.isErrorBlock(), "expected ERROR block from window overflow");
+
Assert.assertTrue(block.getExceptions().get(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE)
+ .contains("reach number of rows limit"));
+ }
+
+ @Test
+ public void testShouldHandleWindowWithPartialResultsWhenHitDataRowsLimit() {
+ // Given:
+ List<RexExpression> calls = ImmutableList.of(getSum(new
RexExpression.InputRef(1)));
+ List<RexExpression> group = ImmutableList.of(new
RexExpression.InputRef(0));
+ Map<String, String> hintsMap =
ImmutableMap.of(PinotHintOptions.WindowHintOptions.WINDOW_OVERFLOW_MODE,
"BREAK",
+ PinotHintOptions.WindowHintOptions.MAX_ROWS_IN_WINDOW, "1");
+
+ DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new
ColumnDataType[]{INT, INT});
+ Mockito.when(_input.nextBlock())
+ .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, 1}, new
Object[]{3, 4}))
+
.thenReturn(TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0));
+
+ DataSchema outSchema = new DataSchema(new String[]{"group", "arg", "sum"},
new ColumnDataType[]{INT, INT, DOUBLE});
+ WindowAggregateOperator operator =
+ new WindowAggregateOperator(OperatorTestUtil.getTracingContext(),
_input, group, Collections.emptyList(),
+ Collections.emptyList(), Collections.emptyList(), calls,
Integer.MIN_VALUE, Integer.MAX_VALUE,
+ WindowNode.WindowFrameType.RANGE, Collections.emptyList(),
outSchema, inSchema,
+ getWindowHints(hintsMap));
+
+ // When:
+ TransferableBlock firstBlock = operator.nextBlock();
+ Mockito.verify(_input).earlyTerminate();
+ Assert.assertTrue(firstBlock.isDataBlock(), "First block should be a data block but is " + firstBlock.getClass());
+ Assert.assertEquals(firstBlock.getNumRows(), 1);
+
+ TransferableBlock secondBlock = operator.nextBlock();
+ StatMap<WindowAggregateOperator.StatKey> windowStats =
+ OperatorTestUtil.getStatMap(WindowAggregateOperator.StatKey.class,
secondBlock);
+
Assert.assertTrue(windowStats.getBoolean(WindowAggregateOperator.StatKey.MAX_ROWS_IN_WINDOW_REACHED),
+ "Max rows in window should be reached");
+ }
+
private static RexExpression.FunctionCall getSum(RexExpression arg) {
return new RexExpression.FunctionCall(SqlKind.SUM, ColumnDataType.INT,
"SUM", ImmutableList.of(arg));
}
+
+ private static AbstractPlanNode.NodeHint getWindowHints(Map<String, String>
hintsMap) {
+ RelHint.Builder relHintBuilder =
RelHint.builder(PinotHintOptions.WINDOW_HINT_OPTIONS);
+ hintsMap.forEach(relHintBuilder::hintOption);
+ return new
AbstractPlanNode.NodeHint(ImmutableList.of(relHintBuilder.build()));
+ }
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 7a05d488da..c02423d4b6 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -390,6 +390,10 @@ public class CommonConstants {
public static final String MAX_ROWS_IN_JOIN = "maxRowsInJoin";
public static final String JOIN_OVERFLOW_MODE = "joinOverflowMode";
+ // Handle WINDOW Overflow
+ public static final String MAX_ROWS_IN_WINDOW = "maxRowsInWindow";
+ public static final String WINDOW_OVERFLOW_MODE = "windowOverflowMode";
+
// Indicates the maximum length of the serialized response per server for a query.
public static final String MAX_SERVER_RESPONSE_SIZE_BYTES =
"maxServerResponseSizeBytes";
@@ -1120,6 +1124,16 @@ public class CommonConstants {
public enum JoinOverFlowMode {
THROW, BREAK
}
+
+ /**
+ * Configuration for window overflow.
+ */
+ public static final String KEY_OF_MAX_ROWS_IN_WINDOW =
"pinot.query.window.max.rows";
+ public static final String KEY_OF_WINDOW_OVERFLOW_MODE =
"pinot.query.window.overflow.mode";
+
+ public enum WindowOverFlowMode {
+ THROW, BREAK
+ }
}
public static class NullValuePlaceHolder {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]