richardstartin commented on a change in pull request #8272:
URL: https://github.com/apache/pinot/pull/8272#discussion_r817701130
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java
##########
@@ -106,15 +104,18 @@ public BrokerResponseNative
reduceOnStreamResponse(BrokerRequest brokerRequest,
return brokerResponseNative;
}
- private static void processIterativeServerResponse(StreamingReducer reducer,
ExecutorService executorService,
+ @VisibleForTesting
+ static void processIterativeServerResponse(StreamingReducer reducer,
Map<ServerRoutingInstance, Iterator<Server.ServerResponse>>
serverResponseMap, long reduceTimeOutMs,
- ExecutionStatsAggregator aggregator) throws Exception {
+ ExecutionStatsAggregator aggregator)
+ throws Exception {
int cnt = 0;
- Future[] futures = new Future[serverResponseMap.size()];
- CountDownLatch countDownLatch = new
CountDownLatch(serverResponseMap.size());
-
- for (Map.Entry<ServerRoutingInstance, Iterator<Server.ServerResponse>>
entry: serverResponseMap.entrySet()) {
- futures[cnt++] = executorService.submit(() -> {
+ CompletableFuture<Void>[] futures = new
CompletableFuture[serverResponseMap.size()];
+ // based on ideas from on
https://stackoverflow.com/questions/19348248/waiting-on-a-list-of-future
+ // and
https://stackoverflow.com/questions/23301598/transform-java-future-into-a-completablefuture
+ // Future created via ExecutorService.submit() can be created by
CompletableFuture.supplyAsync()
+ for (Map.Entry<ServerRoutingInstance, Iterator<Server.ServerResponse>>
entry : serverResponseMap.entrySet()) {
+ futures[cnt++] = CompletableFuture.supplyAsync(() -> {
Review comment:
This executes in a global pool which isn't the right behaviour. You can
supply an executor to make it execute in the allotted thread pool.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]