Hi Jisoo, no worries. Questions are always welcome.

> How does queryToolChest.mergeResults() assume the input is sorted by grouping keys? It seems like it's using ResultMergeQueryRunner with the query's row ordering within the method, and queryToolChest.mergeResults() gets called in ServerManager as well, where it uses queryRunnerFactory.mergeRunners(). The difference between ServerManager and ClientQuerySegmentWalker/CachingClusteredClient seems to be that CachingClusteredClient returns a MergeSequence with query.getResultOrdering(), which I think uses rowOrdering. Is the use of MergeSequence what makes the difference? In that case, is it OK for ServerManager to call queryToolChest.mergeResults() on top of queryRunnerFactory.mergeRunners()? I am curious about what differentiates the Druid broker from historicals when it comes to merging the sub-query results.

The thing is, GroupByMergingQueryRunnerV2.run(), which is called in GroupByQueryRunnerFactory.mergeRunners(), always returns sorted results (please check RowBasedGrouperHelper.makeGrouperIterator()). So the result from each historical is already sorted. If you call queryRunnerFactory.mergeRunners() on these sorted results, it first performs hash aggregation and then sorts them again.

queryToolChest.mergeResults(queryRunnerFactory.mergeRunners(exec, queryRunners)) in ServerManager should be fine. It just merges the results from all segments and returns them to the broker.

MergeSequence just merges several sequences into a single one, iterating over the items of those sequences in order. So the items in each sequence must already be sorted before MergeSequence consumes them. CombiningSequence then consumes the items from MergeSequence and performs sorted-merge aggregation (in ResultMergeQueryRunner).

> Could you please point me to the code for this?

I'm not sure which code you mean, but here are some things you might be interested in.

- GroupByQueryEngineV2.process() is called to process each segment. This uses hash aggregation by default (BufferHashGrouper). It can use array-based aggregation (BufferArrayGrouper) if the number of grouping keys is 1.
- GroupByMergingQueryRunnerV2.run() can call GroupByQueryEngineV2.process() concurrently per segment. ConcurrentGrouper is used to allow concurrent aggregation. Once the aggregation completes, GroupByMergingQueryRunnerV2.run() sorts and returns the result (RowBasedGrouperHelper.makeGrouperIterator()). The sort can also be done in parallel (ConcurrentGrouper.iterator()). ParallelCombiner can optionally be used to combine the sorted results from each segment.

Hope this helps.

Best,
Jihoon

On Wed, Jul 25, 2018 at 4:25 PM Jisoo Kim wrote:
> Hi Jihoon,
>
> Thanks for the explanation, but I think I need some more clarification on the reason not to use queryRunnerFactory.mergeRunners().
>
> > In v2, the broker assumes that intermediate aggregates from historicals are always sorted by grouping keys. This enables merge-sorted aggregation on brokers, which is much more efficient than hash aggregation in terms of speed as well as memory usage. However, queryRunnerFactory.mergeRunners() works based on hash aggregation. This would cause the same issue as groupBy v1, which requires the full materialization of intermediates on brokers (either in memory or on disk).
>
> Could you please point me to the code for this?
>
> > Also, this requires sorting the result of queryRunnerFactory.mergeRunners() before calling queryToolChest.mergeResults(), since it assumes the input is sorted by grouping keys.
>
> How does queryToolChest.mergeResults() assume the input is sorted by grouping keys? It seems like it's using ResultMergeQueryRunner with the query's row ordering within the method, and queryToolChest.mergeResults() gets called in ServerManager as well, where it uses queryRunnerFactory.mergeRunners().
> The difference between ServerManager and ClientQuerySegmentWalker/CachingClusteredClient seems to be that CachingClusteredClient returns a MergeSequence with query.getResultOrdering(), which I think uses rowOrdering. Is the use of MergeSequence what makes the difference? In that case, is it OK for ServerManager to call queryToolChest.mergeResults() on top of queryRunnerFactory.mergeRunners()? I am curious about what differentiates the Druid broker from historicals when it comes to merging the sub-query results.
>
> Sorry for the many questions, and thanks,
> Jisoo
>
> On Sat, Jul 21, 2018 at 7:28 PM, Jihoon Son wrote:
> > Hi Jisoo,
> >
> > I think it would work, but there is currently at least one reason not to use queryRunnerFactory.mergeRunners() in groupBy v2.
> >
> > In v2, the broker assumes that intermediate aggregates from historicals are always sorted by grouping keys. This enables merge-sorted aggregation on brokers, which is much more efficient than hash aggregation in terms of speed as well as memory usage. However, queryRunnerFactory.mergeRunners() works based on hash aggregation. This would cause the same issue as groupBy v1, which requires the full materialization of intermediates on brokers (either in memory or on disk). Also, this requires sorting the result of queryRunnerFactory.mergeRunners() before calling queryToolChest.mergeResults(), since it assumes the input is sorted by grouping keys.
> >
> > > I am wondering if the improvement that I gained from changing the logic was mainly from deserializing the sub-query results in parallel (by calling queryRunnerFactory.mergeRunners(), which seems to enable parallelism), or if it was also benefiting from using GroupByMergingQueryRunnerV2 with parallel combining threads enabled.
> >
> > I assume you enabled parallel combining in GroupByMergingQueryRunnerV2 (it's disabled by default).
> > Then it's difficult to tell where you gained the benefit. You might need to run more benchmarks to figure it out.
> >
> > Best,
> > Jihoon
> >
> > On Thu, Jul 19, 2018 at 4:29 PM Jisoo Kim wrote:
> > > Hi Jihoon,
> > >
> > > Thanks for the reply. So what I ended up doing for merging a list of serialized result Sequences (each of which is a byte array) was:
> > >
> > > 1) Create a stream out of the list.
> > > 2) For each serialized sequence in the list, create a query runner that deserializes the byte array and returns a Sequence (along with applying PreComputeManipulatorFn). Now the stream becomes Stream<QueryRunner>.
> > > 3) Call queryRunnerFactory.mergeRunners() (the factory is created from the injector and the given query) on the materialized list of QueryRunners.
> > > 4) Create a FluentQueryRunner out of 3) and add the necessary steps, including mergeResults(), which essentially calls queryToolChest.mergeResults() on queryRunnerFactory.mergeRunners().
> > >
> > > Does my approach look valid, or is it something that I shouldn't be doing for merging query results? Before I changed the merging logic to the above, I encountered a problem with merging sub-query results properly for very heavy groupBy queries.
> > >
> > > I haven't had much chance to read through all the groupBy query processing logic, but I am wondering if the improvement that I gained from changing the logic was mainly from deserializing the sub-query results in parallel (by calling queryRunnerFactory.mergeRunners(), which seems to enable parallelism), or if it was also benefiting from using GroupByMergingQueryRunnerV2 with parallel combining threads enabled.
> > >
> > > Thanks,
> > > Jisoo
> > >
> > > On Thu, Jul 19, 2018 at 3:06 PM, Jihoon Son wrote:
> > > > Hi Jisoo,
> > > >
> > > > Sorry, the previous email was sent by accident.
> > > >
> > > > The initial version of groupBy v2 wasn't capable of combining intermediates in parallel. Some of our customers ran into an issue similar to yours, so I have been working on improving groupBy v2 performance for a while.
> > > >
> > > > Parallel combining on brokers definitely makes sense. I was thinking of adding a sort of ParallelMergeSequence, a parallel version of MergeSequence, but it can be anything as long as it supports parallel combining on brokers.
> > > >
> > > > One thing I'm worried about is that most query processing interfaces in brokers use Sequence, so using something else for a specific query type might make the code complicated. I think we need to avoid that if possible.
> > > >
> > > > Best,
> > > > Jihoon
> > > >
> > > > On Thu, Jul 19, 2018 at 2:58 PM Jihoon Son wrote:
> > > > > Hi Jisoo,
> > > > >
> > > > > the initial version of groupBy v2
> > > > >
> > > > > On Thu, Jul 19, 2018 at 2:42 PM Jisoo Kim wrote:
> > > > > > Hi all,
> > > > > >
> > > > > > I am currently working on a project that uses Druid's QueryRunner and other druid-processing classes. It uses Druid's own classes to calculate query results. I have been testing large GroupBy queries (using v2), and it seems like parallel combining threads for GroupBy queries are only enabled at the historical level. I think it only gets called by GroupByStrategyV2.mergeRunners() <https://github.com/apache/incubator-druid/blob/druid-0.12.1/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java#L335>, which is only called by GroupByQueryRunnerFactory.mergeRunners() on historicals.
> > > > > >
> > > > > > Are GroupByMergingQueryRunnerV2 and parallel combining threads meant for computing and merging per-segment results only, or can they also be used at the broker level? I changed the logic of my project from calling queryToolChest.mergeResults() on a MergeSequence (created from a list of per-segment/per-server sequences) to calling queryToolChest.mergeResults() on queryRunnerFactory.mergeRunners() (where each runner returns a deserialized result sequence), and that seemed to reduce the computation time and failures of really heavy groupBy queries by quite a lot. Or is this just a coincidence, with no real performance difference in merging groupBy query results, and the only difference coming from parallelizing the deserialization of result sequences from sub-queries?
> > > > > >
> > > > > > Thanks,
> > > > > > Jisoo
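
The two merge strategies contrasted in this thread can be illustrated with a small, self-contained Java sketch. It is a toy model, not Druid code: `Row`, `hashAggregateThenSort` and `sortedMergeAggregate` are hypothetical names, each row carries one grouping key and one summed metric, a `PriorityQueue` stands in for the ordered merge that MergeSequence performs, and the adjacent-key combining loop stands in for CombiningSequence. Hash aggregation has to hold every distinct key before it can emit anything, while the sorted merge relies on each input already being sorted and can emit a group as soon as the key changes.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Toy model only (not Druid code); all names here are hypothetical.
public class GroupByMergeSketch {

  // One intermediate row: a grouping key and a partially summed metric.
  static final class Row {
    final String key;
    final long sum;

    Row(String key, long sum) {
      this.key = key;
      this.sum = sum;
    }

    @Override
    public String toString() {
      return key + "=" + sum;
    }
  }

  // Hash aggregation, then a sort: every distinct key is held in the map
  // before anything can be emitted (full materialization of intermediates).
  static List<Row> hashAggregateThenSort(List<List<Row>> inputs) {
    Map<String, Long> table = new HashMap<>();
    for (List<Row> input : inputs) {
      for (Row row : input) {
        table.merge(row.key, row.sum, Long::sum);
      }
    }
    List<Row> out = new ArrayList<>();
    table.forEach((key, sum) -> out.add(new Row(key, sum)));
    out.sort(Comparator.comparing((Row r) -> r.key));
    return out;
  }

  // Sorted-merge aggregation: a k-way merge of inputs that are each already
  // sorted by key (the MergeSequence role), combining adjacent rows with equal
  // keys (the CombiningSequence role). The heap holds at most one cursor per
  // input, and each group is emitted as soon as the key changes.
  static List<Row> sortedMergeAggregate(List<List<Row>> sortedInputs) {
    // A cursor is {inputIndex, positionInInput}; the heap orders cursors by key.
    PriorityQueue<int[]> heap = new PriorityQueue<>(
        Comparator.comparing((int[] c) -> sortedInputs.get(c[0]).get(c[1]).key));
    for (int i = 0; i < sortedInputs.size(); i++) {
      if (!sortedInputs.get(i).isEmpty()) {
        heap.add(new int[] {i, 0});
      }
    }
    List<Row> out = new ArrayList<>();
    String currentKey = null;
    long currentSum = 0;
    while (!heap.isEmpty()) {
      int[] cursor = heap.poll();
      List<Row> input = sortedInputs.get(cursor[0]);
      Row row = input.get(cursor[1]);
      if (row.key.equals(currentKey)) {
        currentSum += row.sum; // same group: keep combining
      } else {
        if (currentKey != null) {
          out.add(new Row(currentKey, currentSum)); // previous group is complete
        }
        currentKey = row.key;
        currentSum = row.sum;
      }
      if (cursor[1] + 1 < input.size()) {
        heap.add(new int[] {cursor[0], cursor[1] + 1}); // advance this input
      }
    }
    if (currentKey != null) {
      out.add(new Row(currentKey, currentSum));
    }
    return out;
  }

  public static void main(String[] args) {
    // Two per-historical results, each already sorted by grouping key.
    List<List<Row>> fromHistoricals = List.of(
        List.of(new Row("a", 1), new Row("c", 3)),
        List.of(new Row("a", 3), new Row("b", 2)));
    System.out.println(hashAggregateThenSort(fromHistoricals)); // [a=4, b=2, c=3]
    System.out.println(sortedMergeAggregate(fromHistoricals)); // [a=4, b=2, c=3]
  }
}
```

Both methods return the same rows for sorted inputs; the difference is how much state they must keep. The sorted-merge version depends on the precondition stated in the thread (each input sequence is already sorted by grouping key); given unsorted inputs it may emit the same key more than once.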
