Jackie-Jiang commented on code in PR #9286:
URL: https://github.com/apache/pinot/pull/9286#discussion_r956647000
##########
pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java:
##########
@@ -198,7 +198,10 @@ protected IntermediateResultsBlock mergeResults()
     while (numBlocksMerged < _numOperators) {
       IntermediateResultsBlock blockToMerge =
           _blockingQueue.poll(endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
-      if (blockToMerge == null) {
+      // Timeout has reached, shouldn't continue to process. `_blockingQueue.poll` will continue to return blocks even
Review Comment:
(minor) Good catch. We could save an extra read of the current time by checking whether the wait time (`endTimeMs - System.currentTimeMillis()`) is positive before polling for the next block.
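To make the suggestion concrete, here is a minimal, self-contained sketch of the pattern under discussion (generic `BlockingQueue` and illustrative names such as `pollWithinDeadline`; this is not the actual `BaseCombineOperator` code): compute the remaining wait time once, bail out early if it is already non-positive, and reuse the same value for the poll so each attempt reads the clock only once.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class PollWithDeadlineSketch {

  /**
   * Polls the queue with the remaining time budget. The wait time is computed once,
   * checked for positivity (the suggestion above), and then reused for the poll, so
   * only one System.currentTimeMillis() read is needed per attempt.
   * Returns null when the deadline has already passed or the poll times out.
   */
  static <T> T pollWithinDeadline(BlockingQueue<T> queue, long endTimeMs)
      throws InterruptedException {
    long waitTimeMs = endTimeMs - System.currentTimeMillis();
    if (waitTimeMs <= 0) {
      // Deadline already passed: skip the poll, which could otherwise keep returning
      // queued elements even though the overall time budget is exhausted.
      return null;
    }
    return queue.poll(waitTimeMs, TimeUnit.MILLISECONDS);
  }

  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(4);
    queue.offer("block-1");
    long endTimeMs = System.currentTimeMillis() + 100;
    System.out.println(pollWithinDeadline(queue, endTimeMs)); // block-1
    System.out.println(pollWithinDeadline(queue, endTimeMs)); // null after the remaining budget elapses
  }
}
```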
##########
pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java:
##########
@@ -197,6 +205,13 @@ protected void processSegments(int taskIndex) {
}
}
+  // Check for thread interruption, every time after merging 10_000 keys
+  private void checkMergePhaseInterruption(long mergedKeys) {
+    if (mergedKeys % DocIdSetPlanNode.MAX_DOC_PER_CALL == 0 && Thread.interrupted()) {
Review Comment:
(minor) Let's use a separate constant for this
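A possible shape for that, sketched with an illustrative constant name and value (and a plain `RuntimeException` standing in for whatever exception the operator actually throws), so the check frequency is no longer tied to `DocIdSetPlanNode.MAX_DOC_PER_CALL`:

```java
public class MergeInterruptionSketch {
  // Hypothetical dedicated constant (name and value are illustrative, not from the PR);
  // using it decouples the interruption-check frequency from DocIdSetPlanNode.MAX_DOC_PER_CALL.
  private static final int KEYS_MERGED_PER_INTERRUPTION_CHECK = 10_000;

  // Check for thread interruption once every KEYS_MERGED_PER_INTERRUPTION_CHECK merged keys.
  // Note that Thread.interrupted() also clears the thread's interrupt status.
  static void checkMergePhaseInterruption(int mergedKeys) {
    if (mergedKeys % KEYS_MERGED_PER_INTERRUPTION_CHECK == 0 && Thread.interrupted()) {
      throw new RuntimeException("Interrupted while merging group-by results");
    }
  }

  public static void main(String[] args) {
    int mergedKeys = 0;
    for (String key : new String[]{"a", "b", "c"}) {
      // ... merge the key into the indexed table ...
      checkMergePhaseInterruption(++mergedKeys);
    }
    System.out.println("Merged " + mergedKeys + " keys without interruption");
  }
}
```

The counter is kept as an `int` here, which also lines up with the comment on the next hunk.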
##########
pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java:
##########
@@ -165,6 +167,8 @@ protected void processSegments(int taskIndex) {
       // Merge aggregation group-by result.
       // Iterate over the group-by keys, for each key, update the group-by result in the indexedTable
       Collection<IntermediateRecord> intermediateRecords = resultsBlock.getIntermediateRecords();
+      // Count the number of merged keys
+      long mergedKeys = 0;
Review Comment:
(minor) This can be `int`
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java:
##########
@@ -142,7 +142,7 @@ public void init(TableDataManagerConfig tableDataManagerConfig, String instanceI
     _isStreamSegmentDownloadUntar = tableDataManagerParams.isStreamSegmentDownloadUntar();
     if (_isStreamSegmentDownloadUntar) {
       LOGGER.info("Using streamed download-untar for segment download! "
-          + "The rate limit interval for streamed download-untar is {} ms",
+          + "The rate limit interval for streamed download-untar is {} bytes",
Review Comment:
This is still confusing. IIUC, this should be `The rate limit for streamed download-untar is {} bytes/s`
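For reference, a sketch of the reworded log line using SLF4J's parameterized logging (the variable name and value are illustrative stand-ins for the actual rate limit carried by the table data manager configuration):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RateLimitLogSketch {
  private static final Logger LOGGER = LoggerFactory.getLogger(RateLimitLogSketch.class);

  public static void main(String[] args) {
    // Illustrative value; in Pinot the limit comes from the table data manager configuration.
    long rateLimitBytesPerSec = 10_000_000L;
    // The placeholder is a rate, so label it bytes/s rather than an "interval" in ms or bytes.
    LOGGER.info("Using streamed download-untar for segment download! "
        + "The rate limit for streamed download-untar is {} bytes/s", rateLimitBytesPerSec);
  }
}
```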