clintropolis opened a new issue #8577: parallel broker merges
URL: https://github.com/apache/incubator-druid/issues/8577
 
 
   ### Motivation
   
   Brokers should be able to merge query result sets in parallel, 
adaptively/automatically, based on current overall utilization. The 
"merge/combine" of sequences constitutes the bulk of the real work that Brokers 
perform. This currently takes place within a single thread from the HTTP thread 
pool, which while fair-ish, means that we are also potentially under-utilizing 
additional cores on the server if the majority of queries are blocked waiting 
for results. Using a divide and conquer approach to perform this combining 
merge of results in parallel should allow us to often _dramatically_ speed up 
the time this operation takes, and should also make broker resource utilization 
more predictable at the same time.
   
   ### Proposed changes
   
   To achieve this, we will introduce a new opt-in mode to enable parallel 
merging of results by Druid brokers using a fork-join pool in 'async' mode. 
This proposal is the result of running with the basic idea captured here 
https://github.com/apache/incubator-druid/pull/6629#pullrequestreview-181900933,
 and building on the backs of the good work done in #5913 and #6629, creating a 
couple of prototype implementations, and performing a large number of 
experiments. 
   
   My current approach uses a two-layer hierarchy: the first layer merges 
subsets of the input sequences, with each task writing its output to a blocking 
queue, and a single-task second layer merges the blocking queue outputs of 
the first layer into a single output blocking queue. The level of parallelism 
for layer 1 will be chosen automatically based on current 'merge' pool 
utilization, and the fork-join tasks will self-tune to perform a limited number 
of operations per task before yielding their results and forking a new task, 
which continues the work once it is scheduled.
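
   To make the two-layer shape concrete, here is a minimal sketch, not the 
prototype's code. All names (`TwoLayerMergeSketch`, `partition`, 
`mergeInTwoLayers`) are hypothetical, and it makes simplifying assumptions: 
inputs are sorted `List<Integer>`, each layer 1 task fully materializes its 
merged output instead of streaming batches through a blocking queue, and the 
layer 2 merge runs on the calling thread rather than as a fork-join task.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

final class TwoLayerMergeSketch {
  /** Splits the inputs into at most `groups` contiguous, near-equal groups. */
  static <T> List<List<T>> partition(List<T> inputs, int groups) {
    int n = inputs.size();
    int g = Math.max(1, Math.min(groups, n));
    List<List<T>> out = new ArrayList<>();
    for (int i = 0; i < g; i++) {
      int from = (int) ((long) i * n / g);
      int to = (int) ((long) (i + 1) * n / g);
      out.add(new ArrayList<>(inputs.subList(from, to)));
    }
    return out;
  }

  /** Plain two-way ordered merge of two sorted lists. */
  static List<Integer> mergeTwo(List<Integer> a, List<Integer> b) {
    List<Integer> out = new ArrayList<>(a.size() + b.size());
    int i = 0;
    int j = 0;
    while (i < a.size() && j < b.size()) {
      out.add(a.get(i) <= b.get(j) ? a.get(i++) : b.get(j++));
    }
    while (i < a.size()) out.add(a.get(i++));
    while (j < b.size()) out.add(b.get(j++));
    return out;
  }

  /** Merges any number of sorted lists by folding them pairwise. */
  static List<Integer> mergeSorted(List<List<Integer>> sorted) {
    List<Integer> out = new ArrayList<>();
    for (List<Integer> s : sorted) out = mergeTwo(out, s);
    return out;
  }

  static List<Integer> mergeInTwoLayers(List<List<Integer>> sortedInputs, int layer1Tasks) {
    // The last constructor argument enables 'async' (FIFO) mode.
    ForkJoinPool pool = new ForkJoinPool(
        Math.max(1, layer1Tasks), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true);
    try {
      // Layer 1: one task per group of input sequences.
      List<ForkJoinTask<List<Integer>>> layer1 = new ArrayList<>();
      for (List<List<Integer>> group : partition(sortedInputs, layer1Tasks)) {
        Callable<List<Integer>> job = () -> mergeSorted(group);
        layer1.add(pool.submit(job));
      }
      // Layer 2: a single merge over the layer 1 outputs.
      List<List<Integer>> partials = new ArrayList<>();
      for (ForkJoinTask<List<Integer>> task : layer1) partials.add(task.join());
      return mergeSorted(partials);
    } finally {
      pool.shutdown();
    }
  }
}
```

   With four sorted inputs and `layer1Tasks = 3`, `partition` yields groups of 
one, one, and two sequences, so three layer 1 merges feed the final merge.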
   
   In a nod to query vectorization which happens at the segment level for 
historicals, and more importantly, to minimize the number of blocking 
operations within fork-join pool tasks, the results from the input sequences 
will be yielded in small batches, processed in batches, and of course added to 
the output blocking queues a batch at a time. While I haven't yet spent the time 
to find the ideal small batch size, batching seems to work dramatically better 
than processing and outputting one result at a time, which in some of my initial 
experiments was even slower in parallel than the existing serial approach due 
to high lock contention.
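
   As a rough illustration of why batching reduces contention, the sketch 
below counts how many blocking `put()` calls are needed to push a stream of 
results onto a queue. The class and method names are hypothetical, and using 
an empty list as the end-of-stream marker is an assumption made only for this 
example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;

final class BatchingSketch {
  /**
   * Pushes results onto the queue in batches of batchSize (must be >= 1) and
   * returns the number of put() calls made, including the end-of-stream marker.
   */
  static <T> int putInBatches(Iterator<T> results, int batchSize, BlockingQueue<List<T>> out) {
    int puts = 0;
    try {
      List<T> batch = new ArrayList<>(batchSize);
      while (results.hasNext()) {
        batch.add(results.next());
        if (batch.size() == batchSize) {
          out.put(batch); // one queue operation covers batchSize results
          puts++;
          batch = new ArrayList<>(batchSize);
        }
      }
      if (!batch.isEmpty()) {
        out.put(batch); // final partial batch
        puts++;
      }
      out.put(Collections.<T>emptyList()); // sketch-only end-of-stream marker
      puts++;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while adding a batch", e);
    }
    return puts;
  }
}
```

   For 10 results, a batch size of 4 needs 4 queue operations (three batches 
plus the marker), while a batch size of 1 needs 11.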
   
   A prototype implementation based on the experiments so far (but still 
missing a few features) is available here: 
https://github.com/apache/incubator-druid/compare/master...clintropolis:broker-parallel-merge-combine-artisanal-small-batch.
 The design will be described using the terms from this branch, but I consider 
everything fair game and willing to change based on discussion in this proposal.
   
   #### result merging on the fork-join pool
   A new class `OrderedResultBatch<T>` will capture this notion of result 
batches, wrapping a `Queue<T>`, as well as the idea of a 'terminal' object in 
order to communicate to downstream fork-join tasks that a sequence is 
completed. To simplify the processing of results without directly dealing with 
these batches, a cursor pattern to allow easily processing individual results 
from the batches: 
   
   ```
   class BatchedResultsCursor<T> implements ForkJoinPool.ManagedBlocker, 
       Comparable<BatchedResultsCursor<T>>
   ```
   
   is also introduced, with implementations for 
`Yielder<OrderedResultBatch<T>>` and `BlockingQueue<OrderedResultBatch<T>>` to 
allow using the same types of worker tasks for both layer 1 and layer 2. The 
yielder cursors and blocking queue cursors operate slightly differently in that 
the yielder cursors are created 'pre-loaded' by virtue of converting the input 
sequences into accumulating yielders.
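
   The blocking-queue flavor of such a cursor could look roughly like the 
sketch below. It is not the prototype's implementation: `ResultBatchSketch` 
and `QueueCursorSketch` are hypothetical stand-ins, representing the 
'terminal' batch as a null queue is an assumption, and the `Comparable` part 
of the real cursor is left out. The `ForkJoinPool.ManagedBlocker` methods and 
`ForkJoinPool.managedBlock` are the standard JDK API.

```java
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ForkJoinPool;

/** Hypothetical result batch: queued results, or a 'terminal' marker when values is null. */
final class ResultBatchSketch<T> {
  private final Queue<T> values;

  ResultBatchSketch(Queue<T> values) { this.values = values; }

  static <T> ResultBatchSketch<T> terminal() { return new ResultBatchSketch<>(null); }

  boolean isTerminal() { return values == null; }

  boolean isDrained() { return values == null || values.isEmpty(); }

  T poll() { return values.poll(); }
}

/** Hypothetical cursor that walks individual results across batches from a blocking queue. */
final class QueueCursorSketch<T> implements ForkJoinPool.ManagedBlocker {
  private final BlockingQueue<ResultBatchSketch<T>> queue;
  private ResultBatchSketch<T> current;

  QueueCursorSketch(BlockingQueue<ResultBatchSketch<T>> queue) { this.queue = queue; }

  @Override
  public boolean isReleasable() {
    // No need to block while the current batch still has results or is terminal.
    if (current != null && (current.isTerminal() || !current.isDrained())) {
      return true;
    }
    // Otherwise try a non-blocking poll for the next batch.
    current = queue.poll();
    return current != null;
  }

  @Override
  public boolean block() throws InterruptedException {
    if (!isReleasable()) {
      current = queue.take(); // the only call that can actually block
    }
    return true;
  }

  /** Returns false once the terminal batch is reached; may block waiting for a batch. */
  boolean hasNext() {
    try {
      while (current == null || (!current.isTerminal() && current.isDrained())) {
        // managedBlock lets the pool compensate if this worker thread blocks.
        ForkJoinPool.managedBlock(this);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for a batch", e);
    }
    return !current.isTerminal();
  }

  /** Call only after hasNext() has returned true. */
  T next() { return current.poll(); }
}
```

   Callers loop with `while (cursor.hasNext()) { use(cursor.next()); }` and 
never touch batches directly, which is the point of the cursor pattern.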
   
   
   At the outer level, parallel merging will be exposed to 
`CachingClusteredClient` through a new sequence type:
   
   ```
   class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
   ```
   
   which will wrap all of the work done on the fork-join pool in the form of a 
yielding sequence, to allow easy integration through existing interfaces. The 
`ParallelMergeCombiningSequence` operates on a `List<Sequence<T>> 
baseSequences` to merge and combine results given an `Ordering<T> orderingFn` 
and `BinaryOperator<T> combineFn` to order and combine results from the 
sequences. The HTTP thread has a 'background' combining sequence that builds a 
sequence from an iterator over the batched results from the output blocking 
queue of the layer 2 task.
   
   Converting the sequence to a yielder will spawn a single `RecursiveAction` 
to run on the fork-join pool:
   
   ```
   class ParallelMergeCombineAction<T> extends RecursiveAction
   ```
   
   which is responsible for computing the level of parallelism, converting the 
list of input sequences into `Yielder<OrderedResultBatch<T>>`, dividing the set 
of input batch yielders between the chosen level of parallelism, and spawning 
the layer 1 and layer 2 initialization tasks:
   
   ```
   class YielderMergeCombineSeedAction<T> extends RecursiveAction
   ```
   
   and 
   
   ```
   class BlockingQueueMergeCombineSeedAction<T> extends RecursiveAction
   ```
   
   which block until the initial batch of results is produced and ready to 
process. Once the results are available, these tasks will produce a 
`BatchedResultsCursor` for the result batch and then feed it into the main 
worker task of the `ParallelMergeCombiningSequence`:
   
   ```
   class MergeCombineAction<T> extends RecursiveAction
   ```
   
   which does the actual merging of results. `MergeCombineAction` accepts a 
list of `BatchedResultsCursor` in a `PriorityQueue` that uses the `Ordering` 
function to sort. Results with the same ordering are then combined with the 
combining function while applicable before being added to an output 
`OrderedResultBatch` to be pushed to an output blocking queue. 
`MergeCombineAction` will "yield" after processing _n_ inputs, where _n_ is 
initially `1024`, and subsequently set by measuring the time it takes to 
process _n_ inputs and computing the ideal _n_ to run for `10ms`. The new _n_ 
is used for the next `MergeCombineAction` that is executed, continuing the work 
of processing the `BatchedResultsCursor` from the `PriorityQueue` until 
everything is completely drained, at which point a 'terminal' result batch is 
added to indicate to downstream processors that the stream is complete.
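
   The core of this loop can be sketched as follows. This is a simplified, 
serial illustration under stated assumptions, not the prototype: 
`MergeCombineSketch`, `mergeCombine`, and `nextYieldAfter` are hypothetical 
names, plain iterators stand in for `BatchedResultsCursor`, results are assumed 
to be non-null, and the linear scaling and clamping used to retune _n_ are my 
own guess at how "the ideal _n_ to run for `10ms`" might be computed.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.BinaryOperator;

final class MergeCombineSketch {
  static final long TARGET_RUN_NANOS = 10_000_000L; // the 10ms target from the text

  /** Minimal cursor: an iterator plus its current value. */
  private static final class Cursor<T> {
    final Iterator<T> rest;
    T current;

    Cursor(Iterator<T> it) {
      this.rest = it;
      this.current = it.next();
    }

    boolean advance() {
      if (!rest.hasNext()) return false;
      current = rest.next();
      return true;
    }
  }

  /** Ordered merge of sorted inputs; results that compare equal are combined. */
  static <T> List<T> mergeCombine(
      List<? extends Iterable<T>> sortedInputs, Comparator<T> ordering, BinaryOperator<T> combineFn) {
    Comparator<Cursor<T>> byCurrent = (a, b) -> ordering.compare(a.current, b.current);
    PriorityQueue<Cursor<T>> cursors = new PriorityQueue<>(byCurrent);
    for (Iterable<T> input : sortedInputs) {
      Iterator<T> it = input.iterator();
      if (it.hasNext()) cursors.add(new Cursor<>(it));
    }
    List<T> output = new ArrayList<>();
    T pending = null; // assumes results are never null
    while (!cursors.isEmpty()) {
      Cursor<T> cursor = cursors.poll();
      T next = cursor.current;
      if (pending == null) {
        pending = next;
      } else if (ordering.compare(pending, next) == 0) {
        pending = combineFn.apply(pending, next); // same ordering: combine
      } else {
        output.add(pending); // ordering changed: emit the combined result
        pending = next;
      }
      if (cursor.advance()) cursors.add(cursor); // re-queue at its new position
    }
    if (pending != null) output.add(pending);
    return output;
  }

  /** Scales the yield count so the next run takes roughly TARGET_RUN_NANOS. */
  static int nextYieldAfter(int currentYieldAfter, long elapsedNanos) {
    if (elapsedNanos <= 0) return currentYieldAfter;
    long scaled = currentYieldAfter * TARGET_RUN_NANOS / elapsedNanos;
    return (int) Math.max(1L, Math.min(Integer.MAX_VALUE, scaled));
  }
}
```

   Under this scaling, a run of 1024 inputs that took 20ms would be followed by 
a run of 512, and one that took 5ms by a run of 2048.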
   
   The level of parallelism in the prototype is currently chosen naively by 
picking the maximum of available processors or remaining fork-join tasks, with 
a test-oriented query context parameter to limit it to fewer than the available 
processors. I say naively because this should probably consider not just the 
remaining fork-join task slots but also how many queries are currently being 
processed, in an attempt to save additional slots when a small number of 
queries are dominating the pool. Further experimentation and discussion will 
probably be required to pick an optimal strategy, as well as an investigation 
of the content mentioned in #8357.
   
   
   
   #### Prioritization
   
   The current prototype is lacking _any_ sort of prioritization or gated 
access to the fork-join pool. I would like to test this first-come, 
first-served situation to see if it is 'good enough' for the initial PR. 
However, assuming it is not, I would advocate for some sort of prioritized 
blocking mechanism over a fixed number of slots set via configuration, 
effectively a limit on the maximum number of concurrently merging queries, that 
blocks before spawning fork-join tasks and releases the slots when the merge is 
complete. This up front blocker, but run until merge complete model, should
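
   One way to picture the slot-based gate is the sketch below. It is only an 
illustration: `MergeSlotGateSketch` is a hypothetical name, and it uses a plain 
fair `Semaphore`, which gives first-come, first-served access to the slots and 
does not implement the prioritization discussed here.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

final class MergeSlotGateSketch {
  private final Semaphore slots;

  /** maxConcurrentMerges would come from configuration in a real implementation. */
  MergeSlotGateSketch(int maxConcurrentMerges) {
    this.slots = new Semaphore(maxConcurrentMerges, true); // fair: FIFO, not prioritized
  }

  /** Blocks up front for a slot, runs the whole merge, then releases the slot. */
  <T> T runMerge(Supplier<T> merge) {
    slots.acquireUninterruptibly();
    try {
      return merge.get();
    } finally {
      slots.release();
    }
  }

  int availableSlots() {
    return slots.availablePermits();
  }
}
```

   A query holds its slot for the full duration of the merge, so at most 
`maxConcurrentMerges` queries can have fork-join tasks in flight at once.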
   
   Longer term I definitely think it is worth investigating an interleaving 
strategy, per #8356. I haven't spent a significant amount of time thinking 
about this yet, but I would naively imagine some sort of custom fork-join pool 
implementation where the 'execute' method goes through a prioritized queue, so 
that lower priority queries can be stalled in favor of higher priorities. 
   
   With the work broken up into small chunks like this, it seems like there 
could be even more elaborate strategies of constricting and expanding the 
number of parallel merge tasks based on pool utilization by just regrouping 
`BatchedResultsCursor`, but this would require much further investigation and 
testing than I think should be part of the initial effort on this.
   
   However, I would very much welcome a discussion on what is acceptable for an 
initial PR (keeping in mind this behavior will initially be opt-in).
   
   ### Rationale
   
   This seems like a much more flexible approach to dividing up the work for 
result merging at the broker level than the previous attempt in #6629, and a 
bit less invasive than the changes of #5913. I don't think anyone objects to 
the concept of the broker performing some magic parallel result merging, so 
experimentation was necessary in order to prove the approach viable. The 
results so far appear very promising: testing on my 4 physical/8 
hyperthreaded core laptop has yielded the following results using the 
benchmarks added in #8089:
   
   ```
   Benchmark                                        (numServers)  (parallelism)  (queryGranularity)  (rowsPerSegment)  Mode  Cnt        Score         Error  Units
   CachingClusteredClientBenchmark.groupByQuery                8              0                 all             75000  avgt    5   294667.810 ±   15159.044  us/op
   CachingClusteredClientBenchmark.groupByQuery                8              1                 all             75000  avgt    5   289990.615 ±   12078.017  us/op
   CachingClusteredClientBenchmark.groupByQuery                8              4                 all             75000  avgt    5   165992.136 ±    5120.743  us/op
   CachingClusteredClientBenchmark.groupByQuery                8              0              minute             75000  avgt    5   665222.595 ±   34855.138  us/op
   CachingClusteredClientBenchmark.groupByQuery                8              1              minute             75000  avgt    5   601011.456 ±   31143.896  us/op
   CachingClusteredClientBenchmark.groupByQuery                8              4              minute             75000  avgt    5   505897.930 ±  252049.184  us/op
   
   Benchmark                                        (numServers)  (parallelism)  (queryGranularity)  (rowsPerSegment)  Mode  Cnt        Score         Error  Units
   CachingClusteredClientBenchmark.timeseriesQuery             8              0                 all             75000  avgt    5    10430.144 ±     808.633  us/op
   CachingClusteredClientBenchmark.timeseriesQuery             8              1                 all             75000  avgt    5    10451.555 ±     673.152  us/op
   CachingClusteredClientBenchmark.timeseriesQuery             8              4                 all             75000  avgt    5     3092.491 ±     102.575  us/op
   CachingClusteredClientBenchmark.timeseriesQuery             8              1              minute             75000  avgt    5    35844.012 ±    1422.124  us/op
   CachingClusteredClientBenchmark.timeseriesQuery             8              0              minute             75000  avgt    5    35463.967 ±    2678.709  us/op
   CachingClusteredClientBenchmark.timeseriesQuery             8              4              minute             75000  avgt    5    10490.847 ±     532.253  us/op
   
   Benchmark                                        (numServers)  (parallelism)  (queryGranularity)  (rowsPerSegment)  Mode  Cnt        Score         Error  Units
   CachingClusteredClientBenchmark.topNQuery                   8              0                 all             75000  avgt    5    17020.785 ±    1591.048  us/op
   CachingClusteredClientBenchmark.topNQuery                   8              1                 all             75000  avgt    5    17236.979 ±    1742.515  us/op
   CachingClusteredClientBenchmark.topNQuery                   8              4                 all             75000  avgt    5     5182.325 ±     275.617  us/op
   CachingClusteredClientBenchmark.topNQuery                   8              0              minute             75000  avgt    5   440288.390 ±   36510.682  us/op
   CachingClusteredClientBenchmark.topNQuery                   8              1              minute             75000  avgt    5   462569.201 ±   40711.373  us/op
   CachingClusteredClientBenchmark.topNQuery                   8              4              minute             75000  avgt    5   213399.804 ±   24344.845  us/op
   
   
   Benchmark                                        (numServers)  (parallelism)  (queryGranularity)  (rowsPerSegment)  Mode  Cnt        Score         Error  Units
   CachingClusteredClientBenchmark.groupByQuery               32              0                 all             75000  avgt    5  1392022.470 ±   37345.957  us/op
   CachingClusteredClientBenchmark.groupByQuery               32              1                 all             75000  avgt    5  1355029.496 ±   74562.324  us/op
   CachingClusteredClientBenchmark.groupByQuery               32              4                 all             75000  avgt    5   789772.717 ±   15048.352  us/op
   CachingClusteredClientBenchmark.groupByQuery               32              1              minute             75000  avgt    5  3039476.943 ±   96495.702  us/op
   CachingClusteredClientBenchmark.groupByQuery               32              0              minute             75000  avgt    5  3066853.419 ±   83639.422  us/op
   CachingClusteredClientBenchmark.groupByQuery               32              4              minute             75000  avgt    5  1677438.195 ±  263601.667  us/op
   
   Benchmark                                        (numServers)  (parallelism)  (queryGranularity)  (rowsPerSegment)  Mode  Cnt        Score         Error  Units
   CachingClusteredClientBenchmark.timeseriesQuery            32              0                 all             75000  avgt    5    42082.949 ±    3386.983  us/op
   CachingClusteredClientBenchmark.timeseriesQuery            32              1                 all             75000  avgt    5    41423.214 ±    3115.364  us/op
   CachingClusteredClientBenchmark.timeseriesQuery            32              4                 all             75000  avgt    5    14438.213 ±     404.668  us/op
   CachingClusteredClientBenchmark.timeseriesQuery            32              0              minute             75000  avgt    5   159062.877 ±   13616.178  us/op
   CachingClusteredClientBenchmark.timeseriesQuery            32              1              minute             75000  avgt    5   148730.323 ±   13239.695  us/op
   CachingClusteredClientBenchmark.timeseriesQuery            32              4              minute             75000  avgt    5    50602.164 ±    1653.594  us/op
   
   Benchmark                                        (numServers)  (parallelism)  (queryGranularity)  (rowsPerSegment)  Mode  Cnt        Score         Error  Units
   CachingClusteredClientBenchmark.topNQuery                  32              0                 all             75000  avgt    5    68737.166 ±    5021.642  us/op
   CachingClusteredClientBenchmark.topNQuery                  32              1                 all             75000  avgt    5    68601.619 ±    1731.964  us/op
   CachingClusteredClientBenchmark.topNQuery                  32              4                 all             75000  avgt    5    23354.736 ±     136.128  us/op
   CachingClusteredClientBenchmark.topNQuery                  32              0              minute             75000  avgt    5  2209146.790 ±   26481.535  us/op
   CachingClusteredClientBenchmark.topNQuery                  32              1              minute             75000  avgt    5  2072088.508 ±   23812.286  us/op
   CachingClusteredClientBenchmark.topNQuery                  32              4              minute             75000  avgt    5   702360.662 ±   13024.822  us/op
   
   
   Benchmark                                        (numServers)  (parallelism)  (queryGranularity)  (rowsPerSegment)  Mode  Cnt        Score         Error  Units
   CachingClusteredClientBenchmark.groupByQuery               64              0                 all             75000  avgt    5  2982378.361 ±  111827.750  us/op
   CachingClusteredClientBenchmark.groupByQuery               64              1                 all             75000  avgt    5  3093978.766 ±  173486.476  us/op
   CachingClusteredClientBenchmark.groupByQuery               64              4                 all             75000  avgt    5  1866266.877 ±   72350.259  us/op
   CachingClusteredClientBenchmark.groupByQuery               64              0              minute             75000  avgt    5  6771130.653 ±  389650.338  us/op
   CachingClusteredClientBenchmark.groupByQuery               64              1              minute             75000  avgt    5  6531315.426 ±  350824.451  us/op
   CachingClusteredClientBenchmark.groupByQuery               64              4              minute             75000  avgt    5  3915926.485 ± 1945271.888  us/op
   
   Benchmark                                        (numServers)  (parallelism)  (queryGranularity)  (rowsPerSegment)  Mode  Cnt        Score         Error  Units
   CachingClusteredClientBenchmark.timeseriesQuery            64              0                 all             75000  avgt    5    83402.353 ±    8059.262  us/op
   CachingClusteredClientBenchmark.timeseriesQuery            64              1                 all             75000  avgt    5    82654.846 ±    6609.039  us/op
   CachingClusteredClientBenchmark.timeseriesQuery            64              4                 all             75000  avgt    5    29818.524 ±     531.522  us/op
   CachingClusteredClientBenchmark.timeseriesQuery            64              0              minute             75000  avgt    5   319373.950 ±   34796.043  us/op
   CachingClusteredClientBenchmark.timeseriesQuery            64              1              minute             75000  avgt    5   313583.213 ±   23267.252  us/op
   CachingClusteredClientBenchmark.timeseriesQuery            64              4              minute             75000  avgt    5   107551.460 ±    2252.579  us/op
   
   Benchmark                                        (numServers)  (parallelism)  (queryGranularity)  (rowsPerSegment)  Mode  Cnt        Score         Error  Units
   CachingClusteredClientBenchmark.topNQuery                  64              0                 all             75000  avgt    5   134204.767 ±    1053.392  us/op
   CachingClusteredClientBenchmark.topNQuery                  64              1                 all             75000  avgt    5   140975.988 ±    9803.699  us/op
   CachingClusteredClientBenchmark.topNQuery                  64              4                 all             75000  avgt    5    49865.418 ±    1392.572  us/op
   CachingClusteredClientBenchmark.topNQuery                  64              0              minute             75000  avgt    5  4927134.934 ±  526932.265  us/op
   CachingClusteredClientBenchmark.topNQuery                  64              1              minute             75000  avgt    5  4371840.961 ±  422021.155  us/op
   CachingClusteredClientBenchmark.topNQuery                  64              4              minute             75000  avgt    5  1571932.840 ±   11563.777  us/op
   ```
   
   Parallelism `0` is the existing caching clustered client merge strategy, 
parallelism `1` is doing a serial merge on the fork-join pool, and `4` is using 
3 layer 1 tasks (plus the single layer 2 task) to merge sequences in parallel, 
which matches the number of physical cores my laptop has. In many cases queries are processing 
2-3x faster when done in parallel, as would be expected for the given level of 
parallelism. Even doing serial processing with a single fork-join task is 
competitive with the existing serial approach, so _all_ merges can be done with 
the same approach even when there is not capacity available to run the merge in 
parallel. I will continue to update this proposal as I collect more experiment 
results.
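
   For reference, the speedup for any pair of rows is just the ratio of the 
average times. The small helper below is a hypothetical illustration, with the 
8-server, `all`-granularity scores for parallelism `0` and `4` copied from the 
table above.

```java
final class SpeedupSketch {
  /** Ratio of serial to parallel time; values above 1 mean the parallel run was faster. */
  static double speedup(double serialMicros, double parallelMicros) {
    return serialMicros / parallelMicros;
  }

  public static void main(String[] args) {
    // us/op at parallelism 0 vs 4, numServers = 8, queryGranularity = all
    System.out.printf("groupBy:    %.2fx%n", speedup(294667.810, 165992.136));
    System.out.printf("timeseries: %.2fx%n", speedup(10430.144, 3092.491));
    System.out.printf("topN:       %.2fx%n", speedup(17020.785, 5182.325));
  }
}
```

   For these three rows the ratios come out to roughly 1.8x for groupBy, 3.4x 
for timeseries, and 3.3x for topN.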
   
   ### Operational impact
   
   No forced operational impact since the feature will be opt-in initially and 
must be defined in the service configuration to be enabled. I think that this 
_could_ result in more predictable broker resource utilization, but operators 
experimenting with this new feature will need to closely monitor broker query 
performance to ensure that the new feature is producing beneficial results.
   
   ### Test plan
   
   Test plan includes live cluster testing on some small-ish clusters I have 
available, as well as running the benchmarks on a large core count machine to 
simulate larger clusters and round out the benchmarks, to ensure that the 
approach scales correctly. Additionally, I plan to perform 'overloaded' testing 
to ensure that a busy broker performs no worse, within reason, than the 
existing merge strategy.
   
   ### Future work (optional)
   
   Beyond the initial PR, I think the most benefit would be focusing on 
interleaving and tuning the level of parallelism, re #8356 and #8357.
   
