This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 917493f Optimization for selection only queries: Allow early
termination (#5163)
917493f is described below
commit 917493fc96c93442571c47ee5fed36160691b924
Author: Xiang Fu <[email protected]>
AuthorDate: Wed Mar 18 15:54:22 2020 -0700
Optimization for selection only queries: Allow early termination (#5163)
In Pinot's CombineOperator, queries are scheduled with at most 10 threads.
For tables with many segments (say 10k), this means each thread will process 1k
segments.
For selection-only queries (e.g. select * from myTable limit 10), each
thread may collect enough results to return after scanning a few segments, so there
is no need to wait for and scan all the remaining segments.
This is extremely useful for people who want to randomly browse a big table
from the query console.
---
.../core/operator/CombineGroupByOperator.java | 5 +-
.../operator/CombineGroupByOrderByOperator.java | 5 +-
.../pinot/core/operator/CombineOperator.java | 31 +++++-
.../pinot/core/operator/DocIdSetOperator.java | 8 +-
.../operator/query/AggregationGroupByOperator.java | 21 ++--
.../query/AggregationGroupByOrderByOperator.java | 21 ++--
.../core/operator/query/AggregationOperator.java | 21 ++--
.../query/DictionaryBasedAggregationOperator.java | 9 +-
.../query/MetadataBasedAggregationOperator.java | 10 +-
.../core/operator/query/SelectionOnlyOperator.java | 19 ++--
.../operator/query/SelectionOrderByOperator.java | 19 ++--
.../core/operator/CombineSlowOperatorsTest.java | 2 +-
.../pinot/queries/BaseSingleValueQueriesTest.java | 17 ++-
.../queries/SelectionOnlyEarlyTerminationTest.java | 114 +++++++++++++++++++++
14 files changed, 201 insertions(+), 101 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOperator.java
index 22e1af4..3708d0a 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOperator.java
@@ -213,10 +213,7 @@ public class CombineGroupByOperator extends
BaseOperator<IntermediateResultsBloc
// Set the execution statistics.
ExecutionStatistics executionStatistics = new ExecutionStatistics();
for (Operator operator : _operators) {
- ExecutionStatistics executionStatisticsToMerge =
operator.getExecutionStatistics();
- if (executionStatisticsToMerge != null) {
- executionStatistics.merge(executionStatisticsToMerge);
- }
+ executionStatistics.merge(operator.getExecutionStatistics());
}
mergedBlock.setNumDocsScanned(executionStatistics.getNumDocsScanned());
mergedBlock.setNumEntriesScannedInFilter(executionStatistics.getNumEntriesScannedInFilter());
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOrderByOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOrderByOperator.java
index d52571d..a06090d 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOrderByOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOrderByOperator.java
@@ -213,10 +213,7 @@ public class CombineGroupByOrderByOperator extends
BaseOperator<IntermediateResu
// Set the execution statistics.
ExecutionStatistics executionStatistics = new ExecutionStatistics();
for (Operator operator : _operators) {
- ExecutionStatistics executionStatisticsToMerge =
operator.getExecutionStatistics();
- if (executionStatisticsToMerge != null) {
- executionStatistics.merge(executionStatisticsToMerge);
- }
+ executionStatistics.merge(operator.getExecutionStatistics());
}
mergedBlock.setNumDocsScanned(executionStatistics.getNumDocsScanned());
mergedBlock.setNumEntriesScannedInFilter(executionStatistics.getNumEntriesScannedInFilter());
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineOperator.java
index 14b4c49..8050916 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineOperator.java
@@ -18,6 +18,8 @@
*/
package org.apache.pinot.core.operator;
+import java.io.Serializable;
+import java.util.Collection;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
@@ -29,6 +31,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.request.Selection;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
import org.apache.pinot.core.query.reduce.CombineService;
@@ -48,7 +51,7 @@ public class CombineOperator extends
BaseOperator<IntermediateResultsBlock> {
// Use at most 10 or half of the processors threads for each query.
// If there are less than 2 processors, use 1 thread.
// Runtime.getRuntime().availableProcessors() may return value < 2 in
container based environment, e.g. Kubernetes.
- private static final int MAX_NUM_THREADS_PER_QUERY =
+ public static final int MAX_NUM_THREADS_PER_QUERY =
Math.max(1, Math.min(10, Runtime.getRuntime().availableProcessors() /
2));
private final List<Operator> _operators;
@@ -101,6 +104,9 @@ public class CombineOperator extends
BaseOperator<IntermediateResultsBlock> {
IntermediateResultsBlock mergedBlock = (IntermediateResultsBlock)
_operators.get(index).nextBlock();
for (int i = index + numThreads; i < numOperators; i +=
numThreads) {
+ if (isQuerySatisfied(_brokerRequest, mergedBlock)) {
+ break;
+ }
IntermediateResultsBlock blockToMerge =
(IntermediateResultsBlock) _operators.get(i).nextBlock();
try {
CombineService.mergeTwoBlocks(_brokerRequest, mergedBlock,
blockToMerge);
@@ -134,6 +140,9 @@ public class CombineOperator extends
BaseOperator<IntermediateResultsBlock> {
}
int numMergedBlocks = 1;
while (numMergedBlocks < numThreads) {
+ if (isQuerySatisfied(_brokerRequest, mergedBlock)) {
+ break;
+ }
IntermediateResultsBlock blockToMerge =
blockingQueue.poll(endTimeMs - System.currentTimeMillis(),
TimeUnit.MILLISECONDS);
if (blockToMerge == null) {
@@ -181,10 +190,7 @@ public class CombineOperator extends
BaseOperator<IntermediateResultsBlock> {
// Update execution statistics.
ExecutionStatistics executionStatistics = new ExecutionStatistics();
for (Operator operator : _operators) {
- ExecutionStatistics executionStatisticsToMerge =
operator.getExecutionStatistics();
- if (executionStatisticsToMerge != null) {
- executionStatistics.merge(executionStatisticsToMerge);
- }
+ executionStatistics.merge(operator.getExecutionStatistics());
}
mergedBlock.setNumDocsScanned(executionStatistics.getNumDocsScanned());
mergedBlock.setNumEntriesScannedInFilter(executionStatistics.getNumEntriesScannedInFilter());
@@ -196,6 +202,21 @@ public class CombineOperator extends
BaseOperator<IntermediateResultsBlock> {
return mergedBlock;
}
+ /**
+ * Returns {@code true} if the query is already satisfied with the
IntermediateResultsBlock so that there is no need to
+ * process more segments, {@code false} otherwise.
+ * <p>For selection-only query, the query is satisfied when enough records
are gathered.
+ */
+ private boolean isQuerySatisfied(BrokerRequest brokerRequest,
IntermediateResultsBlock mergedBlock) {
+ Selection selections = brokerRequest.getSelections();
+ if (selections != null && brokerRequest.getOrderBy() == null) {
+ // Selection-only
+ Collection<Serializable[]> selectionResult =
mergedBlock.getSelectionResult();
+ return selectionResult != null && selectionResult.size() >=
selections.getSize();
+ }
+ return false;
+ }
+
@Override
public String getOperatorName() {
return OPERATOR_NAME;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdSetOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdSetOperator.java
index 5db7db4..9eb01a7 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdSetOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdSetOperator.java
@@ -70,12 +70,12 @@ public class DocIdSetOperator extends
BaseOperator<DocIdSetBlock> {
// Initialize filter block document Id set
if (_filterBlockDocIdSet == null) {
- _filterBlockDocIdSet = (FilterBlockDocIdSet)
_filterOperator.nextBlock().getBlockDocIdSet();
+ _filterBlockDocIdSet = _filterOperator.nextBlock().getBlockDocIdSet();
_blockDocIdIterator = _filterBlockDocIdSet.iterator();
}
int pos = 0;
- int[] docIds = _threadLocal? THREAD_LOCAL_DOC_IDS.get(): new
int[_maxSizeOfDocIdSet];
+ int[] docIds = _threadLocal ? THREAD_LOCAL_DOC_IDS.get() : new
int[_maxSizeOfDocIdSet];
for (int i = 0; i < _maxSizeOfDocIdSet; i++) {
_currentDocId = _blockDocIdIterator.next();
if (_currentDocId == Constants.EOF) {
@@ -97,6 +97,8 @@ public class DocIdSetOperator extends
BaseOperator<DocIdSetBlock> {
@Override
public ExecutionStatistics getExecutionStatistics() {
- return new ExecutionStatistics(0L,
_filterBlockDocIdSet.getNumEntriesScannedInFilter(), 0L, 0L);
+ long numEntriesScannedInFilter =
+ _filterBlockDocIdSet != null ?
_filterBlockDocIdSet.getNumEntriesScannedInFilter() : 0;
+ return new ExecutionStatistics(0, numEntriesScannedInFilter, 0, 0);
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOperator.java
index a51db1b..58ae7b0 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOperator.java
@@ -25,7 +25,6 @@ import
org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
import org.apache.pinot.core.operator.blocks.TransformBlock;
import org.apache.pinot.core.operator.transform.TransformOperator;
import org.apache.pinot.core.query.aggregation.AggregationFunctionContext;
-import
org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
import org.apache.pinot.core.query.aggregation.groupby.DefaultGroupByExecutor;
import org.apache.pinot.core.query.aggregation.groupby.GroupByExecutor;
import org.apache.pinot.core.startree.executor.StarTreeGroupByExecutor;
@@ -45,7 +44,7 @@ public class AggregationGroupByOperator extends
BaseOperator<IntermediateResults
private final long _numTotalDocs;
private final boolean _useStarTree;
- private ExecutionStatistics _executionStatistics;
+ private int _numDocsScanned = 0;
public AggregationGroupByOperator(AggregationFunctionContext[]
functionContexts, GroupBy groupBy,
int maxInitialResultHolderCapacity, int numGroupsLimit,
TransformOperator transformOperator, long numTotalDocs,
@@ -61,8 +60,6 @@ public class AggregationGroupByOperator extends
BaseOperator<IntermediateResults
@Override
protected IntermediateResultsBlock getNextBlock() {
- int numDocsScanned = 0;
-
// Perform aggregation group-by on all the blocks
GroupByExecutor groupByExecutor;
if (_useStarTree) {
@@ -76,19 +73,12 @@ public class AggregationGroupByOperator extends
BaseOperator<IntermediateResults
}
TransformBlock transformBlock;
while ((transformBlock = _transformOperator.nextBlock()) != null) {
- numDocsScanned += transformBlock.getNumDocs();
+ _numDocsScanned += transformBlock.getNumDocs();
groupByExecutor.process(transformBlock);
}
- AggregationGroupByResult groupByResult = groupByExecutor.getResult();
-
- // Gather execution statistics
- long numEntriesScannedInFilter =
_transformOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
- long numEntriesScannedPostFilter = numDocsScanned *
_transformOperator.getNumColumnsProjected();
- _executionStatistics =
- new ExecutionStatistics(numDocsScanned, numEntriesScannedInFilter,
numEntriesScannedPostFilter, _numTotalDocs);
// Build intermediate result block based on aggregation group-by result
from the executor
- return new IntermediateResultsBlock(_functionContexts, groupByResult);
+ return new IntermediateResultsBlock(_functionContexts,
groupByExecutor.getResult());
}
@Override
@@ -98,6 +88,9 @@ public class AggregationGroupByOperator extends
BaseOperator<IntermediateResults
@Override
public ExecutionStatistics getExecutionStatistics() {
- return _executionStatistics;
+ long numEntriesScannedInFilter =
_transformOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
+ long numEntriesScannedPostFilter = (long) _numDocsScanned *
_transformOperator.getNumColumnsProjected();
+ return new ExecutionStatistics(_numDocsScanned, numEntriesScannedInFilter,
numEntriesScannedPostFilter,
+ _numTotalDocs);
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOrderByOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOrderByOperator.java
index 2c11b01..9567940 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOrderByOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOrderByOperator.java
@@ -27,7 +27,6 @@ import
org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
import org.apache.pinot.core.operator.blocks.TransformBlock;
import org.apache.pinot.core.operator.transform.TransformOperator;
import org.apache.pinot.core.query.aggregation.AggregationFunctionContext;
-import
org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
import org.apache.pinot.core.query.aggregation.groupby.DefaultGroupByExecutor;
import org.apache.pinot.core.query.aggregation.groupby.GroupByExecutor;
import org.apache.pinot.core.startree.executor.StarTreeGroupByExecutor;
@@ -49,7 +48,7 @@ public class AggregationGroupByOrderByOperator extends
BaseOperator<Intermediate
private final long _numTotalDocs;
private final boolean _useStarTree;
- private ExecutionStatistics _executionStatistics;
+ private int _numDocsScanned;
public AggregationGroupByOrderByOperator(AggregationFunctionContext[]
functionContexts, GroupBy groupBy,
int maxInitialResultHolderCapacity, int numGroupsLimit,
TransformOperator transformOperator, long numTotalDocs,
@@ -92,8 +91,6 @@ public class AggregationGroupByOrderByOperator extends
BaseOperator<Intermediate
@Override
protected IntermediateResultsBlock getNextBlock() {
- int numDocsScanned = 0;
-
// Perform aggregation group-by on all the blocks
GroupByExecutor groupByExecutor;
if (_useStarTree) {
@@ -107,19 +104,12 @@ public class AggregationGroupByOrderByOperator extends
BaseOperator<Intermediate
}
TransformBlock transformBlock;
while ((transformBlock = _transformOperator.nextBlock()) != null) {
- numDocsScanned += transformBlock.getNumDocs();
+ _numDocsScanned += transformBlock.getNumDocs();
groupByExecutor.process(transformBlock);
}
- AggregationGroupByResult groupByResult = groupByExecutor.getResult();
-
- // Gather execution statistics
- long numEntriesScannedInFilter =
_transformOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
- long numEntriesScannedPostFilter = numDocsScanned *
_transformOperator.getNumColumnsProjected();
- _executionStatistics =
- new ExecutionStatistics(numDocsScanned, numEntriesScannedInFilter,
numEntriesScannedPostFilter, _numTotalDocs);
// Build intermediate result block based on aggregation group-by result
from the executor
- return new IntermediateResultsBlock(_functionContexts, groupByResult,
_dataSchema);
+ return new IntermediateResultsBlock(_functionContexts,
groupByExecutor.getResult(), _dataSchema);
}
@Override
@@ -129,6 +119,9 @@ public class AggregationGroupByOrderByOperator extends
BaseOperator<Intermediate
@Override
public ExecutionStatistics getExecutionStatistics() {
- return _executionStatistics;
+ long numEntriesScannedInFilter =
_transformOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
+ long numEntriesScannedPostFilter = (long) _numDocsScanned *
_transformOperator.getNumColumnsProjected();
+ return new ExecutionStatistics(_numDocsScanned, numEntriesScannedInFilter,
numEntriesScannedPostFilter,
+ _numTotalDocs);
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationOperator.java
index e5b99df..a20d39d 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationOperator.java
@@ -18,7 +18,6 @@
*/
package org.apache.pinot.core.operator.query;
-import java.util.List;
import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.core.operator.ExecutionStatistics;
import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
@@ -41,7 +40,7 @@ public class AggregationOperator extends
BaseOperator<IntermediateResultsBlock>
private final long _numTotalDocs;
private final boolean _useStarTree;
- private ExecutionStatistics _executionStatistics;
+ private int _numDocsScanned = 0;
public AggregationOperator(AggregationFunctionContext[] functionContexts,
TransformOperator transformOperator,
long numTotalDocs, boolean useStarTree) {
@@ -53,8 +52,6 @@ public class AggregationOperator extends
BaseOperator<IntermediateResultsBlock>
@Override
protected IntermediateResultsBlock getNextBlock() {
- int numDocsScanned = 0;
-
// Perform aggregation on all the transform blocks
AggregationExecutor aggregationExecutor;
if (_useStarTree) {
@@ -64,19 +61,12 @@ public class AggregationOperator extends
BaseOperator<IntermediateResultsBlock>
}
TransformBlock transformBlock;
while ((transformBlock = _transformOperator.nextBlock()) != null) {
- numDocsScanned += transformBlock.getNumDocs();
+ _numDocsScanned += transformBlock.getNumDocs();
aggregationExecutor.aggregate(transformBlock);
}
- List<Object> aggregationResult = aggregationExecutor.getResult();
-
- // Create execution statistics
- long numEntriesScannedInFilter =
_transformOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
- long numEntriesScannedPostFilter = numDocsScanned *
_transformOperator.getNumColumnsProjected();
- _executionStatistics =
- new ExecutionStatistics(numDocsScanned, numEntriesScannedInFilter,
numEntriesScannedPostFilter, _numTotalDocs);
// Build intermediate result block based on aggregation result from the
executor
- return new IntermediateResultsBlock(_functionContexts, aggregationResult,
false);
+ return new IntermediateResultsBlock(_functionContexts,
aggregationExecutor.getResult(), false);
}
@Override
@@ -86,6 +76,9 @@ public class AggregationOperator extends
BaseOperator<IntermediateResultsBlock>
@Override
public ExecutionStatistics getExecutionStatistics() {
- return _executionStatistics;
+ long numEntriesScannedInFilter =
_transformOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
+ long numEntriesScannedPostFilter = (long) _numDocsScanned *
_transformOperator.getNumColumnsProjected();
+ return new ExecutionStatistics(_numDocsScanned, numEntriesScannedInFilter,
numEntriesScannedPostFilter,
+ _numTotalDocs);
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedAggregationOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedAggregationOperator.java
index 3270d4f..8750684 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedAggregationOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedAggregationOperator.java
@@ -50,7 +50,6 @@ public class DictionaryBasedAggregationOperator extends
BaseOperator<Intermediat
private final AggregationFunctionContext[] _aggregationFunctionContexts;
private final Map<String, Dictionary> _dictionaryMap;
private final long _numTotalDocs;
- private ExecutionStatistics _executionStatistics;
/**
* Constructor for the class.
@@ -96,11 +95,6 @@ public class DictionaryBasedAggregationOperator extends
BaseOperator<Intermediat
aggregationResults.add(function.extractAggregationResult(resultHolder));
}
- // Create execution statistics. Set numDocsScanned to numTotalDocs for
backward compatibility.
- _executionStatistics =
- new ExecutionStatistics(_numTotalDocs, 0/* numEntriesScannedInFilter
*/, 0/* numEntriesScannedPostFilter */,
- _numTotalDocs);
-
// Build intermediate result block based on aggregation result from the
executor.
return new IntermediateResultsBlock(_aggregationFunctionContexts,
aggregationResults, false);
}
@@ -112,6 +106,7 @@ public class DictionaryBasedAggregationOperator extends
BaseOperator<Intermediat
@Override
public ExecutionStatistics getExecutionStatistics() {
- return _executionStatistics;
+ // NOTE: Set numDocsScanned to numTotalDocs for backward compatibility.
+ return new ExecutionStatistics(_numTotalDocs, 0, 0, _numTotalDocs);
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/MetadataBasedAggregationOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/MetadataBasedAggregationOperator.java
index 98ea75e..f999e62 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/MetadataBasedAggregationOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/MetadataBasedAggregationOperator.java
@@ -40,7 +40,6 @@ public class MetadataBasedAggregationOperator extends
BaseOperator<IntermediateR
private final AggregationFunctionContext[] _aggregationFunctionContexts;
private final Map<String, DataSource> _dataSourceMap;
private final SegmentMetadata _segmentMetadata;
- private ExecutionStatistics _executionStatistics;
/**
* Constructor for the class.
@@ -71,11 +70,6 @@ public class MetadataBasedAggregationOperator extends
BaseOperator<IntermediateR
aggregationResults.add(numTotalDocs);
}
- // Create execution statistics. Set numDocsScanned to numTotalDocs for
backward compatibility.
- _executionStatistics =
- new ExecutionStatistics(numTotalDocs, 0/*numEntriesScannedInFilter*/,
0/*numEntriesScannedPostFilter*/,
- numTotalDocs);
-
// Build intermediate result block based on aggregation result from the
executor.
return new IntermediateResultsBlock(_aggregationFunctionContexts,
aggregationResults, false);
}
@@ -87,6 +81,8 @@ public class MetadataBasedAggregationOperator extends
BaseOperator<IntermediateR
@Override
public ExecutionStatistics getExecutionStatistics() {
- return _executionStatistics;
+ // NOTE: Set numDocsScanned to numTotalDocs for backward compatibility.
+ int numTotalDocs = _segmentMetadata.getTotalDocs();
+ return new ExecutionStatistics(numTotalDocs, 0, 0, numTotalDocs);
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java
index e0d1687..f86d16c 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java
@@ -47,7 +47,7 @@ public class SelectionOnlyOperator extends
BaseOperator<IntermediateResultsBlock
private final int _numRowsToKeep;
private final List<Serializable[]> _rows;
- private ExecutionStatistics _executionStatistics;
+ private int _numDocsScanned = 0;
public SelectionOnlyOperator(IndexSegment indexSegment, Selection selection,
TransformOperator transformOperator) {
_indexSegment = indexSegment;
@@ -73,8 +73,6 @@ public class SelectionOnlyOperator extends
BaseOperator<IntermediateResultsBlock
@Override
protected IntermediateResultsBlock getNextBlock() {
- int numDocsScanned = 0;
-
TransformBlock transformBlock;
while ((transformBlock = _transformOperator.nextBlock()) != null) {
int numExpressions = _expressions.size();
@@ -84,7 +82,7 @@ public class SelectionOnlyOperator extends
BaseOperator<IntermediateResultsBlock
RowBasedBlockValueFetcher blockValueFetcher = new
RowBasedBlockValueFetcher(_blockValSets);
int numDocsToAdd = Math.min(_numRowsToKeep - _rows.size(),
transformBlock.getNumDocs());
- numDocsScanned += numDocsToAdd;
+ _numDocsScanned += numDocsToAdd;
for (int i = 0; i < numDocsToAdd; i++) {
_rows.add(blockValueFetcher.getRow(i));
}
@@ -93,13 +91,6 @@ public class SelectionOnlyOperator extends
BaseOperator<IntermediateResultsBlock
}
}
- // Create execution statistics.
- long numEntriesScannedInFilter =
_transformOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
- long numEntriesScannedPostFilter = numDocsScanned *
_transformOperator.getNumColumnsProjected();
- long numTotalDocs = _indexSegment.getSegmentMetadata().getTotalDocs();
- _executionStatistics =
- new ExecutionStatistics(numDocsScanned, numEntriesScannedInFilter,
numEntriesScannedPostFilter, numTotalDocs);
-
return new IntermediateResultsBlock(_dataSchema, _rows);
}
@@ -110,6 +101,10 @@ public class SelectionOnlyOperator extends
BaseOperator<IntermediateResultsBlock
@Override
public ExecutionStatistics getExecutionStatistics() {
- return _executionStatistics;
+ long numEntriesScannedInFilter =
_transformOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
+ long numEntriesScannedPostFilter = (long) _numDocsScanned *
_transformOperator.getNumColumnsProjected();
+ int numTotalDocs = _indexSegment.getSegmentMetadata().getTotalDocs();
+ return new ExecutionStatistics(_numDocsScanned, numEntriesScannedInFilter,
numEntriesScannedPostFilter,
+ numTotalDocs);
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
index b4f2228..86b84d9 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
@@ -52,7 +52,7 @@ public class SelectionOrderByOperator extends
BaseOperator<IntermediateResultsBl
private final int _numRowsToKeep;
private final PriorityQueue<Serializable[]> _rows;
- private ExecutionStatistics _executionStatistics;
+ private int _numDocsScanned = 0;
public SelectionOrderByOperator(IndexSegment indexSegment, Selection
selection, TransformOperator transformOperator) {
_indexSegment = indexSegment;
@@ -139,8 +139,6 @@ public class SelectionOrderByOperator extends
BaseOperator<IntermediateResultsBl
@Override
protected IntermediateResultsBlock getNextBlock() {
- int numDocsScanned = 0;
-
TransformBlock transformBlock;
while ((transformBlock = _transformOperator.nextBlock()) != null) {
int numExpressions = _expressions.size();
@@ -152,19 +150,12 @@ public class SelectionOrderByOperator extends
BaseOperator<IntermediateResultsBl
RowBasedBlockValueFetcher blockValueFetcher = new
RowBasedBlockValueFetcher(blockValSets);
int numDocsFetched = transformBlock.getNumDocs();
- numDocsScanned += numDocsFetched;
+ _numDocsScanned += numDocsFetched;
for (int i = 0; i < numDocsFetched; i++) {
SelectionOperatorUtils.addToPriorityQueue(blockValueFetcher.getRow(i),
_rows, _numRowsToKeep);
}
}
- // Create execution statistics.
- long numEntriesScannedInFilter =
_transformOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
- long numEntriesScannedPostFilter = numDocsScanned *
_transformOperator.getNumColumnsProjected();
- long numTotalDocs = _indexSegment.getSegmentMetadata().getTotalDocs();
- _executionStatistics =
- new ExecutionStatistics(numDocsScanned, numEntriesScannedInFilter,
numEntriesScannedPostFilter, numTotalDocs);
-
return new IntermediateResultsBlock(_dataSchema, _rows);
}
@@ -175,6 +166,10 @@ public class SelectionOrderByOperator extends
BaseOperator<IntermediateResultsBl
@Override
public ExecutionStatistics getExecutionStatistics() {
- return _executionStatistics;
+ long numEntriesScannedInFilter =
_transformOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
+ long numEntriesScannedPostFilter = (long) _numDocsScanned *
_transformOperator.getNumColumnsProjected();
+ int numTotalDocs = _indexSegment.getSegmentMetadata().getTotalDocs();
+ return new ExecutionStatistics(_numDocsScanned, numEntriesScannedInFilter,
numEntriesScannedPostFilter,
+ numTotalDocs);
}
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/operator/CombineSlowOperatorsTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/operator/CombineSlowOperatorsTest.java
index 1d9089f..9c4acef 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/operator/CombineSlowOperatorsTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/operator/CombineSlowOperatorsTest.java
@@ -144,7 +144,7 @@ public class CombineSlowOperatorsTest {
@Override
public ExecutionStatistics getExecutionStatistics() {
- return null;
+ return new ExecutionStatistics();
}
}
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/BaseSingleValueQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/BaseSingleValueQueriesTest.java
index d54fea0..f8e17e6 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/BaseSingleValueQueriesTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/BaseSingleValueQueriesTest.java
@@ -20,12 +20,11 @@ package org.apache.pinot.queries;
import java.io.File;
import java.net.URL;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.common.segment.ReadMode;
import org.apache.pinot.core.data.manager.SegmentDataManager;
import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
@@ -35,6 +34,8 @@ import
org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
import
org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterTest;
@@ -65,6 +66,7 @@ public abstract class BaseSingleValueQueriesTest extends
BaseQueriesTest {
private static final String AVRO_DATA = "data" + File.separator +
"test_data-sv.avro";
private static final String SEGMENT_NAME = "testTable_126164076_167572854";
private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(),
"SingleValueQueriesTest");
+ private static final int NUM_SEGMENT_DATA_MANAGERS = 2;
// Hard-coded query filter.
private static final String QUERY_FILTER =
@@ -121,8 +123,15 @@ public abstract class BaseSingleValueQueriesTest extends
BaseQueriesTest {
throws Exception {
ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new
File(INDEX_DIR, SEGMENT_NAME), ReadMode.heap);
_indexSegment = immutableSegment;
- _segmentDataManagers = Arrays
- .asList(new ImmutableSegmentDataManager(immutableSegment), new
ImmutableSegmentDataManager(immutableSegment));
+ int numSegmentDataManagers = getNumSegmentDataManagers();
+ _segmentDataManagers = new ArrayList<>(numSegmentDataManagers);
+ for (int i = 0; i < numSegmentDataManagers; i++) {
+ _segmentDataManagers.add(new
ImmutableSegmentDataManager(immutableSegment));
+ }
+ }
+
+ protected int getNumSegmentDataManagers() {
+ return NUM_SEGMENT_DATA_MANAGERS;
}
@AfterClass
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/SelectionOnlyEarlyTerminationTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/SelectionOnlyEarlyTerminationTest.java
new file mode 100644
index 0000000..8c39e45
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/SelectionOnlyEarlyTerminationTest.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.core.operator.CombineOperator;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+
+
+/**
+ * Early termination test for selection-only queries.
+ *
+ * <p>A selection query without ORDER BY only needs LIMIT rows, so each combine thread is expected to
+ * stop after the first segment that yields enough rows. A selection ORDER BY query has no such
+ * shortcut, so every segment is expected to be scanned in full.
+ */
+public class SelectionOnlyEarlyTerminationTest extends
BaseSingleValueQueriesTest {
+ // Expected number of documents in the test segment loaded by BaseSingleValueQueriesTest.
+ // NOTE(review): must match the segment built from the test Avro file -- confirm if that data changes.
+ private static final int NUM_DOCS_PER_SEGMENT = 30000;
+ // Number of servers the broker-response helpers are assumed to simulate; every expected count
+ // below is multiplied by it. NOTE(review): confirm against BaseQueriesTest.
+ private static final int NUM_SERVERS = 2;
+
+ /**
+ * Uses (2 * MAX_NUM_THREADS_PER_QUERY) segments per server so that each combine thread has more than
+ * one segment to process; otherwise early termination would not be observable.
+ */
+ @Override
+ protected int getNumSegmentDataManagers() {
+ return CombineOperator.MAX_NUM_THREADS_PER_QUERY * 2;
+ }
+
+ /**
+ * With early termination, a selection-only query is scheduled with
+ * {@link CombineOperator#MAX_NUM_THREADS_PER_QUERY} threads per server. Each thread should fill the
+ * LIMIT from its first segment and skip the rest. So the number of segments matched (segments with a
+ * non-zero number of documents scanned) equals the number of threads, while all segments are still
+ * counted as processed. Both the PQL and the SQL endpoints are checked.
+ */
+ @Test
+ public void testSelectOnlyQuery() {
+ int numThreadsPerServer = CombineOperator.MAX_NUM_THREADS_PER_QUERY;
+ int numSegmentsPerServer = getNumSegmentDataManagers();
+
+ // LIMIT = 5, 10, 20, 40, 80, 160, 320, 640, 1280, 2560, 5120, 10240, 20480
+ // Every LIMIT stays below NUM_DOCS_PER_SEGMENT, so a single segment can always satisfy it.
+ for (int limit = 5; limit < NUM_DOCS_PER_SEGMENT; limit *= 2) {
+ String query = String.format("SELECT column1, column7, column9, column6
FROM testTable LIMIT %d", limit);
+ // Number of columns in the SELECT list above; each scanned doc reads one entry per column.
+ int numColumnsInSelection = 4;
+ // PQL: results come back as selection results, not as a result table.
+ BrokerResponseNative brokerResponse =
getBrokerResponseForPqlQuery(query);
+ assertNotNull(brokerResponse.getSelectionResults());
+ assertNull(brokerResponse.getResultTable());
+ assertEquals(brokerResponse.getNumSegmentsProcessed(),
numSegmentsPerServer * NUM_SERVERS);
+ // Only one segment per thread should contribute rows ...
+ assertEquals(brokerResponse.getNumSegmentsMatched(), numThreadsPerServer
* NUM_SERVERS);
+ // ... and each of those segments is scanned only up to LIMIT docs.
+ assertEquals(brokerResponse.getNumDocsScanned(), numThreadsPerServer *
NUM_SERVERS * limit);
+ // No WHERE clause, so nothing is scanned in the filter phase.
+ assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0);
+ assertEquals(brokerResponse.getNumEntriesScannedPostFilter(),
+ numThreadsPerServer * NUM_SERVERS * limit * numColumnsInSelection);
+ // Total docs still reflects every segment on every server.
+ assertEquals(brokerResponse.getTotalDocs(), numSegmentsPerServer *
NUM_SERVERS * NUM_DOCS_PER_SEGMENT);
+
+ // SQL: same statistics, but results come back as a result table instead.
+ brokerResponse = getBrokerResponseForSqlQuery(query);
+ assertNull(brokerResponse.getSelectionResults());
+ assertNotNull(brokerResponse.getResultTable());
+ assertEquals(brokerResponse.getNumSegmentsProcessed(),
numSegmentsPerServer * NUM_SERVERS);
+ assertEquals(brokerResponse.getNumSegmentsMatched(), numThreadsPerServer
* NUM_SERVERS);
+ assertEquals(brokerResponse.getNumDocsScanned(), numThreadsPerServer *
NUM_SERVERS * limit);
+ assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0);
+ assertEquals(brokerResponse.getNumEntriesScannedPostFilter(),
+ numThreadsPerServer * NUM_SERVERS * limit * numColumnsInSelection);
+ assertEquals(brokerResponse.getTotalDocs(), numSegmentsPerServer *
NUM_SERVERS * NUM_DOCS_PER_SEGMENT);
+ }
+ }
+
+ /**
+ * Without early termination, a selection ORDER BY query is expected to hit all segments: every
+ * segment is matched and every document of every segment is scanned. Both the PQL and the SQL
+ * endpoints are checked.
+ */
+ @Test
+ public void testSelectWithOrderByQuery() {
+ int numSegmentsPerServer = getNumSegmentDataManagers();
+ String query = "SELECT column11, column18, column1 FROM testTable ORDER BY
column11";
+ // Number of columns in the SELECT list above; each scanned doc reads one entry per column.
+ int numColumnsInSelection = 3;
+ // PQL: results come back as selection results, not as a result table.
+ BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query);
+ assertNotNull(brokerResponse.getSelectionResults());
+ assertNull(brokerResponse.getResultTable());
+ assertEquals(brokerResponse.getNumSegmentsProcessed(),
numSegmentsPerServer * NUM_SERVERS);
+ // Unlike the selection-only case, every segment contributes and is scanned completely.
+ assertEquals(brokerResponse.getNumSegmentsMatched(), numSegmentsPerServer
* NUM_SERVERS);
+ assertEquals(brokerResponse.getNumDocsScanned(), numSegmentsPerServer *
NUM_SERVERS * NUM_DOCS_PER_SEGMENT);
+ // No WHERE clause, so nothing is scanned in the filter phase.
+ assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0);
+ assertEquals(brokerResponse.getNumEntriesScannedPostFilter(),
+ numSegmentsPerServer * NUM_SERVERS * NUM_DOCS_PER_SEGMENT *
numColumnsInSelection);
+ assertEquals(brokerResponse.getTotalDocs(), numSegmentsPerServer *
NUM_SERVERS * NUM_DOCS_PER_SEGMENT);
+
+ // SQL: same statistics, but results come back as a result table instead.
+ brokerResponse = getBrokerResponseForSqlQuery(query);
+ assertNull(brokerResponse.getSelectionResults());
+ assertNotNull(brokerResponse.getResultTable());
+ assertEquals(brokerResponse.getNumSegmentsProcessed(),
numSegmentsPerServer * NUM_SERVERS);
+ assertEquals(brokerResponse.getNumSegmentsMatched(), numSegmentsPerServer
* NUM_SERVERS);
+ assertEquals(brokerResponse.getNumDocsScanned(), numSegmentsPerServer *
NUM_SERVERS * NUM_DOCS_PER_SEGMENT);
+ assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0);
+ assertEquals(brokerResponse.getNumEntriesScannedPostFilter(),
+ numSegmentsPerServer * NUM_SERVERS * NUM_DOCS_PER_SEGMENT *
numColumnsInSelection);
+ assertEquals(brokerResponse.getTotalDocs(), numSegmentsPerServer *
NUM_SERVERS * NUM_DOCS_PER_SEGMENT);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]