Jackie-Jiang commented on code in PR #14662:
URL: https://github.com/apache/pinot/pull/14662#discussion_r1926225542


##########
pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java:
##########
@@ -248,6 +248,21 @@ public static Integer getGroupTrimThreshold(Map<String, 
String> queryOptions) {
     return uncheckedParseInt(QueryOptionKey.GROUP_TRIM_THRESHOLD, 
groupByTrimThreshold);
   }
 
+  @Nullable
+  public static Integer getNumThreadsExtractFinalResult(Map<String, String> 
queryOptions) {
+    String numThreadsExtractFinalResultString = 
queryOptions.get(QueryOptionKey.NUM_THREADS_EXTRACT_FINAL_RESULT);
+    return checkedParseInt(QueryOptionKey.NUM_THREADS_EXTRACT_FINAL_RESULT, 
numThreadsExtractFinalResultString, 1);
+  }
+
+  @Nullable
+  public static Integer getChunkSizeExtractFinalResult(Map<String, String> 
queryOptions) {
+    String chunkSizeExtractFinalResultString =
+        queryOptions.get(QueryOptionKey.CHUNK_SIZE_EXTRACT_FINAL_RESULT);
+    return checkedParseInt(QueryOptionKey.CHUNK_SIZE_EXTRACT_FINAL_RESULT,
+        chunkSizeExtractFinalResultString,
+        1);

Review Comment:
   (nit) reformat



##########
pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java:
##########
@@ -84,6 +94,10 @@ protected IndexedTable(DataSchema dataSchema, boolean 
hasFinalInput, QueryContex
     assert _hasOrderBy || (trimSize == Integer.MAX_VALUE && trimThreshold == 
Integer.MAX_VALUE);
     _trimSize = trimSize;
     _trimThreshold = trimThreshold;
+    // NOTE: The upper limit of threads number for final reduce is set to 2 * 
number of available processors by default
+    _numThreadsExtractFinalResult = 
Math.min(queryContext.getNumThreadsExtractFinalResult(),
+        Math.max(1, 2 * Runtime.getRuntime().availableProcessors()));

Review Comment:
   We should probably cap it at the number of available CPU cores (instead of 2x), because extracting final results is a CPU-heavy operation.



##########
pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java:
##########
@@ -95,6 +95,9 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
   public static final String GROUPBY_TRIM_THRESHOLD_KEY = 
"groupby.trim.threshold";
   public static final int DEFAULT_GROUPBY_TRIM_THRESHOLD = 1_000_000;
 
+  public static final int DEFAULT_NUM_THREADS_FOR_FINAL_REDUCE = 1;
+  public static final int DEFAULT_PARALLEL_CHUNK_SIZE_FOR_FINAL_REDUCE = 
10_000;

Review Comment:
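   Rename these two constants (e.g. to match the `NUM_THREADS_EXTRACT_FINAL_RESULT` / `CHUNK_SIZE_EXTRACT_FINAL_RESULT` query option names).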



##########
pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java:
##########
@@ -157,14 +171,88 @@ public void finish(boolean sort, boolean 
storeFinalResult) {
       for (int i = 0; i < numAggregationFunctions; i++) {
         columnDataTypes[i + _numKeyColumns] = 
_aggregationFunctions[i].getFinalResultColumnType();
       }
-      for (Record record : _topRecords) {
-        Object[] values = record.getValues();
-        for (int i = 0; i < numAggregationFunctions; i++) {
-          int colId = i + _numKeyColumns;
-          values[colId] = 
_aggregationFunctions[i].extractFinalResult(values[colId]);
+      int numThreadsExtractFinalResult = inferNumThreadsExtractFinalResult();
+      // Submit task when the EXECUTOR_SERVICE is not overloaded
+      if (numThreadsExtractFinalResult > 1) {
+        // Multi-threaded final reduce
+        List<Future<Void>> futures = new ArrayList<>();

Review Comment:
   ```suggestion
           List<Future<Void>> futures = new 
ArrayList<>(numThreadsExtractFinalResult);
   ```



##########
pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java:
##########
@@ -157,14 +171,88 @@ public void finish(boolean sort, boolean 
storeFinalResult) {
       for (int i = 0; i < numAggregationFunctions; i++) {
         columnDataTypes[i + _numKeyColumns] = 
_aggregationFunctions[i].getFinalResultColumnType();
       }
-      for (Record record : _topRecords) {
-        Object[] values = record.getValues();
-        for (int i = 0; i < numAggregationFunctions; i++) {
-          int colId = i + _numKeyColumns;
-          values[colId] = 
_aggregationFunctions[i].extractFinalResult(values[colId]);
+      int numThreadsExtractFinalResult = inferNumThreadsExtractFinalResult();
+      // Submit task when the EXECUTOR_SERVICE is not overloaded
+      if (numThreadsExtractFinalResult > 1) {
+        // Multi-threaded final reduce
+        List<Future<Void>> futures = new ArrayList<>();
+        try {
+          List<Record> topRecordsList = new ArrayList<>(_topRecords);
+          int chunkSize = (topRecordsList.size() + 
numThreadsExtractFinalResult - 1) / numThreadsExtractFinalResult;
+          for (int threadId = 0; threadId < numThreadsExtractFinalResult; 
threadId++) {
+            int startIdx = threadId * chunkSize;
+            int endIdx = Math.min(startIdx + chunkSize, topRecordsList.size());
+            if (startIdx < endIdx) {

Review Comment:
   Is this always true?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to