leventov commented on a change in pull request #6629: Add support parallel 
combine in brokers
URL: https://github.com/apache/incubator-druid/pull/6629#discussion_r241089247
 
 

 ##########
 File path: processing/src/main/java/org/apache/druid/query/QueryContexts.java
 ##########
 @@ -174,6 +179,35 @@
     }
   }
 
+  private static int checkPositive(String propertyName, int val)
+  {
+    Preconditions.checkArgument(
+        val > 0,
+        "%s should be positive, but [%s]",
+        propertyName,
+        val
+    );
+    return val;
+  }
+
+  /**
+   * Return the configured number of combine threads if any. Others {@link 
#NO_PARALLEL_COMBINE_THREADS}.
+   */
+  public static <T> int getNumBrokerParallelCombineThreads(Query<T> query)
+  {
+    return parseInt(query, NUM_BROKER_PARALLEL_COMBINE_THREADS, 
NO_PARALLEL_COMBINE_THREADS);
 
 Review comment:
  What I want is for there to be no `NUM_BROKER_PARALLEL_COMBINE_THREADS` config at all, or at least for it to default to "parallelize ergonomically".
   
  As far as I can tell, the main struggle is that if a very large combine task arrives, we want to give it as much parallelism as possible, but without blocking concurrently arriving small tasks.
   
  I think it could be accomplished by:
   1) Start processing in a `ForkJoinTask`, checking the total processing time (using `System.nanoTime()`), e.g. after every 1000 elements processed.
   2) If at some point the total processing time exceeds 10 ms, divide the total number of elements to combine by the number processed so far to get the number of chunks; processing each chunk should then take approximately 10 ms (we can be fairly confident about this, because processing each element should take roughly deterministic time).
   3) The `ForkJoinTask` reforks itself into another task (preserving and reusing the same combine state) that processes a chunk's worth of elements, then reforks again, and so on until the elements are exhausted, all using the same state.
   
  This means that processing is still essentially sequential, just with artificial "forking points" every ~10 ms. To ensure that small combine tasks are not blocked, we probably just need to use a ForkJoinPool in 
[`asyncMode`](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html#ForkJoinPool-int-java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory-java.lang.Thread.UncaughtExceptionHandler-boolean-).
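  The self-reforking scheme from the steps above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not Druid code: `RefStepCombine`, `SliceTask`, `CombineState`, and the plain-sum "combine" are hypothetical stand-ins, and the 10 ms budget and 1000-element check interval are the numbers from the comment.

```java
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.stream.LongStream;

public class RefStepCombine
{
  static final long TARGET_CHUNK_NANOS = 10_000_000L; // budget of ~10 ms per forked slice
  static final int CHECK_EVERY = 1000;                // how often to consult the clock

  /** Mutable combine state shared by the chain of reforked tasks (only one runs at a time). */
  static class CombineState
  {
    final Iterator<Long> input;
    long accumulator;                // the running combine result (a plain sum here)
    long chunkSize = Long.MAX_VALUE; // uncalibrated until the 10 ms budget is first exceeded
    final CountDownLatch done = new CountDownLatch(1);

    CombineState(Iterator<Long> input)
    {
      this.input = input;
    }
  }

  /** Processes elements until the time budget (or calibrated chunk size) is reached, then reforks. */
  static class SliceTask extends RecursiveAction
  {
    final CombineState state;

    SliceTask(CombineState state)
    {
      this.state = state;
    }

    @Override
    protected void compute()
    {
      final long start = System.nanoTime();
      long doneInSlice = 0;
      while (state.input.hasNext()) {
        state.accumulator += state.input.next(); // the "combine" step
        doneInSlice++;
        if (doneInSlice % CHECK_EVERY == 0) {
          if (state.chunkSize == Long.MAX_VALUE && System.nanoTime() - start >= TARGET_CHUNK_NANOS) {
            // Calibrate: this many elements took ~10 ms, assuming uniform per-element cost.
            state.chunkSize = doneInSlice;
          }
          if (doneInSlice >= state.chunkSize) {
            // Refork so concurrently submitted small tasks can be scheduled in between slices.
            new SliceTask(state).fork();
            return;
          }
        }
      }
      state.done.countDown(); // input exhausted: the combine is complete
    }
  }

  public static long combine(ForkJoinPool pool, Iterator<Long> input)
  {
    CombineState state = new CombineState(input);
    pool.execute(new SliceTask(state));
    try {
      state.done.await();
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
    return state.accumulator;
  }

  public static void main(String[] args)
  {
    // asyncMode = true gives FIFO scheduling, so newly arriving small tasks are not
    // starved behind the long chain of reforked slices.
    ForkJoinPool pool = new ForkJoinPool(
        2, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true);
    long n = 1_000_000;
    long result = combine(pool, LongStream.rangeClosed(1, n).boxed().iterator());
    System.out.println(result == n * (n + 1) / 2); // prints true
    pool.shutdown();
  }
}
```

  Note that `fork()` called from inside a pool task resubmits to the same pool, so the whole combine stays a chain of short tasks, and the `asyncMode` (FIFO) pool lets other tasks interleave at each forking point.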

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
