bziobrowski commented on code in PR #14662:
URL: https://github.com/apache/pinot/pull/14662#discussion_r1905103748
##########
pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java:
##########
@@ -157,16 +184,86 @@ public void finish(boolean sort, boolean
storeFinalResult) {
for (int i = 0; i < numAggregationFunctions; i++) {
columnDataTypes[i + _numKeyColumns] =
_aggregationFunctions[i].getFinalResultColumnType();
}
- for (Record record : _topRecords) {
- Object[] values = record.getValues();
- for (int i = 0; i < numAggregationFunctions; i++) {
- int colId = i + _numKeyColumns;
- values[colId] =
_aggregationFunctions[i].extractFinalResult(values[colId]);
+ int numThreadsForFinalReduce = inferNumThreadsForFinalReduce();
+ // Submit task when the EXECUTOR_SERVICE is not overloaded
+ if ((numThreadsForFinalReduce > 1) &&
(EXECUTOR_SERVICE.getQueue().size() < THREAD_POOL_SIZE * 3)) {
+ // Multi-threaded final reduce
+ List<Future<Void>> futures = new ArrayList<>();
+ try {
+ List<Record> topRecordsList = new ArrayList<>(_topRecords);
+ int chunkSize = (topRecordsList.size() + numThreadsForFinalReduce -
1) / numThreadsForFinalReduce;
+ for (int threadId = 0; threadId < numThreadsForFinalReduce;
threadId++) {
+ int startIdx = threadId * chunkSize;
+ int endIdx = Math.min(startIdx + chunkSize, topRecordsList.size());
+ if (startIdx < endIdx) {
+ // Submit a task for processing a chunk of values
+ futures.add(EXECUTOR_SERVICE.submit(new TraceCallable<Void>() {
+ @Override
+ public Void callJob() {
+ for (int recordIdx = startIdx; recordIdx < endIdx;
recordIdx++) {
+ Object[] values =
topRecordsList.get(recordIdx).getValues();
+ for (int i = 0; i < numAggregationFunctions; i++) {
+ int colId = i + _numKeyColumns;
+ values[colId] =
_aggregationFunctions[i].extractFinalResult(values[colId]);
+ }
+ }
+ return null;
+ }
+ }));
+ }
+ }
+ // Wait for all tasks to complete
+ for (Future<Void> future : futures) {
+ future.get();
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ // Cancel all running tasks
+ for (Future<Void> future : futures) {
+ future.cancel(true);
+ }
+ throw new RuntimeException("Error during multi-threaded final
reduce", e);
+ }
+ } else {
+ for (Record record : _topRecords) {
+ Object[] values = record.getValues();
+ for (int i = 0; i < numAggregationFunctions; i++) {
+ int colId = i + _numKeyColumns;
+ values[colId] =
_aggregationFunctions[i].extractFinalResult(values[colId]);
+ }
}
}
}
}
+ private int inferNumThreadsForFinalReduce() {
+ if (_numThreadsForFinalReduce > 1) {
+ return _numThreadsForFinalReduce;
+ }
+ if (containsExpensiveAggregationFunctions()) {
+ int parallelChunkSize = _parallelChunkSizeForFinalReduce;
+ if (_topRecords != null && _topRecords.size() > parallelChunkSize) {
+ return (int) Math.ceil((double) _topRecords.size() /
parallelChunkSize);
+ }
+ }
+ // Default to 1 thread
+ return 1;
+ }
+
Review Comment:
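Are there existing tests that exercise multi-threaded final reduction via this
heuristic, i.e. with `_numThreadsForFinalReduce` not set above 1, at least one
expensive aggregation function present, and more than
`_parallelChunkSizeForFinalReduce` top records?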
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]