mqliang commented on a change in pull request #6678:
URL: https://github.com/apache/incubator-pinot/pull/6678#discussion_r593385861



##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
##########
@@ -63,24 +59,101 @@
   // in each segment. We still put a limit across segments to protect cases 
where data is very skewed across different
   // segments.
   private static final int INTER_SEGMENT_NUM_GROUPS_LIMIT_FACTOR = 2;
-
-  private final List<Operator> _operators;
-  private final QueryContext _queryContext;
-  private final ExecutorService _executorService;
-  private final long _endTimeMs;
   // Limit on number of groups stored, beyond which no new group will be 
created
   private final int _innerSegmentNumGroupsLimit;
   private final int _interSegmentNumGroupsLimit;
 
+  private final ConcurrentHashMap<String, Object[]> _resultsMap = new 
ConcurrentHashMap<>();
+  private final AtomicInteger _numGroups = new AtomicInteger();
+  private final ConcurrentLinkedQueue<ProcessingException> 
_mergedProcessingExceptions = new ConcurrentLinkedQueue<>();
+  private final AggregationFunction[] _aggregationFunctions;
+  private final int _numAggregationFunctions;
+  // We use a CountDownLatch to track if all Futures are finished by the query 
timeout, and cancel the unfinished
+  // _futures (try to interrupt the execution if it already started).
+  // Besides the CountDownLatch, we also use a Phaser to ensure all the 
Futures are done (not scheduled, finished or
+  // interrupted) before the main thread returns. We need to ensure no 
execution left before the main thread returning
+  // because the main thread holds the reference to the segments, and if the 
segments are deleted/refreshed, the
+  // segments can be released after the main thread returns, which would lead 
to undefined behavior (even JVM crash)
+  // when executing queries against them.
+  private final CountDownLatch _operatorLatch;
+
   public GroupByCombineOperator(List<Operator> operators, QueryContext 
queryContext, ExecutorService executorService,
       long endTimeMs, int innerSegmentNumGroupsLimit) {
-    _operators = operators;
-    _queryContext = queryContext;
-    _executorService = executorService;
-    _endTimeMs = endTimeMs;
+    super(operators, queryContext, executorService, endTimeMs, 
operators.size());

Review comment:
       @siddharthteotia GroupByCombineOperator explicitly specify 
numberOfThread as numOfOperator here.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to