This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 917493f  Optimization for selection only queries: Allow early 
termination (#5163)
917493f is described below

commit 917493fc96c93442571c47ee5fed36160691b924
Author: Xiang Fu <[email protected]>
AuthorDate: Wed Mar 18 15:54:22 2020 -0700

    Optimization for selection only queries: Allow early termination (#5163)
    
    In Pinot's CombineOperator, queries are scheduled with at most 10 threads. 
For tables with many segments (say 10k), this means each thread processes about 1k 
segments.
    For selection-only queries (e.g. select * from myTable limit 10), each 
thread may collect enough results to return after scanning just a few segments, so 
there is no need to wait for all the segments to be scanned.
    This is especially useful for people who want to randomly browse a big table 
by issuing ad-hoc queries from the query console.
---
 .../core/operator/CombineGroupByOperator.java      |   5 +-
 .../operator/CombineGroupByOrderByOperator.java    |   5 +-
 .../pinot/core/operator/CombineOperator.java       |  31 +++++-
 .../pinot/core/operator/DocIdSetOperator.java      |   8 +-
 .../operator/query/AggregationGroupByOperator.java |  21 ++--
 .../query/AggregationGroupByOrderByOperator.java   |  21 ++--
 .../core/operator/query/AggregationOperator.java   |  21 ++--
 .../query/DictionaryBasedAggregationOperator.java  |   9 +-
 .../query/MetadataBasedAggregationOperator.java    |  10 +-
 .../core/operator/query/SelectionOnlyOperator.java |  19 ++--
 .../operator/query/SelectionOrderByOperator.java   |  19 ++--
 .../core/operator/CombineSlowOperatorsTest.java    |   2 +-
 .../pinot/queries/BaseSingleValueQueriesTest.java  |  17 ++-
 .../queries/SelectionOnlyEarlyTerminationTest.java | 114 +++++++++++++++++++++
 14 files changed, 201 insertions(+), 101 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOperator.java
index 22e1af4..3708d0a 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOperator.java
@@ -213,10 +213,7 @@ public class CombineGroupByOperator extends 
BaseOperator<IntermediateResultsBloc
       // Set the execution statistics.
       ExecutionStatistics executionStatistics = new ExecutionStatistics();
       for (Operator operator : _operators) {
-        ExecutionStatistics executionStatisticsToMerge = 
operator.getExecutionStatistics();
-        if (executionStatisticsToMerge != null) {
-          executionStatistics.merge(executionStatisticsToMerge);
-        }
+        executionStatistics.merge(operator.getExecutionStatistics());
       }
       mergedBlock.setNumDocsScanned(executionStatistics.getNumDocsScanned());
       
mergedBlock.setNumEntriesScannedInFilter(executionStatistics.getNumEntriesScannedInFilter());
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOrderByOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOrderByOperator.java
index d52571d..a06090d 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOrderByOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOrderByOperator.java
@@ -213,10 +213,7 @@ public class CombineGroupByOrderByOperator extends 
BaseOperator<IntermediateResu
       // Set the execution statistics.
       ExecutionStatistics executionStatistics = new ExecutionStatistics();
       for (Operator operator : _operators) {
-        ExecutionStatistics executionStatisticsToMerge = 
operator.getExecutionStatistics();
-        if (executionStatisticsToMerge != null) {
-          executionStatistics.merge(executionStatisticsToMerge);
-        }
+        executionStatistics.merge(operator.getExecutionStatistics());
       }
       mergedBlock.setNumDocsScanned(executionStatistics.getNumDocsScanned());
       
mergedBlock.setNumEntriesScannedInFilter(executionStatistics.getNumEntriesScannedInFilter());
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineOperator.java 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineOperator.java
index 14b4c49..8050916 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineOperator.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pinot.core.operator;
 
+import java.io.Serializable;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
@@ -29,6 +31,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.request.Selection;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
 import org.apache.pinot.core.query.reduce.CombineService;
@@ -48,7 +51,7 @@ public class CombineOperator extends 
BaseOperator<IntermediateResultsBlock> {
   // Use at most 10 or half of the processors threads for each query.
   // If there are less than 2 processors, use 1 thread.
   // Runtime.getRuntime().availableProcessors() may return value < 2 in 
container based environment, e.g. Kubernetes.
-  private static final int MAX_NUM_THREADS_PER_QUERY =
+  public static final int MAX_NUM_THREADS_PER_QUERY =
       Math.max(1, Math.min(10, Runtime.getRuntime().availableProcessors() / 
2));
 
   private final List<Operator> _operators;
@@ -101,6 +104,9 @@ public class CombineOperator extends 
BaseOperator<IntermediateResultsBlock> {
 
             IntermediateResultsBlock mergedBlock = (IntermediateResultsBlock) 
_operators.get(index).nextBlock();
             for (int i = index + numThreads; i < numOperators; i += 
numThreads) {
+              if (isQuerySatisfied(_brokerRequest, mergedBlock)) {
+                break;
+              }
               IntermediateResultsBlock blockToMerge = 
(IntermediateResultsBlock) _operators.get(i).nextBlock();
               try {
                 CombineService.mergeTwoBlocks(_brokerRequest, mergedBlock, 
blockToMerge);
@@ -134,6 +140,9 @@ public class CombineOperator extends 
BaseOperator<IntermediateResultsBlock> {
             }
             int numMergedBlocks = 1;
             while (numMergedBlocks < numThreads) {
+              if (isQuerySatisfied(_brokerRequest, mergedBlock)) {
+                break;
+              }
               IntermediateResultsBlock blockToMerge =
                   blockingQueue.poll(endTimeMs - System.currentTimeMillis(), 
TimeUnit.MILLISECONDS);
               if (blockToMerge == null) {
@@ -181,10 +190,7 @@ public class CombineOperator extends 
BaseOperator<IntermediateResultsBlock> {
     // Update execution statistics.
     ExecutionStatistics executionStatistics = new ExecutionStatistics();
     for (Operator operator : _operators) {
-      ExecutionStatistics executionStatisticsToMerge = 
operator.getExecutionStatistics();
-      if (executionStatisticsToMerge != null) {
-        executionStatistics.merge(executionStatisticsToMerge);
-      }
+      executionStatistics.merge(operator.getExecutionStatistics());
     }
     mergedBlock.setNumDocsScanned(executionStatistics.getNumDocsScanned());
     
mergedBlock.setNumEntriesScannedInFilter(executionStatistics.getNumEntriesScannedInFilter());
@@ -196,6 +202,21 @@ public class CombineOperator extends 
BaseOperator<IntermediateResultsBlock> {
     return mergedBlock;
   }
 
+  /**
+   * Returns {@code true} if the query is already satisfied with the 
IntermediateResultsBlock so that there is no need to
+   * process more segments, {@code false} otherwise.
+   * <p>For selection-only query, the query is satisfied when enough records 
are gathered.
+   */
+  private boolean isQuerySatisfied(BrokerRequest brokerRequest, 
IntermediateResultsBlock mergedBlock) {
+    Selection selections = brokerRequest.getSelections();
+    if (selections != null && brokerRequest.getOrderBy() == null) {
+      // Selection-only
+      Collection<Serializable[]> selectionResult = 
mergedBlock.getSelectionResult();
+      return selectionResult != null && selectionResult.size() >= 
selections.getSize();
+    }
+    return false;
+  }
+
   @Override
   public String getOperatorName() {
     return OPERATOR_NAME;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdSetOperator.java 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdSetOperator.java
index 5db7db4..9eb01a7 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdSetOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdSetOperator.java
@@ -70,12 +70,12 @@ public class DocIdSetOperator extends 
BaseOperator<DocIdSetBlock> {
 
     // Initialize filter block document Id set
     if (_filterBlockDocIdSet == null) {
-      _filterBlockDocIdSet = (FilterBlockDocIdSet) 
_filterOperator.nextBlock().getBlockDocIdSet();
+      _filterBlockDocIdSet = _filterOperator.nextBlock().getBlockDocIdSet();
       _blockDocIdIterator = _filterBlockDocIdSet.iterator();
     }
 
     int pos = 0;
-    int[] docIds = _threadLocal? THREAD_LOCAL_DOC_IDS.get(): new 
int[_maxSizeOfDocIdSet];
+    int[] docIds = _threadLocal ? THREAD_LOCAL_DOC_IDS.get() : new 
int[_maxSizeOfDocIdSet];
     for (int i = 0; i < _maxSizeOfDocIdSet; i++) {
       _currentDocId = _blockDocIdIterator.next();
       if (_currentDocId == Constants.EOF) {
@@ -97,6 +97,8 @@ public class DocIdSetOperator extends 
BaseOperator<DocIdSetBlock> {
 
   @Override
   public ExecutionStatistics getExecutionStatistics() {
-    return new ExecutionStatistics(0L, 
_filterBlockDocIdSet.getNumEntriesScannedInFilter(), 0L, 0L);
+    long numEntriesScannedInFilter =
+        _filterBlockDocIdSet != null ? 
_filterBlockDocIdSet.getNumEntriesScannedInFilter() : 0;
+    return new ExecutionStatistics(0, numEntriesScannedInFilter, 0, 0);
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOperator.java
index a51db1b..58ae7b0 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOperator.java
@@ -25,7 +25,6 @@ import 
org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
 import org.apache.pinot.core.operator.blocks.TransformBlock;
 import org.apache.pinot.core.operator.transform.TransformOperator;
 import org.apache.pinot.core.query.aggregation.AggregationFunctionContext;
-import 
org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
 import org.apache.pinot.core.query.aggregation.groupby.DefaultGroupByExecutor;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByExecutor;
 import org.apache.pinot.core.startree.executor.StarTreeGroupByExecutor;
@@ -45,7 +44,7 @@ public class AggregationGroupByOperator extends 
BaseOperator<IntermediateResults
   private final long _numTotalDocs;
   private final boolean _useStarTree;
 
-  private ExecutionStatistics _executionStatistics;
+  private int _numDocsScanned = 0;
 
   public AggregationGroupByOperator(AggregationFunctionContext[] 
functionContexts, GroupBy groupBy,
       int maxInitialResultHolderCapacity, int numGroupsLimit, 
TransformOperator transformOperator, long numTotalDocs,
@@ -61,8 +60,6 @@ public class AggregationGroupByOperator extends 
BaseOperator<IntermediateResults
 
   @Override
   protected IntermediateResultsBlock getNextBlock() {
-    int numDocsScanned = 0;
-
     // Perform aggregation group-by on all the blocks
     GroupByExecutor groupByExecutor;
     if (_useStarTree) {
@@ -76,19 +73,12 @@ public class AggregationGroupByOperator extends 
BaseOperator<IntermediateResults
     }
     TransformBlock transformBlock;
     while ((transformBlock = _transformOperator.nextBlock()) != null) {
-      numDocsScanned += transformBlock.getNumDocs();
+      _numDocsScanned += transformBlock.getNumDocs();
       groupByExecutor.process(transformBlock);
     }
-    AggregationGroupByResult groupByResult = groupByExecutor.getResult();
-
-    // Gather execution statistics
-    long numEntriesScannedInFilter = 
_transformOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
-    long numEntriesScannedPostFilter = numDocsScanned * 
_transformOperator.getNumColumnsProjected();
-    _executionStatistics =
-        new ExecutionStatistics(numDocsScanned, numEntriesScannedInFilter, 
numEntriesScannedPostFilter, _numTotalDocs);
 
     // Build intermediate result block based on aggregation group-by result 
from the executor
-    return new IntermediateResultsBlock(_functionContexts, groupByResult);
+    return new IntermediateResultsBlock(_functionContexts, 
groupByExecutor.getResult());
   }
 
   @Override
@@ -98,6 +88,9 @@ public class AggregationGroupByOperator extends 
BaseOperator<IntermediateResults
 
   @Override
   public ExecutionStatistics getExecutionStatistics() {
-    return _executionStatistics;
+    long numEntriesScannedInFilter = 
_transformOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
+    long numEntriesScannedPostFilter = (long) _numDocsScanned * 
_transformOperator.getNumColumnsProjected();
+    return new ExecutionStatistics(_numDocsScanned, numEntriesScannedInFilter, 
numEntriesScannedPostFilter,
+        _numTotalDocs);
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOrderByOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOrderByOperator.java
index 2c11b01..9567940 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOrderByOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOrderByOperator.java
@@ -27,7 +27,6 @@ import 
org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
 import org.apache.pinot.core.operator.blocks.TransformBlock;
 import org.apache.pinot.core.operator.transform.TransformOperator;
 import org.apache.pinot.core.query.aggregation.AggregationFunctionContext;
-import 
org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
 import org.apache.pinot.core.query.aggregation.groupby.DefaultGroupByExecutor;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByExecutor;
 import org.apache.pinot.core.startree.executor.StarTreeGroupByExecutor;
@@ -49,7 +48,7 @@ public class AggregationGroupByOrderByOperator extends 
BaseOperator<Intermediate
   private final long _numTotalDocs;
   private final boolean _useStarTree;
 
-  private ExecutionStatistics _executionStatistics;
+  private int _numDocsScanned;
 
   public AggregationGroupByOrderByOperator(AggregationFunctionContext[] 
functionContexts, GroupBy groupBy,
       int maxInitialResultHolderCapacity, int numGroupsLimit, 
TransformOperator transformOperator, long numTotalDocs,
@@ -92,8 +91,6 @@ public class AggregationGroupByOrderByOperator extends 
BaseOperator<Intermediate
 
   @Override
   protected IntermediateResultsBlock getNextBlock() {
-    int numDocsScanned = 0;
-
     // Perform aggregation group-by on all the blocks
     GroupByExecutor groupByExecutor;
     if (_useStarTree) {
@@ -107,19 +104,12 @@ public class AggregationGroupByOrderByOperator extends 
BaseOperator<Intermediate
     }
     TransformBlock transformBlock;
     while ((transformBlock = _transformOperator.nextBlock()) != null) {
-      numDocsScanned += transformBlock.getNumDocs();
+      _numDocsScanned += transformBlock.getNumDocs();
       groupByExecutor.process(transformBlock);
     }
-    AggregationGroupByResult groupByResult = groupByExecutor.getResult();
-
-    // Gather execution statistics
-    long numEntriesScannedInFilter = 
_transformOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
-    long numEntriesScannedPostFilter = numDocsScanned * 
_transformOperator.getNumColumnsProjected();
-    _executionStatistics =
-        new ExecutionStatistics(numDocsScanned, numEntriesScannedInFilter, 
numEntriesScannedPostFilter, _numTotalDocs);
 
     // Build intermediate result block based on aggregation group-by result 
from the executor
-    return new IntermediateResultsBlock(_functionContexts, groupByResult, 
_dataSchema);
+    return new IntermediateResultsBlock(_functionContexts, 
groupByExecutor.getResult(), _dataSchema);
   }
 
   @Override
@@ -129,6 +119,9 @@ public class AggregationGroupByOrderByOperator extends 
BaseOperator<Intermediate
 
   @Override
   public ExecutionStatistics getExecutionStatistics() {
-    return _executionStatistics;
+    long numEntriesScannedInFilter = 
_transformOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
+    long numEntriesScannedPostFilter = (long) _numDocsScanned * 
_transformOperator.getNumColumnsProjected();
+    return new ExecutionStatistics(_numDocsScanned, numEntriesScannedInFilter, 
numEntriesScannedPostFilter,
+        _numTotalDocs);
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationOperator.java
index e5b99df..a20d39d 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationOperator.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.core.operator.query;
 
-import java.util.List;
 import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.core.operator.ExecutionStatistics;
 import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
@@ -41,7 +40,7 @@ public class AggregationOperator extends 
BaseOperator<IntermediateResultsBlock>
   private final long _numTotalDocs;
   private final boolean _useStarTree;
 
-  private ExecutionStatistics _executionStatistics;
+  private int _numDocsScanned = 0;
 
   public AggregationOperator(AggregationFunctionContext[] functionContexts, 
TransformOperator transformOperator,
       long numTotalDocs, boolean useStarTree) {
@@ -53,8 +52,6 @@ public class AggregationOperator extends 
BaseOperator<IntermediateResultsBlock>
 
   @Override
   protected IntermediateResultsBlock getNextBlock() {
-    int numDocsScanned = 0;
-
     // Perform aggregation on all the transform blocks
     AggregationExecutor aggregationExecutor;
     if (_useStarTree) {
@@ -64,19 +61,12 @@ public class AggregationOperator extends 
BaseOperator<IntermediateResultsBlock>
     }
     TransformBlock transformBlock;
     while ((transformBlock = _transformOperator.nextBlock()) != null) {
-      numDocsScanned += transformBlock.getNumDocs();
+      _numDocsScanned += transformBlock.getNumDocs();
       aggregationExecutor.aggregate(transformBlock);
     }
-    List<Object> aggregationResult = aggregationExecutor.getResult();
-
-    // Create execution statistics
-    long numEntriesScannedInFilter = 
_transformOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
-    long numEntriesScannedPostFilter = numDocsScanned * 
_transformOperator.getNumColumnsProjected();
-    _executionStatistics =
-        new ExecutionStatistics(numDocsScanned, numEntriesScannedInFilter, 
numEntriesScannedPostFilter, _numTotalDocs);
 
     // Build intermediate result block based on aggregation result from the 
executor
-    return new IntermediateResultsBlock(_functionContexts, aggregationResult, 
false);
+    return new IntermediateResultsBlock(_functionContexts, 
aggregationExecutor.getResult(), false);
   }
 
   @Override
@@ -86,6 +76,9 @@ public class AggregationOperator extends 
BaseOperator<IntermediateResultsBlock>
 
   @Override
   public ExecutionStatistics getExecutionStatistics() {
-    return _executionStatistics;
+    long numEntriesScannedInFilter = 
_transformOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
+    long numEntriesScannedPostFilter = (long) _numDocsScanned * 
_transformOperator.getNumColumnsProjected();
+    return new ExecutionStatistics(_numDocsScanned, numEntriesScannedInFilter, 
numEntriesScannedPostFilter,
+        _numTotalDocs);
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedAggregationOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedAggregationOperator.java
index 3270d4f..8750684 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedAggregationOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedAggregationOperator.java
@@ -50,7 +50,6 @@ public class DictionaryBasedAggregationOperator extends 
BaseOperator<Intermediat
   private final AggregationFunctionContext[] _aggregationFunctionContexts;
   private final Map<String, Dictionary> _dictionaryMap;
   private final long _numTotalDocs;
-  private ExecutionStatistics _executionStatistics;
 
   /**
    * Constructor for the class.
@@ -96,11 +95,6 @@ public class DictionaryBasedAggregationOperator extends 
BaseOperator<Intermediat
       aggregationResults.add(function.extractAggregationResult(resultHolder));
     }
 
-    // Create execution statistics. Set numDocsScanned to numTotalDocs for 
backward compatibility.
-    _executionStatistics =
-        new ExecutionStatistics(_numTotalDocs, 0/* numEntriesScannedInFilter 
*/, 0/* numEntriesScannedPostFilter */,
-            _numTotalDocs);
-
     // Build intermediate result block based on aggregation result from the 
executor.
     return new IntermediateResultsBlock(_aggregationFunctionContexts, 
aggregationResults, false);
   }
@@ -112,6 +106,7 @@ public class DictionaryBasedAggregationOperator extends 
BaseOperator<Intermediat
 
   @Override
   public ExecutionStatistics getExecutionStatistics() {
-    return _executionStatistics;
+    // NOTE: Set numDocsScanned to numTotalDocs for backward compatibility.
+    return new ExecutionStatistics(_numTotalDocs, 0, 0, _numTotalDocs);
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/MetadataBasedAggregationOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/MetadataBasedAggregationOperator.java
index 98ea75e..f999e62 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/MetadataBasedAggregationOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/MetadataBasedAggregationOperator.java
@@ -40,7 +40,6 @@ public class MetadataBasedAggregationOperator extends 
BaseOperator<IntermediateR
   private final AggregationFunctionContext[] _aggregationFunctionContexts;
   private final Map<String, DataSource> _dataSourceMap;
   private final SegmentMetadata _segmentMetadata;
-  private ExecutionStatistics _executionStatistics;
 
   /**
    * Constructor for the class.
@@ -71,11 +70,6 @@ public class MetadataBasedAggregationOperator extends 
BaseOperator<IntermediateR
       aggregationResults.add(numTotalDocs);
     }
 
-    // Create execution statistics. Set numDocsScanned to numTotalDocs for 
backward compatibility.
-    _executionStatistics =
-        new ExecutionStatistics(numTotalDocs, 0/*numEntriesScannedInFilter*/, 
0/*numEntriesScannedPostFilter*/,
-            numTotalDocs);
-
     // Build intermediate result block based on aggregation result from the 
executor.
     return new IntermediateResultsBlock(_aggregationFunctionContexts, 
aggregationResults, false);
   }
@@ -87,6 +81,8 @@ public class MetadataBasedAggregationOperator extends 
BaseOperator<IntermediateR
 
   @Override
   public ExecutionStatistics getExecutionStatistics() {
-    return _executionStatistics;
+    // NOTE: Set numDocsScanned to numTotalDocs for backward compatibility.
+    int numTotalDocs = _segmentMetadata.getTotalDocs();
+    return new ExecutionStatistics(numTotalDocs, 0, 0, numTotalDocs);
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java
index e0d1687..f86d16c 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java
@@ -47,7 +47,7 @@ public class SelectionOnlyOperator extends 
BaseOperator<IntermediateResultsBlock
   private final int _numRowsToKeep;
   private final List<Serializable[]> _rows;
 
-  private ExecutionStatistics _executionStatistics;
+  private int _numDocsScanned = 0;
 
   public SelectionOnlyOperator(IndexSegment indexSegment, Selection selection, 
TransformOperator transformOperator) {
     _indexSegment = indexSegment;
@@ -73,8 +73,6 @@ public class SelectionOnlyOperator extends 
BaseOperator<IntermediateResultsBlock
 
   @Override
   protected IntermediateResultsBlock getNextBlock() {
-    int numDocsScanned = 0;
-
     TransformBlock transformBlock;
     while ((transformBlock = _transformOperator.nextBlock()) != null) {
       int numExpressions = _expressions.size();
@@ -84,7 +82,7 @@ public class SelectionOnlyOperator extends 
BaseOperator<IntermediateResultsBlock
       RowBasedBlockValueFetcher blockValueFetcher = new 
RowBasedBlockValueFetcher(_blockValSets);
 
       int numDocsToAdd = Math.min(_numRowsToKeep - _rows.size(), 
transformBlock.getNumDocs());
-      numDocsScanned += numDocsToAdd;
+      _numDocsScanned += numDocsToAdd;
       for (int i = 0; i < numDocsToAdd; i++) {
         _rows.add(blockValueFetcher.getRow(i));
       }
@@ -93,13 +91,6 @@ public class SelectionOnlyOperator extends 
BaseOperator<IntermediateResultsBlock
       }
     }
 
-    // Create execution statistics.
-    long numEntriesScannedInFilter = 
_transformOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
-    long numEntriesScannedPostFilter = numDocsScanned * 
_transformOperator.getNumColumnsProjected();
-    long numTotalDocs = _indexSegment.getSegmentMetadata().getTotalDocs();
-    _executionStatistics =
-        new ExecutionStatistics(numDocsScanned, numEntriesScannedInFilter, 
numEntriesScannedPostFilter, numTotalDocs);
-
     return new IntermediateResultsBlock(_dataSchema, _rows);
   }
 
@@ -110,6 +101,10 @@ public class SelectionOnlyOperator extends 
BaseOperator<IntermediateResultsBlock
 
   @Override
   public ExecutionStatistics getExecutionStatistics() {
-    return _executionStatistics;
+    long numEntriesScannedInFilter = 
_transformOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
+    long numEntriesScannedPostFilter = (long) _numDocsScanned * 
_transformOperator.getNumColumnsProjected();
+    int numTotalDocs = _indexSegment.getSegmentMetadata().getTotalDocs();
+    return new ExecutionStatistics(_numDocsScanned, numEntriesScannedInFilter, 
numEntriesScannedPostFilter,
+        numTotalDocs);
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
index b4f2228..86b84d9 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
@@ -52,7 +52,7 @@ public class SelectionOrderByOperator extends 
BaseOperator<IntermediateResultsBl
   private final int _numRowsToKeep;
   private final PriorityQueue<Serializable[]> _rows;
 
-  private ExecutionStatistics _executionStatistics;
+  private int _numDocsScanned = 0;
 
   public SelectionOrderByOperator(IndexSegment indexSegment, Selection 
selection, TransformOperator transformOperator) {
     _indexSegment = indexSegment;
@@ -139,8 +139,6 @@ public class SelectionOrderByOperator extends 
BaseOperator<IntermediateResultsBl
 
   @Override
   protected IntermediateResultsBlock getNextBlock() {
-    int numDocsScanned = 0;
-
     TransformBlock transformBlock;
     while ((transformBlock = _transformOperator.nextBlock()) != null) {
       int numExpressions = _expressions.size();
@@ -152,19 +150,12 @@ public class SelectionOrderByOperator extends 
BaseOperator<IntermediateResultsBl
       RowBasedBlockValueFetcher blockValueFetcher = new 
RowBasedBlockValueFetcher(blockValSets);
 
       int numDocsFetched = transformBlock.getNumDocs();
-      numDocsScanned += numDocsFetched;
+      _numDocsScanned += numDocsFetched;
       for (int i = 0; i < numDocsFetched; i++) {
         SelectionOperatorUtils.addToPriorityQueue(blockValueFetcher.getRow(i), 
_rows, _numRowsToKeep);
       }
     }
 
-    // Create execution statistics.
-    long numEntriesScannedInFilter = 
_transformOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
-    long numEntriesScannedPostFilter = numDocsScanned * 
_transformOperator.getNumColumnsProjected();
-    long numTotalDocs = _indexSegment.getSegmentMetadata().getTotalDocs();
-    _executionStatistics =
-        new ExecutionStatistics(numDocsScanned, numEntriesScannedInFilter, 
numEntriesScannedPostFilter, numTotalDocs);
-
     return new IntermediateResultsBlock(_dataSchema, _rows);
   }
 
@@ -175,6 +166,10 @@ public class SelectionOrderByOperator extends 
BaseOperator<IntermediateResultsBl
 
   @Override
   public ExecutionStatistics getExecutionStatistics() {
-    return _executionStatistics;
+    long numEntriesScannedInFilter = 
_transformOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
+    long numEntriesScannedPostFilter = (long) _numDocsScanned * 
_transformOperator.getNumColumnsProjected();
+    int numTotalDocs = _indexSegment.getSegmentMetadata().getTotalDocs();
+    return new ExecutionStatistics(_numDocsScanned, numEntriesScannedInFilter, 
numEntriesScannedPostFilter,
+        numTotalDocs);
   }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/operator/CombineSlowOperatorsTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/operator/CombineSlowOperatorsTest.java
index 1d9089f..9c4acef 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/operator/CombineSlowOperatorsTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/operator/CombineSlowOperatorsTest.java
@@ -144,7 +144,7 @@ public class CombineSlowOperatorsTest {
 
     @Override
     public ExecutionStatistics getExecutionStatistics() {
-      return null;
+      return new ExecutionStatistics();
     }
   }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/BaseSingleValueQueriesTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/BaseSingleValueQueriesTest.java
index d54fea0..f8e17e6 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/BaseSingleValueQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/BaseSingleValueQueriesTest.java
@@ -20,12 +20,11 @@ package org.apache.pinot.queries;
 
 import java.io.File;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.FileUtils;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.common.segment.ReadMode;
 import org.apache.pinot.core.data.manager.SegmentDataManager;
 import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
@@ -35,6 +34,8 @@ import 
org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
 import 
org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterTest;
@@ -65,6 +66,7 @@ public abstract class BaseSingleValueQueriesTest extends 
BaseQueriesTest {
   private static final String AVRO_DATA = "data" + File.separator + 
"test_data-sv.avro";
   private static final String SEGMENT_NAME = "testTable_126164076_167572854";
   private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), 
"SingleValueQueriesTest");
+  private static final int NUM_SEGMENT_DATA_MANAGERS = 2;
 
   // Hard-coded query filter.
   private static final String QUERY_FILTER =
@@ -121,8 +123,15 @@ public abstract class BaseSingleValueQueriesTest extends 
BaseQueriesTest {
       throws Exception {
     ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new 
File(INDEX_DIR, SEGMENT_NAME), ReadMode.heap);
     _indexSegment = immutableSegment;
-    _segmentDataManagers = Arrays
-        .asList(new ImmutableSegmentDataManager(immutableSegment), new 
ImmutableSegmentDataManager(immutableSegment));
+    int numSegmentDataManagers = getNumSegmentDataManagers();
+    _segmentDataManagers = new ArrayList<>(numSegmentDataManagers);
+    for (int i = 0; i < numSegmentDataManagers; i++) {
+      _segmentDataManagers.add(new 
ImmutableSegmentDataManager(immutableSegment));
+    }
+  }
+
+  protected int getNumSegmentDataManagers() {
+    return NUM_SEGMENT_DATA_MANAGERS;
   }
 
   @AfterClass
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/SelectionOnlyEarlyTerminationTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/SelectionOnlyEarlyTerminationTest.java
new file mode 100644
index 0000000..8c39e45
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/SelectionOnlyEarlyTerminationTest.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.core.operator.CombineOperator;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+
+
+/**
+ * Early termination test for selection-only queries.
+ */
+public class SelectionOnlyEarlyTerminationTest extends 
BaseSingleValueQueriesTest {
+  private static final int NUM_DOCS_PER_SEGMENT = 30000;
+  private static final int NUM_SERVERS = 2;
+
+  /**
+   * In order to ensure each thread is executing more than 1 segment, this 
test is against
+   * (2 * MAX_NUM_THREADS_PER_QUERY) segments per server.
+   */
+  @Override
+  protected int getNumSegmentDataManagers() {
+    return CombineOperator.MAX_NUM_THREADS_PER_QUERY * 2;
+  }
+
+  /**
+   * With early termination, selection-only query is scheduled with {@link 
CombineOperator#MAX_NUM_THREADS_PER_QUERY}
+   * threads per server, and the total number of segments matched (segments 
with non-zero documents scanned) should be
+   * the same as the total number of threads for each server.
+   */
+  @Test
+  public void testSelectOnlyQuery() {
+    int numThreadsPerServer = CombineOperator.MAX_NUM_THREADS_PER_QUERY;
+    int numSegmentsPerServer = getNumSegmentDataManagers();
+
+    // LIMIT = 5, 10, 20, 40, 80, 160, 320, 640, 1280, 2560, 5120, 10240, 20480
+    for (int limit = 5; limit < NUM_DOCS_PER_SEGMENT; limit *= 2) {
+      String query = String.format("SELECT column1, column7, column9, column6 
FROM testTable LIMIT %d", limit);
+      int numColumnsInSelection = 4;
+      BrokerResponseNative brokerResponse = 
getBrokerResponseForPqlQuery(query);
+      assertNotNull(brokerResponse.getSelectionResults());
+      assertNull(brokerResponse.getResultTable());
+      assertEquals(brokerResponse.getNumSegmentsProcessed(), 
numSegmentsPerServer * NUM_SERVERS);
+      assertEquals(brokerResponse.getNumSegmentsMatched(), numThreadsPerServer 
* NUM_SERVERS);
+      assertEquals(brokerResponse.getNumDocsScanned(), numThreadsPerServer * 
NUM_SERVERS * limit);
+      assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0);
+      assertEquals(brokerResponse.getNumEntriesScannedPostFilter(),
+          numThreadsPerServer * NUM_SERVERS * limit * numColumnsInSelection);
+      assertEquals(brokerResponse.getTotalDocs(), numSegmentsPerServer * 
NUM_SERVERS * NUM_DOCS_PER_SEGMENT);
+
+      brokerResponse = getBrokerResponseForSqlQuery(query);
+      assertNull(brokerResponse.getSelectionResults());
+      assertNotNull(brokerResponse.getResultTable());
+      assertEquals(brokerResponse.getNumSegmentsProcessed(), 
numSegmentsPerServer * NUM_SERVERS);
+      assertEquals(brokerResponse.getNumSegmentsMatched(), numThreadsPerServer 
* NUM_SERVERS);
+      assertEquals(brokerResponse.getNumDocsScanned(), numThreadsPerServer * 
NUM_SERVERS * limit);
+      assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0);
+      assertEquals(brokerResponse.getNumEntriesScannedPostFilter(),
+          numThreadsPerServer * NUM_SERVERS * limit * numColumnsInSelection);
+      assertEquals(brokerResponse.getTotalDocs(), numSegmentsPerServer * 
NUM_SERVERS * NUM_DOCS_PER_SEGMENT);
+    }
+  }
+
+  /**
+   * Without early termination, selection order-by query should hit all 
segments.
+   */
+  @Test
+  public void testSelectWithOrderByQuery() {
+    int numSegmentsPerServer = getNumSegmentDataManagers();
+    String query = "SELECT column11, column18, column1 FROM testTable ORDER BY 
column11";
+    int numColumnsInSelection = 3;
+    BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query);
+    assertNotNull(brokerResponse.getSelectionResults());
+    assertNull(brokerResponse.getResultTable());
+    assertEquals(brokerResponse.getNumSegmentsProcessed(), 
numSegmentsPerServer * NUM_SERVERS);
+    assertEquals(brokerResponse.getNumSegmentsMatched(), numSegmentsPerServer 
* NUM_SERVERS);
+    assertEquals(brokerResponse.getNumDocsScanned(), numSegmentsPerServer * 
NUM_SERVERS * NUM_DOCS_PER_SEGMENT);
+    assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0);
+    assertEquals(brokerResponse.getNumEntriesScannedPostFilter(),
+        numSegmentsPerServer * NUM_SERVERS * NUM_DOCS_PER_SEGMENT * 
numColumnsInSelection);
+    assertEquals(brokerResponse.getTotalDocs(), numSegmentsPerServer * 
NUM_SERVERS * NUM_DOCS_PER_SEGMENT);
+
+    brokerResponse = getBrokerResponseForSqlQuery(query);
+    assertNull(brokerResponse.getSelectionResults());
+    assertNotNull(brokerResponse.getResultTable());
+    assertEquals(brokerResponse.getNumSegmentsProcessed(), 
numSegmentsPerServer * NUM_SERVERS);
+    assertEquals(brokerResponse.getNumSegmentsMatched(), numSegmentsPerServer 
* NUM_SERVERS);
+    assertEquals(brokerResponse.getNumDocsScanned(), numSegmentsPerServer * 
NUM_SERVERS * NUM_DOCS_PER_SEGMENT);
+    assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0);
+    assertEquals(brokerResponse.getNumEntriesScannedPostFilter(),
+        numSegmentsPerServer * NUM_SERVERS * NUM_DOCS_PER_SEGMENT * 
numColumnsInSelection);
+    assertEquals(brokerResponse.getTotalDocs(), numSegmentsPerServer * 
NUM_SERVERS * NUM_DOCS_PER_SEGMENT);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to