walterddr commented on code in PR #10169:
URL: https://github.com/apache/pinot/pull/10169#discussion_r1085993792
##########
pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseSingleBlockCombineOperator.java:
##########
@@ -71,11 +68,8 @@ protected BaseResultsBlock getNextBlock() {
try {
startProcess();
mergedBlock = mergeResults();
- } catch (InterruptedException | EarlyTerminationException e) {
- Exception killedErrorMsg = Tracing.getThreadAccountant().getErrorStatus();
- throw new QueryCancelledException(
- "Cancelled while merging results blocks" + (killedErrorMsg == null ? StringUtils.EMPTY
- : " " + killedErrorMsg), e);
+ } catch (InterruptedException e) {
+ throw new EarlyTerminationException();
Review Comment:
Please include the original exception as the cause:
```suggestion
throw new EarlyTerminationException(e);
```
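A minimal sketch of what the suggestion amounts to, assuming the exception type accepts a `Throwable` cause. `EarlyTerminationSketchException` and `CauseChainingSketch` are hypothetical stand-ins written for illustration; whether Pinot's `EarlyTerminationException` actually exposes such a constructor needs to be checked against the codebase.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for EarlyTerminationException; the (Throwable) constructor is an assumption.
class EarlyTerminationSketchException extends RuntimeException {
  EarlyTerminationSketchException(Throwable cause) {
    super(cause);
  }
}

class CauseChainingSketch {
  // Polls the queue; if the thread is interrupted, rethrows with the original
  // InterruptedException attached as the cause so its stack trace is not lost.
  static String pollOrTerminate(BlockingQueue<String> queue, long timeoutMs) {
    try {
      return queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      throw new EarlyTerminationSketchException(e);
    }
  }
}
```

With the cause attached, whoever catches the exception further up can still call `getCause()` to see where the interrupt was observed.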
##########
pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseSingleBlockCombineOperator.java:
##########
@@ -111,13 +105,11 @@ protected void processSegments() {
((AcquireReleaseColumnsSegmentOperator) operator).release();
}
}
-
+ _blockingQueue.offer(resultsBlock);
+ // When query is satisfied, skip processing the remaining segments
if (_resultsBlockMerger.isQuerySatisfied(resultsBlock)) {
- // Query is satisfied, skip processing the remaining segments
- _blockingQueue.offer(resultsBlock);
+ _nextOperatorId.set(_numOperators);
Review Comment:
Hmm, I am a bit worried about this one.
Basically, this makes it practically impossible for numBlocksMerged to reach
numOperators in the mergeResultBlock method,
so we are relying solely on the early-termination mechanism to also terminate
mergeResultBlock early.
However, the check here,
`_resultsBlockMerger.isQuerySatisfied(resultsBlock)`, is not the same as the
one in the merge method,
`_resultsBlockMerger.isQuerySatisfied(mergedBlock)`.
Are we 100% sure these are equivalent?
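To make the concern concrete, here is a toy model in which a "block" is just a list of rows and the query is satisfied once `limit` rows are available. `LimitMergerSketch` and its methods are hypothetical and only loosely mirror the shape of `ResultsBlockMerger`; real mergers (group-by, order-by, and so on) may behave differently.

```java
import java.util.List;

// Toy model only, not Pinot's ResultsBlockMerger.
class LimitMergerSketch {
  private final int limit;

  LimitMergerSketch(int limit) {
    this.limit = limit;
  }

  // Satisfied once the given block alone holds at least `limit` rows.
  boolean isQuerySatisfied(List<Integer> block) {
    return block.size() >= limit;
  }

  // Appends rows from blockToMerge into mergedBlock, never exceeding the limit.
  void mergeResultsBlocks(List<Integer> mergedBlock, List<Integer> blockToMerge) {
    for (Integer row : blockToMerge) {
      if (mergedBlock.size() >= limit) {
        return;
      }
      mergedBlock.add(row);
    }
  }
}
```

In this toy case the merged block can be satisfied even though no single block is, so the two checks are not equivalent in general. What the change needs is the weaker direction: whenever a single block satisfies the query, the merged block must also satisfy it after that block has been merged in.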
##########
pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/BaseStreamingCombineOperator.java:
##########
@@ -50,49 +52,55 @@
// Use a BlockingQueue to store the intermediate results blocks
protected final BlockingQueue<BaseResultsBlock> _blockingQueue = new LinkedBlockingQueue<>();
+ protected final ResultsBlockMerger<T> _resultsBlockMerger;
protected int _numOperatorsFinished;
- protected BaseResultsBlock _exceptionBlock;
+ protected boolean _querySatisfied;
- public BaseStreamingCombineOperator(List<Operator> operators,
+ public BaseStreamingCombineOperator(ResultsBlockMerger<T> resultsBlockMerger, List<Operator> operators,
QueryContext queryContext, ExecutorService executorService) {
super(operators, queryContext, executorService);
+ _resultsBlockMerger = resultsBlockMerger;
}
+ /**
+ * {@inheritDoc}
+ *
+ * When all the results blocks are returned, returns a final metadata block. Caller shouldn't call this method after
+ * it returns the metadata block or exception block.
+ */
@Override
protected BaseResultsBlock getNextBlock() {
long endTimeMs = _queryContext.getEndTimeMs();
- // TODO: Early terminate when query is satisfied
- while (_exceptionBlock == null && _numOperatorsFinished < _numOperators) {
+ Object querySatisfiedTracker = createQuerySatisfiedTracker();
+ while (!_querySatisfied && _numOperatorsFinished < _numOperators) {
try {
BaseResultsBlock resultsBlock =
_blockingQueue.poll(endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
-
if (resultsBlock == null) {
// Query times out, skip streaming the remaining results blocks
LOGGER.error("Timed out while polling results block (query: {})", _queryContext);
- _exceptionBlock = new ExceptionResultsBlock(
- QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR,
- new TimeoutException("Timed out while polling results block")));
- return _exceptionBlock;
+ return new ExceptionResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR,
+ new TimeoutException("Timed out while polling results block")));
}
if (resultsBlock.getProcessingExceptions() != null) {
// Caught exception while processing segment, skip streaming the remaining results blocks and directly return
// the exception
- _exceptionBlock = resultsBlock;
- return _exceptionBlock;
+ return resultsBlock;
}
if (resultsBlock == LAST_RESULTS_BLOCK) {
// Caught LAST_RESULTS_BLOCK from a specific task, indicated it has finished.
// Skip returning this metadata block and continue to process the next from the _blockingQueue.
_numOperatorsFinished++;
continue;
}
+ _querySatisfied = isQuerySatisfied((T) resultsBlock, querySatisfiedTracker);
return resultsBlock;
+ } catch (InterruptedException e) {
+ throw new EarlyTerminationException();
Review Comment:
Actually, I have another suggestion: let's pass `e` into it, since it might
include the stack trace, or log it here.
##########
pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java:
##########
@@ -111,6 +116,10 @@ private BaseResultsBlock getCombinedResults() {
try {
prefetchAll();
return _combineOperator.nextBlock();
+ } catch (EarlyTerminationException e) {
+ Exception killedErrorMsg = Tracing.getThreadAccountant().getErrorStatus();
+ return new ExceptionResultsBlock(new QueryCancelledException(
+ "Cancelled while combining results" + (killedErrorMsg == null ? StringUtils.EMPTY : " " + killedErrorMsg)));
Review Comment:
This is highly unlikely, but I was able to reproduce it by fiddling with the
underlying threading mechanism :-D.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]