mqliang commented on a change in pull request #6678:
URL: https://github.com/apache/incubator-pinot/pull/6678#discussion_r593385861
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
##########
@@ -63,24 +59,101 @@
// in each segment. We still put a limit across segments to protect cases
where data is very skewed across different
// segments.
private static final int INTER_SEGMENT_NUM_GROUPS_LIMIT_FACTOR = 2;
-
- private final List<Operator> _operators;
- private final QueryContext _queryContext;
- private final ExecutorService _executorService;
- private final long _endTimeMs;
// Limit on number of groups stored, beyond which no new group will be
created
private final int _innerSegmentNumGroupsLimit;
private final int _interSegmentNumGroupsLimit;
+ private final ConcurrentHashMap<String, Object[]> _resultsMap = new
ConcurrentHashMap<>();
+ private final AtomicInteger _numGroups = new AtomicInteger();
+ private final ConcurrentLinkedQueue<ProcessingException>
_mergedProcessingExceptions = new ConcurrentLinkedQueue<>();
+ private final AggregationFunction[] _aggregationFunctions;
+ private final int _numAggregationFunctions;
+ // We use a CountDownLatch to track if all Futures are finished by the query
timeout, and cancel the unfinished
+ // _futures (try to interrupt the execution if it already started).
+ // Besides the CountDownLatch, we also use a Phaser to ensure all the
Futures are done (not scheduled, finished or
+ // interrupted) before the main thread returns. We need to ensure no
execution left before the main thread returning
+ // because the main thread holds the reference to the segments, and if the
segments are deleted/refreshed, the
+ // segments can be released after the main thread returns, which would lead
to undefined behavior (even JVM crash)
+ // when executing queries against them.
+ private final CountDownLatch _operatorLatch;
+
public GroupByCombineOperator(List<Operator> operators, QueryContext
queryContext, ExecutorService executorService,
long endTimeMs, int innerSegmentNumGroupsLimit) {
- _operators = operators;
- _queryContext = queryContext;
- _executorService = executorService;
- _endTimeMs = endTimeMs;
+ super(operators, queryContext, executorService, endTimeMs,
operators.size());
Review comment:
@siddharthteotia GroupByCombineOperator explicitly specify
numberOfThread as numOfOperator here.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]