mqliang commented on a change in pull request #6678:
URL: https://github.com/apache/incubator-pinot/pull/6678#discussion_r593386095
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
##########
@@ -39,165 +38,153 @@
import org.apache.pinot.core.data.table.Key;
import org.apache.pinot.core.data.table.Record;
import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
-import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import
org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
import org.apache.pinot.core.query.exception.EarlyTerminationException;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.util.GroupByUtils;
-import org.apache.pinot.core.util.trace.TraceRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Combine operator for aggregation group-by queries with SQL semantic.
- * TODO:
- * - Use CombineOperatorUtils.getNumThreadsForQuery() to get the parallelism
of the query instead of using all threads
- * - Try to extend BaseCombineOperator to reduce duplicate code
+ * TODO: Use CombineOperatorUtils.getNumThreadsForQuery() to get the
parallelism of the query instead of using
+ * all threads
*/
@SuppressWarnings("rawtypes")
-public class GroupByOrderByCombineOperator extends
BaseOperator<IntermediateResultsBlock> {
+public class GroupByOrderByCombineOperator extends BaseCombineOperator {
+ public static final int MAX_TRIM_THRESHOLD = 1_000_000_000;
private static final Logger LOGGER =
LoggerFactory.getLogger(GroupByOrderByCombineOperator.class);
private static final String OPERATOR_NAME = "GroupByOrderByCombineOperator";
- public static final int MAX_TRIM_THRESHOLD = 1_000_000_000;
-
- private final List<Operator> _operators;
- private final QueryContext _queryContext;
- private final ExecutorService _executorService;
- private final long _endTimeMs;
private final int _trimSize;
private final int _trimThreshold;
private final Lock _initLock;
+ private final int _numAggregationFunctions;
+ private final int _numGroupByExpressions;
+ private final int _numColumns;
+ private final ConcurrentLinkedQueue<ProcessingException>
_mergedProcessingExceptions = new ConcurrentLinkedQueue<>();
+ // We use a CountDownLatch to track if all Futures are finished by the query
timeout, and cancel the unfinished
+ // _futures (try to interrupt the execution if it already started).
+ // Besides the CountDownLatch, we also use a Phaser to ensure all the
Futures are done (not scheduled, finished or
+ // interrupted) before the main thread returns. We need to ensure no
execution left before the main thread returning
+ // because the main thread holds the reference to the segments, and if the
segments are deleted/refreshed, the
+ // segments can be released after the main thread returns, which would lead
to undefined behavior (even JVM crash)
+ // when executing queries against them.
+ private final CountDownLatch _operatorLatch;
private DataSchema _dataSchema;
private ConcurrentIndexedTable _indexedTable;
public GroupByOrderByCombineOperator(List<Operator> operators, QueryContext
queryContext,
ExecutorService executorService, long endTimeMs, int trimThreshold) {
- _operators = operators;
- _queryContext = queryContext;
- _executorService = executorService;
- _endTimeMs = endTimeMs;
+ super(operators, queryContext, executorService, endTimeMs,
operators.size());
Review comment:
@siddharthteotia GroupByOrderByCombineOperator explicitly passes
`operators.size()` as the number of threads here (one thread per operator).
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]