> There wasn't as much of an effect on the memory usage as I expected once it
> got into the wild here. I think this is partly due to the fact that the
> merges get threaded in as soon as they are ready from the nodes, and if your
> skew on query times across your nodes is comparable to the parallel merging
> ability, then there isn't much accumulated on the brokers.
@drcrallen Am I misunderstanding what the code is doing? As I read it, the full
results from each server are materialized before merging begins. (As opposed to
the old broker code, which would start merging results as soon as they appeared
from each server.) I am talking specifically about this part:
```java
return MergeWorkTask.parallelMerge(
sequences.parallel(),
sequenceStream ->
new FluentQueryRunnerBuilder<>(toolChest)
.create(
queryRunnerFactory.mergeRunners(
mergeFjp,
sequenceStream.map(
s -> (QueryRunner<T>) (ignored0, ignored1) ->
(Sequence<T>) s
).collect(
Collectors.toList()
)
)
)
.mergeResults()
.run(queryPlus, responseContext),
mergeBatch.getAsLong(),
mergeFjp
);
```
In particular: `queryRunnerFactory.mergeRunners` for timeseries/topN uses
`ChainedExecutionQueryRunner`, which runs `runner.run().toList()` in the
`mergeFjp`. Each `runner.run()` returns a Sequence corresponding to the results
from a single server, so the `toList()` must exhaust all results for that
server before any of them can be considered for merging. Based on my
understanding of how this works, I would expect to see FJP threads blocking on
getting results from historicals.
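To illustrate the concern, here is a minimal sketch. It does not use Druid's actual `Sequence`/`QueryRunner` types; `lazyResults`, `eagerMerge`, and `streamingFirst` are hypothetical stand-ins. It contrasts a per-server `toList()`-style materialization (pulls every element from every server before merging) with a streaming merge (pulls only the head of each server's sequence before emitting the first merged result):

```java
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;

public class MergeSketch {
    // Hypothetical stand-in for a per-server result Sequence: a lazy
    // iterator that counts how many elements have been pulled so far.
    static Iterator<Integer> lazyResults(int[] values, AtomicInteger pulled) {
        return new Iterator<>() {
            int i = 0;
            public boolean hasNext() { return i < values.length; }
            public Integer next() { pulled.incrementAndGet(); return values[i++]; }
        };
    }

    // Eager style: each per-server sequence is fully drained (as a
    // toList() would do) before any merging happens.
    static List<Integer> eagerMerge(List<Iterator<Integer>> servers) {
        List<Integer> all = new ArrayList<>();
        for (Iterator<Integer> it : servers) {
            while (it.hasNext()) all.add(it.next()); // materialize everything
        }
        all.sort(null);
        return all;
    }

    // Streaming style: pull only the head of each server's sequence and
    // emit the smallest; no full materialization is needed.
    static int streamingFirst(List<Iterator<Integer>> servers) {
        PriorityQueue<int[]> heads = new PriorityQueue<>(Comparator.comparingInt(a -> a[0]));
        for (int idx = 0; idx < servers.size(); idx++) {
            Iterator<Integer> it = servers.get(idx);
            if (it.hasNext()) heads.add(new int[]{it.next(), idx});
        }
        return heads.poll()[0]; // first merged element after one pull per server
    }

    public static void main(String[] args) {
        AtomicInteger pulledEager = new AtomicInteger();
        AtomicInteger pulledLazy = new AtomicInteger();
        int[] a = {1, 3, 5}, b = {2, 4, 6};

        eagerMerge(List.of(lazyResults(a, pulledEager), lazyResults(b, pulledEager)));
        System.out.println("eager pulls before merge: " + pulledEager.get());    // 6

        streamingFirst(List.of(lazyResults(a, pulledLazy), lazyResults(b, pulledLazy)));
        System.out.println("streaming pulls for first result: " + pulledLazy.get()); // 2
    }
}
```

The difference in pull counts is the point: the eager path has to hold every server's complete result set on the broker before the merge emits anything, which is where the memory and FJP-blocking concern comes from.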
[ Full content available at:
https://github.com/apache/incubator-druid/pull/5913 ]
This message was relayed via gitbox.apache.org for [email protected]