mayankshriv commented on a change in pull request #6044:
URL: https://github.com/apache/incubator-pinot/pull/6044#discussion_r504308428
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
##########
@@ -231,58 +239,130 @@ private DataSchema getPrePostAggregationDataSchema(DataSchema dataSchema) {
return new DataSchema(columnNames, columnDataTypes);
}
- private IndexedTable getIndexedTable(DataSchema dataSchema, Collection<DataTable> dataTables) {
+ private IndexedTable getIndexedTable(DataSchema dataSchema, Collection<DataTable> dataTablesToReduce,
+ DataTableReducerContext reducerContext) {
+ long start = System.currentTimeMillis();
+ int numDataTables = dataTablesToReduce.size();
+
+ // Get the number of threads to use for reducing.
+ int numReduceThreadsToUse = getNumReduceThreadsToUse(numDataTables, reducerContext.getMaxReduceThreadsPerQuery());
+
+ // In case of single reduce thread, fall back to SimpleIndexedTable to avoid redundant locking/unlocking calls.
int capacity = GroupByUtils.getTableCapacity(_queryContext);
- IndexedTable indexedTable = new SimpleIndexedTable(dataSchema, _queryContext, capacity);
+ IndexedTable indexedTable =
+ (numReduceThreadsToUse > 1) ? new ConcurrentIndexedTable(dataSchema, _queryContext, capacity)
+ : new SimpleIndexedTable(dataSchema, _queryContext, capacity);
+
+ Future[] futures = new Future[numDataTables];
Review comment:
Did some perf benchmarking on a high-throughput use case. The overhead does not seem to register.
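For readers following the thread, the pattern in the hunk (pick a concurrent structure only when more than one reduce thread is used, then submit one task per data table and collect the futures) can be sketched in plain Java as below. This is a hedged illustration, not Pinot's code: the maps stand in for `SimpleIndexedTable`/`ConcurrentIndexedTable`, each "data table" is modeled as a map of group key to partial count, and the body of `numReduceThreadsToUse` (min of table count and per-query cap, at least 1) is an assumption, since the diff does not show `getNumReduceThreadsToUse`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelReduceSketch {

  // Hypothetical stand-in for getNumReduceThreadsToUse: never more threads than
  // data tables, never more than the per-query cap, and at least one.
  static int numReduceThreadsToUse(int numDataTables, int maxReduceThreadsPerQuery) {
    return Math.max(1, Math.min(numDataTables, maxReduceThreadsPerQuery));
  }

  // Merges per-table partial counts into one result map.
  static Map<String, Long> reduce(List<Map<String, Long>> dataTables, int maxReduceThreadsPerQuery) {
    int numThreads = numReduceThreadsToUse(dataTables.size(), maxReduceThreadsPerQuery);
    if (numThreads <= 1) {
      // Single thread: a plain HashMap avoids any synchronization cost.
      Map<String, Long> result = new HashMap<>();
      for (Map<String, Long> table : dataTables) {
        table.forEach((key, value) -> result.merge(key, value, Long::sum));
      }
      return result;
    }
    // Multiple threads: ConcurrentHashMap.merge is atomic per key.
    Map<String, Long> result = new ConcurrentHashMap<>();
    ExecutorService executor = Executors.newFixedThreadPool(numThreads);
    try {
      // One task per data table; the fixed pool bounds actual concurrency.
      List<Future<?>> futures = new ArrayList<>(dataTables.size());
      for (Map<String, Long> table : dataTables) {
        futures.add(executor.submit(() -> table.forEach((key, value) -> result.merge(key, value, Long::sum))));
      }
      for (Future<?> future : futures) {
        future.get(); // surfaces any exception thrown by a worker task
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("parallel reduce interrupted", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException("parallel reduce failed", e.getCause());
    } finally {
      executor.shutdown();
    }
    return result;
  }

  public static void main(String[] args) {
    List<Map<String, Long>> tables = List.of(
        Map.of("a", 1L, "b", 2L),
        Map.of("a", 3L),
        Map.of("b", 4L, "c", 5L));
    // Both paths should produce the same totals; TreeMap gives a stable print order.
    System.out.println(new TreeMap<>(reduce(tables, 4))); // {a=4, b=6, c=5}
    System.out.println(new TreeMap<>(reduce(tables, 1))); // {a=4, b=6, c=5}
  }
}
```

The single-thread branch mirrors the reasoning in the diff comment: when only one thread does the reduce, a non-concurrent structure skips locking that would buy nothing.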