siddharthteotia commented on code in PR #10169:
URL: https://github.com/apache/pinot/pull/10169#discussion_r1084762766
##########
pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/BaseStreamingCombineOperator.java:
##########
@@ -50,49 +52,55 @@
// Use a BlockingQueue to store the intermediate results blocks
protected final BlockingQueue<BaseResultsBlock> _blockingQueue = new
LinkedBlockingQueue<>();
+ protected final ResultsBlockMerger<T> _resultsBlockMerger;
protected int _numOperatorsFinished;
- protected BaseResultsBlock _exceptionBlock;
+ protected boolean _querySatisfied;
- public BaseStreamingCombineOperator(List<Operator> operators,
+ public BaseStreamingCombineOperator(ResultsBlockMerger<T>
resultsBlockMerger, List<Operator> operators,
QueryContext queryContext, ExecutorService executorService) {
super(operators, queryContext, executorService);
+ _resultsBlockMerger = resultsBlockMerger;
}
+ /**
+ * {@inheritDoc}
+ *
+ * When all the results blocks are returned, returns a final metadata block.
Caller shouldn't call this method after
+ * it returns the metadata block or exception block.
+ */
@Override
protected BaseResultsBlock getNextBlock() {
long endTimeMs = _queryContext.getEndTimeMs();
- // TODO: Early terminate when query is satisfied
- while (_exceptionBlock == null && _numOperatorsFinished < _numOperators) {
+ Object querySatisfiedTracker = createQuerySatisfiedTracker();
+ while (!_querySatisfied && _numOperatorsFinished < _numOperators) {
try {
BaseResultsBlock resultsBlock =
_blockingQueue.poll(endTimeMs - System.currentTimeMillis(),
TimeUnit.MILLISECONDS);
-
if (resultsBlock == null) {
// Query times out, skip streaming the remaining results blocks
LOGGER.error("Timed out while polling results block (query: {})",
_queryContext);
- _exceptionBlock = new ExceptionResultsBlock(
-
QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR,
- new TimeoutException("Timed out while polling results
block")));
- return _exceptionBlock;
+ return new
ExceptionResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR,
+ new TimeoutException("Timed out while polling results block")));
}
if (resultsBlock.getProcessingExceptions() != null) {
// Caught exception while processing segment, skip streaming the
remaining results blocks and directly return
// the exception
- _exceptionBlock = resultsBlock;
- return _exceptionBlock;
+ return resultsBlock;
}
if (resultsBlock == LAST_RESULTS_BLOCK) {
// Caught LAST_RESULTS_BLOCK from a specific task, indicated it has
finished.
// Skip returning this metadata block and continue to process the
next from the _blockingQueue.
_numOperatorsFinished++;
continue;
}
+ _querySatisfied = isQuerySatisfied((T) resultsBlock,
querySatisfiedTracker);
return resultsBlock;
+ } catch (InterruptedException e) {
+ throw new EarlyTerminationException();
Review Comment:
Maybe we should instantiate `EarlyTerminationException` with
`e.getMessage()`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]