> hmmm, sounds like GroupBy needs some more love to potentially support this 
> feature. 

@drcrallen It sounds more to me like the groupBy issues are a symptom of using 
the wrong API here: the new CachingClusteredClient uses 
`mergeRunners(ExecutorService, Iterable<QueryRunner<T>>)` when it probably 
shouldn't. The javadocs for that method say that it should be called on the 
runners from `createRunner(Segment)`:

```
 * Runners generated with createRunner() and combined into an Iterable in (time,shardId) order are passed
 * along to this method with an ExecutorService.  The method should then return a QueryRunner that, when
 * asked, will use the ExecutorService to run the base QueryRunners in some fashion.
```

There's no guarantee in general that this method will do the right thing if 
applied to higher level runners.

I can suggest a simple fix that should make things work for groupBy (or any 
other query type): replace the mergeRunners() call with explicit creation of a 
ChainedExecutionQueryRunner that uses the mergeFjp. This is already what 
mergeRunners does for timeseries and topN, so those will work the same way. For 
query types that do something more 'special' in mergeRunners, this should 
still work: it will basically materialize results for each server, then merge 
them using mergeResults (which knows how to merge those by-server results).
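To show the shape of what I mean: here's a simplified, self-contained sketch of the chained-execution pattern using plain `java.util.concurrent` (these are not Druid's actual classes — the `Supplier`s stand in for the per-server QueryRunners, and the sort stands in for the query-type-aware merge):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Supplier;

public class ChainedExecutionSketch {
  // Run each per-server "runner" on the pool, materialize its results,
  // then combine the materialized lists (mergeResults' job in Druid).
  public static <T extends Comparable<T>> List<T> run(
      ExecutorService exec, List<Supplier<List<T>>> runners) throws Exception {
    List<Future<List<T>>> futures = new ArrayList<>();
    for (Supplier<List<T>> runner : runners) {
      futures.add(exec.submit(runner::get)); // materialize per-server results in parallel
    }
    List<T> merged = new ArrayList<>();
    for (Future<List<T>> f : futures) {
      merged.addAll(f.get()); // blocks until that server's results are in
    }
    merged.sort(null); // stand-in for the query-type-aware merge
    return merged;
  }

  public static void main(String[] args) throws Exception {
    ExecutorService exec = Executors.newFixedThreadPool(2);
    List<Supplier<List<Integer>>> runners =
        List.of(() -> List.of(3, 1), () -> List.of(2));
    System.out.println(run(exec, runners)); // [1, 2, 3]
    exec.shutdown();
  }
}
```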

I also have a couple of general questions. If I read it right, this patch is 
materializing results for each server inside a mergeFjp thread before starting 
to merge them. That has a couple of possible ill effects:

1. Increased memory usage, especially for queries that can generate a lot of 
results per server (e.g. scan, groupBy, topN with a high threshold, or anything 
that's by-segment).
2. Starvation of mergeFjp threads if a few queries take too long to materialize 
results from their servers, leaving their materialization tasks blocked inside 
the FJP. Before this patch, a small number of queries could not starve out any 
of the broker thread pools (although a large number of slow queries could still 
starve out the broker's http serving thread pool).
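To make the starvation concern concrete, here's a self-contained sketch (plain `ForkJoinPool`, no Druid classes — parallelism 2 stands in for the broker's mergeFjp): two tasks that block waiting on "server results" occupy both workers, so a cheap merge task for some other query can't even start until one of them finishes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

public class StarvationSketch {
  public static boolean demo() throws InterruptedException {
    ForkJoinPool pool = new ForkJoinPool(2); // stand-in for the mergeFjp
    CountDownLatch serverResults = new CountDownLatch(1); // "slow historicals"
    CountDownLatch quickRan = new CountDownLatch(1);

    // Two "materialize results from server" tasks that block the pool's workers.
    for (int i = 0; i < 2; i++) {
      pool.submit(() -> {
        try { serverResults.await(); } catch (InterruptedException ignored) { }
      });
    }
    // A cheap merge task for some other query: it can't get a thread.
    pool.submit(quickRan::countDown);

    boolean starved = !quickRan.await(200, TimeUnit.MILLISECONDS);
    serverResults.countDown(); // the slow servers finally respond
    boolean ranAfter = quickRan.await(5, TimeUnit.SECONDS);
    pool.shutdown();
    return starved && ranAfter;
  }

  public static void main(String[] args) throws Exception {
    System.out.println(demo()
        ? "cheap task was starved until the blockers finished"
        : "no starvation observed");
  }
}
```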

I'm wondering if it makes more sense to keep using a single thread to retrieve 
and deserialize results from historicals, but then merge them in a 
multithreaded pool, so that we know the merging work won't block. Or is the 
deserialization of retrieved results one of the bottlenecks we're trying to 
avoid? In that case, what about multithreading deserialization too, while 
keeping the retrieval single-threaded?
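As a sketch of that alternative shape (again plain `java.util.concurrent`, hypothetical names — `Integer.parseInt` stands in for deserialization): one loop does all the retrieval, and each retrieved chunk is handed off to a separate pool, so the CPU-bound merge/deserialize work never sits blocked on I/O.

```java
import java.util.List;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PipelinedMergeSketch {
  // Retrieval stays single-threaded; per-chunk "deserialize" work fans out to a pool.
  public static int retrieveAndMerge(List<String> rawChunksFromServers) throws Exception {
    ExecutorService mergePool = Executors.newFixedThreadPool(4);
    ExecutorCompletionService<Integer> done = new ExecutorCompletionService<>(mergePool);

    // Single retrieval loop: in Druid this would be reading bytes from historicals.
    for (String raw : rawChunksFromServers) {
      done.submit(() -> Integer.parseInt(raw)); // "deserialization" off the retrieval thread
    }
    int total = 0;
    for (int i = 0; i < rawChunksFromServers.size(); i++) {
      total += done.take().get(); // merge chunks as they finish, in completion order
    }
    mergePool.shutdown();
    return total;
  }

  public static void main(String[] args) throws Exception {
    System.out.println(retrieveAndMerge(List.of("1", "2", "3"))); // 6
  }
}
```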

Could you talk more about your experience running this in prod, and whether 
you've noticed anything like these effects?

[ Full content available at: 
https://github.com/apache/incubator-druid/pull/5913 ]