This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 6a7ca904f4 [multistage][bugfix] order by limit is capped at 10_000 (#10809)
6a7ca904f4 is described below
commit 6a7ca904f497c75d91ae0700ae551796f4b0606f
Author: Rong Rong <[email protected]>
AuthorDate: Tue May 30 14:29:39 2023 -0700
[multistage][bugfix] order by limit is capped at 10_000 (#10809)
---
.../apache/pinot/query/runtime/operator/SortOperator.java | 12 ++++++++----
.../pinot/query/runtime/operator/SortOperatorTest.java | 3 ++-
2 files changed, 10 insertions(+), 5 deletions(-)
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
index 1350242508..908bfb9406 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
@@ -34,6 +34,7 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.utils.SortUtils;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,13 +59,14 @@ public class SortOperator extends MultiStageOperator {
List<RexExpression> collationKeys, List<RelFieldCollation.Direction>
collationDirections, int fetch, int offset,
DataSchema dataSchema, boolean isInputSorted) {
this(context, upstreamOperator, collationKeys, collationDirections, fetch,
offset, dataSchema, isInputSorted,
- SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY);
+ SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY,
+ CommonConstants.Broker.DEFAULT_BROKER_QUERY_RESPONSE_LIMIT);
}
@VisibleForTesting
SortOperator(OpChainExecutionContext context, MultiStageOperator
upstreamOperator, List<RexExpression> collationKeys,
List<RelFieldCollation.Direction> collationDirections, int fetch, int
offset, DataSchema dataSchema,
- boolean isInputSorted, int defaultHolderCapacity) {
+ boolean isInputSorted, int defaultHolderCapacity, int defaultResponseLimit) {
super(context);
_upstreamOperator = upstreamOperator;
_fetch = fetch;
@@ -72,7 +74,9 @@ public class SortOperator extends MultiStageOperator {
_dataSchema = dataSchema;
_upstreamErrorBlock = null;
_isSortedBlockConstructed = false;
- _numRowsToKeep = _fetch > 0 ? _fetch + _offset : defaultHolderCapacity;
+ // Setting numRowsToKeep as default maximum on Broker if limit not set.
+ // TODO: make this default behavior configurable.
+ _numRowsToKeep = _fetch > 0 ? _fetch + _offset : defaultResponseLimit;
// Under the following circumstances, the SortOperator is a simple
selection with row trim on limit & offset:
// - There are no collationKeys
// - 'isInputSorted' is set to true indicating that the data was already
sorted
@@ -82,7 +86,7 @@ public class SortOperator extends MultiStageOperator {
} else {
// Use the opposite direction as specified by the collation directions
since we need the PriorityQueue to decide
// which elements to keep and which to remove based on the limits.
- _priorityQueue = new PriorityQueue<>(_numRowsToKeep,
+ _priorityQueue = new PriorityQueue<>(Math.min(defaultHolderCapacity, _numRowsToKeep),
new SortUtils.SortComparator(collationKeys, collationDirections,
dataSchema, false, true));
_rows = null;
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java
index f2edadeae5..ebdb23aa3e 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java
@@ -334,7 +334,8 @@ public class SortOperatorTest {
List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
DataSchema schema = new DataSchema(new String[]{"sort"}, new
DataSchema.ColumnDataType[]{INT});
SortOperator op =
- new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 0, 0, schema, false, 1);
+ new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 0, 0, schema, false, 10,
+ 1);
Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2},
new Object[]{1}, new Object[]{3}))
.thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]