Jackie-Jiang commented on code in PR #14662:
URL: https://github.com/apache/pinot/pull/14662#discussion_r1926225542
##########
pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java:
##########
@@ -248,6 +248,21 @@ public static Integer getGroupTrimThreshold(Map<String, String> queryOptions) {
return uncheckedParseInt(QueryOptionKey.GROUP_TRIM_THRESHOLD, groupByTrimThreshold);
}
+ @Nullable
+ public static Integer getNumThreadsExtractFinalResult(Map<String, String> queryOptions) {
+ String numThreadsExtractFinalResultString = queryOptions.get(QueryOptionKey.NUM_THREADS_EXTRACT_FINAL_RESULT);
+ return checkedParseInt(QueryOptionKey.NUM_THREADS_EXTRACT_FINAL_RESULT, numThreadsExtractFinalResultString, 1);
+ }
+
+ @Nullable
+ public static Integer getChunkSizeExtractFinalResult(Map<String, String> queryOptions) {
+ String chunkSizeExtractFinalResultString =
+ queryOptions.get(QueryOptionKey.CHUNK_SIZE_EXTRACT_FINAL_RESULT);
+ return checkedParseInt(QueryOptionKey.CHUNK_SIZE_EXTRACT_FINAL_RESULT,
+ chunkSizeExtractFinalResultString,
+ 1);
Review Comment:
(nit) reformat
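For illustration only, the two getters could read as follows once reformatted with consistent wrapping. This is a minimal sketch: `checkedParseInt` is stubbed here under the assumption that it returns `null` for a missing option and rejects values below the given minimum, and the option key strings are hypothetical placeholders, not Pinot's actual `QueryOptionKey` constants.

```java
import java.util.Map;

// Hypothetical sketch: key strings and checkedParseInt semantics are assumptions,
// not Pinot's actual QueryOptionsUtils implementation.
class QueryOptionsSketch {
  static final String NUM_THREADS_EXTRACT_FINAL_RESULT = "numThreadsExtractFinalResult"; // assumed key
  static final String CHUNK_SIZE_EXTRACT_FINAL_RESULT = "chunkSizeExtractFinalResult"; // assumed key

  // Assumed behavior: null when the option is absent, otherwise an int >= minValue.
  static Integer checkedParseInt(String key, String value, int minValue) {
    if (value == null) {
      return null;
    }
    int parsed;
    try {
      parsed = Integer.parseInt(value);
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException(key + " must be an integer, got: " + value, e);
    }
    if (parsed < minValue) {
      throw new IllegalArgumentException(key + " must be >= " + minValue + ", got: " + parsed);
    }
    return parsed;
  }

  static Integer getNumThreadsExtractFinalResult(Map<String, String> queryOptions) {
    String value = queryOptions.get(NUM_THREADS_EXTRACT_FINAL_RESULT);
    return checkedParseInt(NUM_THREADS_EXTRACT_FINAL_RESULT, value, 1);
  }

  static Integer getChunkSizeExtractFinalResult(Map<String, String> queryOptions) {
    String value = queryOptions.get(CHUNK_SIZE_EXTRACT_FINAL_RESULT);
    return checkedParseInt(CHUNK_SIZE_EXTRACT_FINAL_RESULT, value, 1);
  }
}
```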
##########
pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java:
##########
@@ -84,6 +94,10 @@ protected IndexedTable(DataSchema dataSchema, boolean hasFinalInput, QueryContex
assert _hasOrderBy || (trimSize == Integer.MAX_VALUE && trimThreshold == Integer.MAX_VALUE);
_trimSize = trimSize;
_trimThreshold = trimThreshold;
+ // NOTE: The upper limit of threads number for final reduce is set to 2 * number of available processors by default
+ _numThreadsExtractFinalResult = Math.min(queryContext.getNumThreadsExtractFinalResult(),
+ Math.max(1, 2 * Runtime.getRuntime().availableProcessors()));
Review Comment:
We should probably cap it at the number of CPU cores, because this is a CPU-heavy operation.
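A minimal sketch of the suggested cap, assuming the upper bound becomes the number of available processors rather than twice that number; `capNumThreads` is a hypothetical helper name, not an existing Pinot method. For CPU-bound work such as final-result extraction, threads beyond the core count would mostly add scheduling overhead rather than throughput.

```java
// Hypothetical helper sketching the suggested cap; not Pinot's actual code.
class FinalReduceThreadCap {
  // Clamps the requested thread count to the range [1, numCores].
  static int capNumThreads(int requested, int numCores) {
    return Math.max(1, Math.min(requested, numCores));
  }

  // Uses the JVM's view of available processors as the core count.
  static int capNumThreads(int requested) {
    return capNumThreads(requested, Runtime.getRuntime().availableProcessors());
  }
}
```

In `IndexedTable`, a helper like this would take the place of the `2 * Runtime.getRuntime().availableProcessors()` upper bound.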
##########
pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java:
##########
@@ -95,6 +95,9 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
public static final String GROUPBY_TRIM_THRESHOLD_KEY = "groupby.trim.threshold";
public static final int DEFAULT_GROUPBY_TRIM_THRESHOLD = 1_000_000;
+ public static final int DEFAULT_NUM_THREADS_FOR_FINAL_REDUCE = 1;
+ public static final int DEFAULT_PARALLEL_CHUNK_SIZE_FOR_FINAL_REDUCE = 10_000;
Review Comment:
Rename them
##########
pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java:
##########
@@ -157,14 +171,88 @@ public void finish(boolean sort, boolean storeFinalResult) {
for (int i = 0; i < numAggregationFunctions; i++) {
columnDataTypes[i + _numKeyColumns] = _aggregationFunctions[i].getFinalResultColumnType();
}
- for (Record record : _topRecords) {
- Object[] values = record.getValues();
- for (int i = 0; i < numAggregationFunctions; i++) {
- int colId = i + _numKeyColumns;
- values[colId] = _aggregationFunctions[i].extractFinalResult(values[colId]);
+ int numThreadsExtractFinalResult = inferNumThreadsExtractFinalResult();
+ // Submit task when the EXECUTOR_SERVICE is not overloaded
+ if (numThreadsExtractFinalResult > 1) {
+ // Multi-threaded final reduce
+ List<Future<Void>> futures = new ArrayList<>();
Review Comment:
```suggestion
List<Future<Void>> futures = new ArrayList<>(numThreadsExtractFinalResult);
```
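Presizing the list fits because at most one future is created per thread. The sketch below shows a chunked, multi-threaded extraction under simplifying assumptions: rows are plain `Object[]` arrays with the aggregation values after `numKeyColumns` key columns, and each aggregation's `extractFinalResult` is modeled as a hypothetical `UnaryOperator<Object>`. It is an illustration, not the PR's implementation. Each task writes only to its own rows, and `Future.get()` makes those writes visible to the calling thread once it returns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.UnaryOperator;

// Hypothetical sketch of chunked, multi-threaded final-result extraction.
class ParallelFinalResultSketch {
  static void extractFinalResults(List<Object[]> rows, int numKeyColumns,
      List<UnaryOperator<Object>> extractors, int numThreads, ExecutorService executor) {
    if (numThreads < 1) {
      throw new IllegalArgumentException("numThreads must be >= 1");
    }
    int numRows = rows.size();
    int chunkSize = (numRows + numThreads - 1) / numThreads;
    // At most one future per thread, so presize the list accordingly
    List<Future<Void>> futures = new ArrayList<>(numThreads);
    for (int threadId = 0; threadId < numThreads; threadId++) {
      int startIdx = threadId * chunkSize;
      int endIdx = Math.min(startIdx + chunkSize, numRows);
      if (startIdx >= endIdx) {
        break; // this chunk and every later one is empty
      }
      Callable<Void> task = () -> {
        for (int r = startIdx; r < endIdx; r++) {
          Object[] values = rows.get(r);
          for (int i = 0; i < extractors.size(); i++) {
            int colId = i + numKeyColumns;
            values[colId] = extractors.get(i).apply(values[colId]);
          }
        }
        return null;
      };
      futures.add(executor.submit(task));
    }
    try {
      // Wait for every chunk; get() also publishes the tasks' writes to this thread
      for (Future<Void> future : futures) {
        future.get();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException("Interrupted while extracting final results", e);
    } catch (ExecutionException e) {
      throw new RuntimeException("Final result extraction failed", e.getCause());
    }
  }
}
```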
##########
pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java:
##########
@@ -157,14 +171,88 @@ public void finish(boolean sort, boolean storeFinalResult) {
for (int i = 0; i < numAggregationFunctions; i++) {
columnDataTypes[i + _numKeyColumns] = _aggregationFunctions[i].getFinalResultColumnType();
}
- for (Record record : _topRecords) {
- Object[] values = record.getValues();
- for (int i = 0; i < numAggregationFunctions; i++) {
- int colId = i + _numKeyColumns;
- values[colId] = _aggregationFunctions[i].extractFinalResult(values[colId]);
+ int numThreadsExtractFinalResult = inferNumThreadsExtractFinalResult();
+ // Submit task when the EXECUTOR_SERVICE is not overloaded
+ if (numThreadsExtractFinalResult > 1) {
+ // Multi-threaded final reduce
+ List<Future<Void>> futures = new ArrayList<>();
+ try {
+ List<Record> topRecordsList = new ArrayList<>(_topRecords);
+ int chunkSize = (topRecordsList.size() + numThreadsExtractFinalResult - 1) / numThreadsExtractFinalResult;
+ for (int threadId = 0; threadId < numThreadsExtractFinalResult; threadId++) {
+ int startIdx = threadId * chunkSize;
+ int endIdx = Math.min(startIdx + chunkSize, topRecordsList.size());
+ if (startIdx < endIdx) {
Review Comment:
Is this always true?
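Working through the bounds suggests the check is not always true, unless `inferNumThreadsExtractFinalResult()` already guarantees enough records per thread (the diff excerpt does not show this). With ceiling-sized chunks, trailing threads can start at or past the end of the list. For 5 records and 4 threads, `chunkSize` is 2, so thread 3 gets `startIdx = 6` and `endIdx = 5`. For an empty list, `chunkSize` is 0 and every range is empty. The sketch below, with hypothetical method names, replays the loop bounds from the diff to count non-empty chunks and compares the result with the closed form `ceil(n / chunkSize)`. Using that closed form, the loop could submit exactly that many tasks instead of checking each range.

```java
// Hypothetical sketch replaying the chunk bounds from the diff.
class ChunkBoundsSketch {
  // Counts threads whose [startIdx, endIdx) range is non-empty, using the diff's loop bounds.
  static int countNonEmptyChunks(int numRows, int numThreads) {
    int chunkSize = (numRows + numThreads - 1) / numThreads;
    int nonEmpty = 0;
    for (int threadId = 0; threadId < numThreads; threadId++) {
      int startIdx = threadId * chunkSize;
      int endIdx = Math.min(startIdx + chunkSize, numRows);
      if (startIdx < endIdx) {
        nonEmpty++;
      }
    }
    return nonEmpty;
  }

  // Closed form: chunks actually needed, ceil(numRows / chunkSize), or 0 when there are no rows.
  static int numChunksNeeded(int numRows, int numThreads) {
    int chunkSize = (numRows + numThreads - 1) / numThreads;
    return chunkSize == 0 ? 0 : (numRows + chunkSize - 1) / chunkSize;
  }
}
```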
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]