xiangfu0 commented on code in PR #14662:
URL: https://github.com/apache/pinot/pull/14662#discussion_r1927437599


##########
pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java:
##########
@@ -157,14 +171,88 @@ public void finish(boolean sort, boolean 
storeFinalResult) {
       for (int i = 0; i < numAggregationFunctions; i++) {
         columnDataTypes[i + _numKeyColumns] = 
_aggregationFunctions[i].getFinalResultColumnType();
       }
-      for (Record record : _topRecords) {
-        Object[] values = record.getValues();
-        for (int i = 0; i < numAggregationFunctions; i++) {
-          int colId = i + _numKeyColumns;
-          values[colId] = 
_aggregationFunctions[i].extractFinalResult(values[colId]);
+      int numThreadsExtractFinalResult = inferNumThreadsExtractFinalResult();
+      // Submit task when the EXECUTOR_SERVICE is not overloaded
+      if (numThreadsExtractFinalResult > 1) {
+        // Multi-threaded final reduce
+        List<Future<Void>> futures = new ArrayList<>();
+        try {
+          List<Record> topRecordsList = new ArrayList<>(_topRecords);
+          int chunkSize = (topRecordsList.size() + 
numThreadsExtractFinalResult - 1) / numThreadsExtractFinalResult;
+          for (int threadId = 0; threadId < numThreadsExtractFinalResult; 
threadId++) {
+            int startIdx = threadId * chunkSize;
+            int endIdx = Math.min(startIdx + chunkSize, topRecordsList.size());
+            if (startIdx < endIdx) {

Review Comment:
   
   not always the case in the test with very small segment.



##########
pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java:
##########
@@ -95,6 +95,9 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
   public static final String GROUPBY_TRIM_THRESHOLD_KEY = 
"groupby.trim.threshold";
   public static final int DEFAULT_GROUPBY_TRIM_THRESHOLD = 1_000_000;
 
+  public static final int DEFAULT_NUM_THREADS_FOR_FINAL_REDUCE = 1;
+  public static final int DEFAULT_PARALLEL_CHUNK_SIZE_FOR_FINAL_REDUCE = 
10_000;

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to