clintropolis opened a new issue #8577: parallel broker merges
URL: https://github.com/apache/incubator-druid/issues/8577

### Motivation

Brokers should be able to merge query result sets in parallel, adaptively/automatically, based on current overall utilization. The "merge/combine" of sequences constitutes the bulk of the real work that Brokers perform. This currently takes place within a single thread from the HTTP thread pool, which, while fair-ish, means that we are also potentially under-utilizing additional cores on the server if the majority of queries are blocked waiting for results. Using a divide and conquer approach to perform this combining merge of results in parallel should allow us to often _dramatically_ reduce the time this operation takes, and should also make broker resource utilization more predictable at the same time.

### Proposed changes

To achieve this, we will introduce a new opt-in mode to enable parallel merging of results by Druid brokers using a fork-join pool in 'async' mode. This proposal is the result of running with the basic idea captured here https://github.com/apache/incubator-druid/pull/6629#pullrequestreview-181900933, building on the backs of the good work done in #5913 and #6629, creating a couple of prototype implementations, and performing a large number of experiments.

My current approach uses a 2 layer hierarchy, where the first layer merges sub-sets of input sequences and produces output to a blocking queue, and a single-task second layer merges input from the blocking queue outputs of the first layer into a single output blocking queue. The level of parallelism for layer 1 will be chosen automatically based on current 'merge' pool utilization, and the fork-join tasks will self-tune to perform a limited number of operations per task, before yielding their results and forking a new task to continue the work when the new task is scheduled.
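
As a rough illustration of the two-layer shape described above (this is not the prototype's code), the sketch below creates a fork-join pool in async mode, splits already-sorted inputs into groups, merges each group as a layer 1 task, and merges the partial results in a single layer 2 step. The names `TwoLayerMergeSketch`, `mergeSorted`, `partition` and `parallelMerge` are hypothetical, and for brevity layer 1 returns complete lists via `join()` instead of streaming batches through blocking queues.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

class TwoLayerMergeSketch {
  /** Serial k-way merge of already-sorted lists; stands in for one merge task. */
  static List<Integer> mergeSorted(List<List<Integer>> sorted) {
    // Heap entries are {value, listIndex, positionInList}, ordered by value.
    PriorityQueue<int[]> heap =
        new PriorityQueue<>(Comparator.comparingInt((int[] e) -> e[0]));
    for (int i = 0; i < sorted.size(); i++) {
      if (!sorted.get(i).isEmpty()) {
        heap.add(new int[] {sorted.get(i).get(0), i, 0});
      }
    }
    List<Integer> out = new ArrayList<>();
    while (!heap.isEmpty()) {
      int[] e = heap.poll();
      out.add(e[0]);
      List<Integer> source = sorted.get(e[1]);
      int next = e[2] + 1;
      if (next < source.size()) {
        heap.add(new int[] {source.get(next), e[1], next});
      }
    }
    return out;
  }

  /** Splits the inputs into at most `groups` contiguous sub-lists of near-equal size. */
  static <T> List<List<T>> partition(List<T> inputs, int groups) {
    List<List<T>> parts = new ArrayList<>();
    int size = (inputs.size() + groups - 1) / groups; // ceiling division
    for (int i = 0; i < inputs.size(); i += size) {
      parts.add(inputs.subList(i, Math.min(i + size, inputs.size())));
    }
    return parts;
  }

  /** Layer 1: one merge task per group. Layer 2: a single merge of the partial results. */
  static List<Integer> parallelMerge(List<List<Integer>> inputs, int parallelism) {
    // The final constructor argument (asyncMode = true) selects FIFO scheduling
    // for forked tasks that are never joined.
    ForkJoinPool pool = new ForkJoinPool(
        parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true);
    try {
      List<ForkJoinTask<List<Integer>>> layer1 = new ArrayList<>();
      for (List<List<Integer>> group : partition(inputs, parallelism)) {
        Callable<List<Integer>> job = () -> mergeSorted(group);
        layer1.add(pool.submit(job));
      }
      List<List<Integer>> partials = new ArrayList<>();
      for (ForkJoinTask<List<Integer>> task : layer1) {
        partials.add(task.join());
      }
      return mergeSorted(partials);
    } finally {
      pool.shutdown();
    }
  }
}
```

Waiting on `join()` keeps the sketch short, but it gives up the streaming behaviour that the blocking queues provide in the proposal, where layer 2 can start consuming before layer 1 has finished.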
In a nod to query vectorization, which happens at the segment level for historicals, and more importantly, to minimize the number of blocking operations within fork-join pool tasks, the results from the input sequences will be yielded in small batches, processed in batches, and of course added to the output blocking queues a batch at a time. While I haven't yet spent the time to find the ideal small batch size, batching seems to work dramatically better than processing and outputting one result at a time, which in some of my initial experiments was even slower in parallel than the existing serial approach due to high lock contention.

A prototype implementation based on the experiments so far (but still missing a few features) is available here: https://github.com/apache/incubator-druid/compare/master...clintropolis:broker-parallel-merge-combine-artisanal-small-batch. The design will be described using the terms from this branch, but I consider everything fair game and am willing to change it based on discussion in this proposal.

#### result merging on the fork-join pool

A new class `OrderedResultBatch<T>` will capture this notion of result batches, wrapping a `Queue<T>`, as well as the idea of a 'terminal' object in order to communicate to downstream fork-join tasks that a sequence is completed. To simplify the processing of results without directly dealing with these batches, a cursor pattern that allows easily processing individual results from the batches:
```
class BatchedResultsCursor<T> implements ForkJoinPool.ManagedBlocker, Comparable<BatchedResultsCursor<T>>
```
is also introduced, with implementations for `Yielder<OrderedResultBatch<T>>` and `BlockingQueue<OrderedResultBatch<T>>` to allow using the same types of worker tasks for both layer 1 and layer 2.
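
To make the batch-plus-cursor idea concrete, here is a minimal sketch, assuming a batch is simply a `Queue<T>` with a null queue standing in for the 'terminal' marker. `ResultBatchSketch` and `QueueCursorSketch` are hypothetical names, not the classes from the branch, and the sketch leaves out the `Comparable` part of the cursor. It only shows how a cursor over a `BlockingQueue` of batches can use `ForkJoinPool.ManagedBlocker` so that waiting for the next batch is visible to the pool.

```java
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ForkJoinPool;

/** Hypothetical batch: a queue of results, or a terminal marker when the queue is null. */
final class ResultBatchSketch<T> {
  private final Queue<T> values; // null marks the terminal batch

  ResultBatchSketch(Queue<T> values) {
    this.values = values;
  }

  static <T> ResultBatchSketch<T> terminal() {
    return new ResultBatchSketch<T>(null);
  }

  boolean isTerminal() {
    return values == null;
  }

  boolean isEmpty() {
    return values == null || values.isEmpty();
  }

  T pop() {
    return values.poll();
  }
}

/** Hypothetical cursor that reads individual results out of a blocking queue of batches. */
final class QueueCursorSketch<T> implements ForkJoinPool.ManagedBlocker {
  private final BlockingQueue<ResultBatchSketch<T>> queue;
  private ResultBatchSketch<T> current;

  QueueCursorSketch(BlockingQueue<ResultBatchSketch<T>> queue) {
    this.queue = queue;
  }

  /** True when there is no usable batch in hand and the stream has not ended. */
  private boolean needsBatch() {
    return current == null || (!current.isTerminal() && current.isEmpty());
  }

  @Override
  public boolean isReleasable() {
    if (needsBatch()) {
      ResultBatchSketch<T> polled = queue.poll(); // non-blocking attempt first
      if (polled != null) {
        current = polled;
      }
    }
    return !needsBatch();
  }

  @Override
  public boolean block() throws InterruptedException {
    if (needsBatch()) {
      current = queue.take(); // blocks until a producer adds a batch
    }
    return !needsBatch(); // false makes managedBlock try again (e.g. after an empty batch)
  }

  /** Returns the next result, or null once the terminal batch has been reached. */
  T next() {
    try {
      ForkJoinPool.managedBlock(this);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for a batch", e);
    }
    return current.isTerminal() ? null : current.pop();
  }
}
```

Going through `ForkJoinPool.managedBlock` rather than calling `take()` directly lets the pool compensate with a spare worker while a task waits, which is the usual reason to implement `ManagedBlocker`.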
The yielder cursors and blocking queue cursors operate slightly differently in that the yielder cursors are created 'pre-loaded' by virtue of converting the input sequences into accumulating yielders.

At the outer level, parallel merging will be exposed to `CachingClusteredClient` through a new sequence type:
```
class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
```
which will wrap all of the work done on the fork-join pool in the form of a yielding sequence, to allow easy integration through existing interfaces. The `ParallelMergeCombiningSequence` operates on a `List<Sequence<T>> baseSequences` to merge and combine results, given an `Ordering<T> orderingFn` and a `BinaryOperator<T> combineFn` to order and combine results from the sequences. The HTTP thread has a 'background' combining sequence that builds a sequence from an iterator over the batched results from the output blocking queue of the layer 2 task. Converting the sequence to a yielder will create a single `RecursiveAction` to run on the fork-join pool:
```
class ParallelMergeCombineAction<T> extends RecursiveAction
```
which is responsible for computing the level of parallelism, converting the list of input sequences into `Yielder<OrderedResultBatch<T>>`, dividing the set of input batch yielders between the chosen level of parallelism, and spawning the layer 1 and layer 2 initialization tasks:
```
class YielderMergeCombineSeedAction<T> extends RecursiveAction
```
and
```
class BlockingQueueMergeCombineSeedAction<T> extends RecursiveAction
```
whose purpose is to block until the initial batch of results is produced and ready to process. Once the results are available, these tasks will produce a `BatchedResultsCursor` for the result batch and then feed that into the main worker task of the `ParallelMergeCombiningSequence`:
```
class MergeCombineAction<T> extends RecursiveAction
```
which does the actual merging of results.
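
The core "merge then combine" step can be shown in isolation. The sketch below is an illustration only, under the assumption that every input is already sorted by the ordering function and contains no nulls. It uses a plain `Comparator` and in-memory lists in place of `Ordering`, batches, and fork-join tasks, and the names `MergeCombineSketch`, `Row`, and `Cursor` are hypothetical. Results that compare as equal are folded together with the combining function before being emitted.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.BinaryOperator;

class MergeCombineSketch {
  /** Hypothetical result row used only for this example. */
  static final class Row {
    final String key;
    final long count;

    Row(String key, long count) {
      this.key = key;
      this.count = count;
    }
  }

  /** Iterator wrapper that exposes its current head so it can sit in a priority queue. */
  private static final class Cursor<T> {
    final Iterator<T> it;
    T head;

    Cursor(Iterator<T> it) {
      this.it = it;
      this.head = it.next();
    }

    /** Moves to the next element; returns false when the input is exhausted. */
    boolean advance() {
      if (!it.hasNext()) {
        return false;
      }
      head = it.next();
      return true;
    }
  }

  static <T> List<T> mergeCombine(
      List<List<T>> sortedInputs, Comparator<T> orderingFn, BinaryOperator<T> combineFn) {
    Comparator<Cursor<T>> byHead = (a, b) -> orderingFn.compare(a.head, b.head);
    PriorityQueue<Cursor<T>> heap = new PriorityQueue<>(byHead);
    for (List<T> input : sortedInputs) {
      Iterator<T> it = input.iterator();
      if (it.hasNext()) {
        heap.add(new Cursor<>(it));
      }
    }

    List<T> out = new ArrayList<>();
    T pending = null; // result still open for combining
    while (!heap.isEmpty()) {
      Cursor<T> cursor = heap.poll();
      T value = cursor.head;
      if (cursor.advance()) {
        heap.add(cursor); // re-insert so the heap reorders on the new head
      }
      if (pending == null) {
        pending = value;
      } else if (orderingFn.compare(pending, value) == 0) {
        pending = combineFn.apply(pending, value); // same ordering key: combine
      } else {
        out.add(pending); // key changed: emit and start a new pending result
        pending = value;
      }
    }
    if (pending != null) {
      out.add(pending);
    }
    return out;
  }
}
```

For example, merging `[a=1, c=2]` and `[a=3, b=4]` ordered by key, with a combining function that sums counts, gives `[a=4, b=4, c=2]`.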
`MergeCombineAction` accepts a list of `BatchedResultsCursor` in a `PriorityQueue` that uses the `Ordering` function to sort. Results with the same ordering are then combined with the combining function, where applicable, before being added to an output `OrderedResultBatch` to be pushed to an output blocking queue. `MergeCombineAction` will "yield" after processing _n_ inputs, where _n_ is initially `1024`, and is subsequently set by measuring the time it takes to process _n_ inputs and computing the ideal _n_ to run for `10ms`. The new _n_ is used for the next `MergeCombineAction` that is executed, continuing the work of processing the `BatchedResultsCursor` from the `PriorityQueue` until everything is completely drained, at which point a 'terminal' result batch is added to indicate to downstream processors that the stream is complete.

The level of parallelization in the prototype is currently naively chosen by picking the maximum of available processors or remaining fork-join tasks, with a test-oriented query context parameter to limit it to fewer than the available processors. I say naively because this should probably consider not just the remaining fork-join task slots, but also how many queries are currently being processed, to attempt to save additional slots when a small number of queries are dominating the pool. I think further experimentation and discussion might be required to pick an optimal strategy, as well as investigating the content mentioned in #8357.

#### Prioritization

The current prototype is lacking _any_ sort of prioritization or gated access to the fork-join pool. I would like to test this first-come first-served situation first, to see if it is 'good enough' for the initial PR.
However, assuming it is not, I would advocate there being some sort of prioritized blocking mechanism with a fixed number of slots set via configuration, effectively a limit on the maximum number of queries merging concurrently, which blocks before spawning fork-join tasks and releases the slots when the merge is complete. This up front blocker, but run until merge complete model, should

Longer term I definitely think it is worth investigating an interleaving strategy, per #8356. I haven't spent a significant amount of time thinking about this yet, but I would naively imagine some sort of custom fork-join pool implementation where the 'execute' method goes through a prioritized queue, so that lower priority queries can be stalled in favor of higher priorities. With the work broken up into small chunks like this, it seems like there could be even more elaborate strategies of constricting and expanding the number of parallel merge tasks based on pool utilization by just regrouping `BatchedResultsCursor`, but this would require much further investigation and testing than I think should be part of the initial effort on this. However, I would very much welcome a discussion on what is acceptable for an initial PR (keeping in mind this behavior will initially be opt-in).

### Rationale

This seems like a much more flexible approach to dividing up the work for result merging at the broker level than the previous attempt in #6629, and a bit less invasive than the changes of #5913. I don't think the concept of the broker performing magic parallel result merging is objectionable to anyone, so experimentation was necessary in order to prove the approach viable.
The results so far appear very promising: testing on my 4 physical/8 hyperthreaded core laptop has yielded the following results using the benchmarks added in #8089:
```
Benchmark                                        (numServers)  (parallelism)  (queryGranularity)  (rowsPerSegment)  Mode  Cnt        Score         Error  Units
CachingClusteredClientBenchmark.groupByQuery                8              0                 all             75000  avgt    5   294667.810 ±   15159.044  us/op
CachingClusteredClientBenchmark.groupByQuery                8              1                 all             75000  avgt    5   289990.615 ±   12078.017  us/op
CachingClusteredClientBenchmark.groupByQuery                8              4                 all             75000  avgt    5   165992.136 ±    5120.743  us/op
CachingClusteredClientBenchmark.groupByQuery                8              0              minute             75000  avgt    5   665222.595 ±   34855.138  us/op
CachingClusteredClientBenchmark.groupByQuery                8              1              minute             75000  avgt    5   601011.456 ±   31143.896  us/op
CachingClusteredClientBenchmark.groupByQuery                8              4              minute             75000  avgt    5   505897.930 ±  252049.184  us/op

Benchmark                                        (numServers)  (parallelism)  (queryGranularity)  (rowsPerSegment)  Mode  Cnt        Score         Error  Units
CachingClusteredClientBenchmark.timeseriesQuery             8              0                 all             75000  avgt    5    10430.144 ±     808.633  us/op
CachingClusteredClientBenchmark.timeseriesQuery             8              1                 all             75000  avgt    5    10451.555 ±     673.152  us/op
CachingClusteredClientBenchmark.timeseriesQuery             8              4                 all             75000  avgt    5     3092.491 ±     102.575  us/op
CachingClusteredClientBenchmark.timeseriesQuery             8              1              minute             75000  avgt    5    35844.012 ±    1422.124  us/op
CachingClusteredClientBenchmark.timeseriesQuery             8              0              minute             75000  avgt    5    35463.967 ±    2678.709  us/op
CachingClusteredClientBenchmark.timeseriesQuery             8              4              minute             75000  avgt    5    10490.847 ±     532.253  us/op

Benchmark                                        (numServers)  (parallelism)  (queryGranularity)  (rowsPerSegment)  Mode  Cnt        Score         Error  Units
CachingClusteredClientBenchmark.topNQuery                   8              0                 all             75000  avgt    5    17020.785 ±    1591.048  us/op
CachingClusteredClientBenchmark.topNQuery                   8              1                 all             75000  avgt    5    17236.979 ±    1742.515  us/op
CachingClusteredClientBenchmark.topNQuery                   8              4                 all             75000  avgt    5     5182.325 ±     275.617  us/op
CachingClusteredClientBenchmark.topNQuery                   8              0              minute             75000  avgt    5   440288.390 ±   36510.682  us/op
CachingClusteredClientBenchmark.topNQuery                   8              1              minute             75000  avgt    5   462569.201 ±   40711.373  us/op
CachingClusteredClientBenchmark.topNQuery                   8              4              minute             75000  avgt    5   213399.804 ±   24344.845  us/op

Benchmark                                        (numServers)  (parallelism)  (queryGranularity)  (rowsPerSegment)  Mode  Cnt        Score         Error  Units
CachingClusteredClientBenchmark.groupByQuery               32              0                 all             75000  avgt    5  1392022.470 ±   37345.957  us/op
CachingClusteredClientBenchmark.groupByQuery               32              1                 all             75000  avgt    5  1355029.496 ±   74562.324  us/op
CachingClusteredClientBenchmark.groupByQuery               32              4                 all             75000  avgt    5   789772.717 ±   15048.352  us/op
CachingClusteredClientBenchmark.groupByQuery               32              1              minute             75000  avgt    5  3039476.943 ±   96495.702  us/op
CachingClusteredClientBenchmark.groupByQuery               32              0              minute             75000  avgt    5  3066853.419 ±   83639.422  us/op
CachingClusteredClientBenchmark.groupByQuery               32              4              minute             75000  avgt    5  1677438.195 ±  263601.667  us/op

Benchmark                                        (numServers)  (parallelism)  (queryGranularity)  (rowsPerSegment)  Mode  Cnt        Score         Error  Units
CachingClusteredClientBenchmark.timeseriesQuery            32              0                 all             75000  avgt    5    42082.949 ±    3386.983  us/op
CachingClusteredClientBenchmark.timeseriesQuery            32              1                 all             75000  avgt    5    41423.214 ±    3115.364  us/op
CachingClusteredClientBenchmark.timeseriesQuery            32              4                 all             75000  avgt    5    14438.213 ±     404.668  us/op
CachingClusteredClientBenchmark.timeseriesQuery            32              0              minute             75000  avgt    5   159062.877 ±   13616.178  us/op
CachingClusteredClientBenchmark.timeseriesQuery            32              1              minute             75000  avgt    5   148730.323 ±   13239.695  us/op
CachingClusteredClientBenchmark.timeseriesQuery            32              4              minute             75000  avgt    5    50602.164 ±    1653.594  us/op

Benchmark                                        (numServers)  (parallelism)  (queryGranularity)  (rowsPerSegment)  Mode  Cnt        Score         Error  Units
CachingClusteredClientBenchmark.topNQuery                  32              0                 all             75000  avgt    5    68737.166 ±    5021.642  us/op
CachingClusteredClientBenchmark.topNQuery                  32              1                 all             75000  avgt    5    68601.619 ±    1731.964  us/op
CachingClusteredClientBenchmark.topNQuery                  32              4                 all             75000  avgt    5    23354.736 ±     136.128  us/op
CachingClusteredClientBenchmark.topNQuery                  32              0              minute             75000  avgt    5  2209146.790 ±   26481.535  us/op
CachingClusteredClientBenchmark.topNQuery                  32              1              minute             75000  avgt    5  2072088.508 ±   23812.286  us/op
CachingClusteredClientBenchmark.topNQuery                  32              4              minute             75000  avgt    5   702360.662 ±   13024.822  us/op

Benchmark                                        (numServers)  (parallelism)  (queryGranularity)  (rowsPerSegment)  Mode  Cnt        Score         Error  Units
CachingClusteredClientBenchmark.groupByQuery               64              0                 all             75000  avgt    5  2982378.361 ±  111827.750  us/op
CachingClusteredClientBenchmark.groupByQuery               64              1                 all             75000  avgt    5  3093978.766 ±  173486.476  us/op
CachingClusteredClientBenchmark.groupByQuery               64              4                 all             75000  avgt    5  1866266.877 ±   72350.259  us/op
CachingClusteredClientBenchmark.groupByQuery               64              0              minute             75000  avgt    5  6771130.653 ±  389650.338  us/op
CachingClusteredClientBenchmark.groupByQuery               64              1              minute             75000  avgt    5  6531315.426 ±  350824.451  us/op
CachingClusteredClientBenchmark.groupByQuery               64              4              minute             75000  avgt    5  3915926.485 ± 1945271.888  us/op

Benchmark                                        (numServers)  (parallelism)  (queryGranularity)  (rowsPerSegment)  Mode  Cnt        Score         Error  Units
CachingClusteredClientBenchmark.timeseriesQuery            64              0                 all             75000  avgt    5    83402.353 ±    8059.262  us/op
CachingClusteredClientBenchmark.timeseriesQuery            64              1                 all             75000  avgt    5    82654.846 ±    6609.039  us/op
CachingClusteredClientBenchmark.timeseriesQuery            64              4                 all             75000  avgt    5    29818.524 ±     531.522  us/op
CachingClusteredClientBenchmark.timeseriesQuery            64              0              minute             75000  avgt    5   319373.950 ±   34796.043  us/op
CachingClusteredClientBenchmark.timeseriesQuery            64              1              minute             75000  avgt    5   313583.213 ±   23267.252  us/op
CachingClusteredClientBenchmark.timeseriesQuery            64              4              minute             75000  avgt    5   107551.460 ±    2252.579  us/op

Benchmark                                        (numServers)  (parallelism)  (queryGranularity)  (rowsPerSegment)  Mode  Cnt        Score         Error  Units
CachingClusteredClientBenchmark.topNQuery                  64              0                 all             75000  avgt    5   134204.767 ±    1053.392  us/op
CachingClusteredClientBenchmark.topNQuery                  64              1                 all             75000  avgt    5   140975.988 ±    9803.699  us/op
CachingClusteredClientBenchmark.topNQuery                  64              4                 all             75000  avgt    5    49865.418 ±    1392.572  us/op
CachingClusteredClientBenchmark.topNQuery                  64              0              minute             75000  avgt    5  4927134.934 ±  526932.265  us/op
CachingClusteredClientBenchmark.topNQuery                  64              1              minute             75000  avgt    5  4371840.961 ±  422021.155  us/op
CachingClusteredClientBenchmark.topNQuery                  64              4              minute             75000  avgt    5  1571932.840 ±   11563.777  us/op
```
Parallelism `0` is the existing caching clustered client merge strategy, parallelism `1` is doing a serial merge on the fork-join pool, and `4` is using 3 layer 1 tasks to merge sequences in parallel, which is the limit given the number of physical cores my laptop has. In many cases queries are processing 2-3x faster when done in parallel, as would be expected for the given level of parallelism. Even doing serial processing with a single fork-join task is competitive with the existing serial approach, so _all_ merges can be done with the same approach even when there is not capacity available to run the merge in parallel.

I will continue to update this proposal as I collect more experiment results.

### Operational impact

No forced operational impact, since the feature will be opt-in initially and must be defined in the service configuration to be enabled. I think that this _could_ result in more predictable broker resource utilization, but operators experimenting with this new feature will need to closely monitor broker query performance to ensure that the new feature is producing beneficial results.

### Test plan

The test plan includes live cluster testing on some small-ish clusters I have available, as well as running the benchmarks on a large core count machine to simulate larger clusters and round out the benchmarks, to ensure that the approach scales correctly. Additionally, I plan to do 'overloaded' testing to ensure that a busy broker performs no worse, within reason, than the existing merge strategy.

### Future work (optional)

Beyond the initial PR, I think the most benefit would come from focusing on interleaving and tuning the level of parallelism, re #8356 and #8357.
