leventov commented on a change in pull request #6629: Add support parallel
combine in brokers
URL: https://github.com/apache/incubator-druid/pull/6629#discussion_r241089247
##########
File path: processing/src/main/java/org/apache/druid/query/QueryContexts.java
##########
@@ -174,6 +179,35 @@
}
}
+ private static int checkPositive(String propertyName, int val)
+ {
+ Preconditions.checkArgument(
+ val > 0,
+ "%s should be positive, but was [%s]",
+ propertyName,
+ val
+ );
+ return val;
+ }
+
+ /**
+ * Returns the configured number of combine threads, if any; otherwise {@link #NO_PARALLEL_COMBINE_THREADS}.
+ */
+ public static <T> int getNumBrokerParallelCombineThreads(Query<T> query)
+ {
+ return parseInt(query, NUM_BROKER_PARALLEL_COMBINE_THREADS, NO_PARALLEL_COMBINE_THREADS);
Review comment:
What I want is that there is no `NUM_BROKER_PARALLEL_COMBINE_THREADS` config at all, or at least that it defaults to "parallelize ergonomically".
As far as I can tell, the main struggle is that if a very large combine task arrives, we want to give it as much parallelism as possible, but without blocking concurrently arriving small tasks.
I think it could be accomplished by:
1) Start processing in a `ForkJoinTask`, checking the total processing time (using `System.nanoTime()`), e.g. after every 1000 elements processed.
2) If at some point the total processing time exceeds 10 ms, we divide the total number of elements to combine by the number processed so far, and get the number of chunks, where processing each chunk should take approximately 10 ms (we can be fairly confident about this, because processing each element should take roughly deterministic time).
3) The ForkJoinTask reforks itself into another task (preserving and using the same combine state) that processes a number of elements equal to the chunk size, then reforks again, and so on until the elements are exhausted, all using the same state.
This means that processing is still essentially sequential, just with artificial "forking points" every ~10 ms. Now, to ensure that small combine tasks are not blocked, we probably just need to use a ForkJoinPool in [`asyncMode`](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html#ForkJoinPool-int-java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory-java.lang.Thread.UncaughtExceptionHandler-boolean-).
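To make the idea concrete, here is a minimal standalone sketch of the three steps above. All class and field names (`ReforkingCombineDemo`, `CombineState`, `CombineTask`) are hypothetical, not Druid code, and the "combine" is just a sum over an int array: a task measures its own throughput, caps each chunk at roughly 10 ms of work, and reforks itself with the same shared state into a `ForkJoinPool` constructed with `asyncMode = true` (FIFO scheduling), so small tasks submitted in the meantime are not starved.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical sketch of steps 1-3; none of these names exist in Druid.
public class ReforkingCombineDemo
{
  // Combine state shared across reforks of the same logical task.
  static class CombineState
  {
    final int[] elements;              // stand-in for the rows to combine
    int position;                      // next element to process
    long sum;                          // the running combine result
    int chunkSize = Integer.MAX_VALUE; // unbounded until throughput is measured
    final CountDownLatch done = new CountDownLatch(1);

    CombineState(int[] elements)
    {
      this.elements = elements;
    }
  }

  static class CombineTask extends RecursiveAction
  {
    static final long TARGET_CHUNK_NANOS = 10_000_000L; // ~10 ms per chunk
    static final int CHECK_INTERVAL = 1000;             // re-check time every 1000 elements

    final CombineState state;

    CombineTask(CombineState state)
    {
      this.state = state;
    }

    @Override
    protected void compute()
    {
      long start = System.nanoTime();
      int processed = 0;
      while (state.position < state.elements.length && processed < state.chunkSize) {
        state.sum += state.elements[state.position++];
        processed++;
        // Steps 1-2: once ~10 ms have elapsed, estimate how many elements fit
        // into 10 ms and cap all future chunks at that size.
        if (processed % CHECK_INTERVAL == 0 && state.chunkSize == Integer.MAX_VALUE) {
          long elapsed = System.nanoTime() - start;
          if (elapsed > TARGET_CHUNK_NANOS) {
            state.chunkSize = (int) Math.max(
                CHECK_INTERVAL,
                (long) processed * TARGET_CHUNK_NANOS / elapsed
            );
          }
        }
      }
      if (state.position < state.elements.length) {
        // Step 3: refork with the same state; in an asyncMode (FIFO) pool the
        // next chunk queues behind any small tasks submitted in the meantime.
        new CombineTask(state).fork();
      } else {
        state.done.countDown();
      }
    }
  }

  public static long combine(ForkJoinPool pool, int[] elements) throws InterruptedException
  {
    CombineState state = new CombineState(elements);
    pool.execute(new CombineTask(state));
    state.done.await();
    return state.sum;
  }

  public static void main(String[] args) throws InterruptedException
  {
    // asyncMode = true switches the worker queues to FIFO scheduling.
    ForkJoinPool pool = new ForkJoinPool(
        2, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true);
    int[] elements = new int[100_000];
    java.util.Arrays.fill(elements, 1);
    System.out.println(combine(pool, elements)); // prints 100000
    pool.shutdown();
  }
}
```

Visibility of the mutable `CombineState` is safe here because `fork()` establishes a happens-before edge between the reforking task and the execution of the new one, so no volatile fields or locks are needed for the handoff.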
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]