jihoonson commented on a change in pull request #6629: Add support parallel combine in brokers
URL: https://github.com/apache/incubator-druid/pull/6629#discussion_r240882347
##########
File path: server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
##########
@@ -285,12 +301,72 @@ public CachingClusteredClient(
List<Sequence<T>> sequencesByInterval = new ArrayList<>(alreadyCachedResults.size() + segmentsByServer.size());
addSequencesFromCache(sequencesByInterval, alreadyCachedResults);
addSequencesFromServer(sequencesByInterval, segmentsByServer);
- return Sequences
- .simple(sequencesByInterval)
- .flatMerge(seq -> seq, query.getResultOrdering());
+ return merge(sequencesByInterval);
});
}
+ private Sequence<T> merge(List<Sequence<T>> sequencesByInterval)
+ {
+ final int numParallelCombineThreads = QueryContexts.getNumBrokerParallelCombineThreads(query);
+
+ if (numParallelCombineThreads > 0) {
+ final ReserveResult reserveResult = processingThreadResourcePool.reserve(query, numParallelCombineThreads);
+ if (!reserveResult.isOk()) {
+ throw new ISE(
+ "Not enough processing threads. The query needs [%d] threads, but only [%d] were available",
+ numParallelCombineThreads,
+ reserveResult.getNumAvailableResources()
+ );
+ }
+ return parallelMerge(sequencesByInterval, reserveResult.getResources());
+ } else if (numParallelCombineThreads == QueryContexts.NUM_CURRENT_AVAILABLE_THREADS) {
+ final ReserveResult reserveResult = processingThreadResourcePool.reserve(query, numParallelCombineThreads);
+ if (reserveResult.isOk()) {
+ return parallelMerge(sequencesByInterval, reserveResult.getResources());
+ } else {
+ return sequentialMerge(sequencesByInterval);
+ }
+ } else if (numParallelCombineThreads == QueryContexts.NO_PARALLEL_COMBINE_THREADS) {
+ return sequentialMerge(sequencesByInterval);
+ } else {
+ throw new ISE(
Review comment:
I don't think this is a good change. The `numParallelCombineThreads` option is
used only by `CachingClusteredClient`, so that class should be responsible for
checking whether the given value is valid.
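
To illustrate the point, here is a minimal, self-contained sketch of validating the option at its only use site. The class name `CombineModeSketch`, the `Mode` enum, and the two sentinel values are assumptions made for illustration; the real constants live in `QueryContexts` and their values are not shown in this diff. Druid's `ISE` is replaced with the JDK's `IllegalStateException` so the snippet compiles on its own.

```java
// Hypothetical sketch: the consumer of the option validates it and picks a merge mode.
final class CombineModeSketch
{
  // Assumed sentinel values; the actual QueryContexts constants may differ.
  static final int NO_PARALLEL_COMBINE_THREADS = 0;
  static final int NUM_CURRENT_AVAILABLE_THREADS = -1;

  enum Mode
  {
    FIXED_PARALLEL,       // exactly the requested number of threads, fail if unavailable
    BEST_EFFORT_PARALLEL, // use whatever is available, fall back to sequential
    SEQUENTIAL            // no parallel combine
  }

  static Mode resolve(int numParallelCombineThreads)
  {
    if (numParallelCombineThreads > 0) {
      return Mode.FIXED_PARALLEL;
    } else if (numParallelCombineThreads == NUM_CURRENT_AVAILABLE_THREADS) {
      return Mode.BEST_EFFORT_PARALLEL;
    } else if (numParallelCombineThreads == NO_PARALLEL_COMBINE_THREADS) {
      return Mode.SEQUENTIAL;
    } else {
      // Any other value is rejected here, next to the code that interprets it.
      throw new IllegalStateException(
          String.format("Invalid numParallelCombineThreads [%d]", numParallelCombineThreads)
      );
    }
  }
}
```

With the check kept next to the branching logic, an out-of-range value such as `-5` fails in the same place that defines what each valid value means.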