jihoonson commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r214216033
##########
File path: server/src/main/java/io/druid/client/CachingClusteredClient.java
##########
@@ -162,34 +184,75 @@ public CachingClusteredClient(
     return new SpecificQueryRunnable<>(queryPlus, responseContext).run(timelineConverter);
   }
 
-  @Override
-  public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final Iterable<SegmentDescriptor> specs)
+  private <T> QueryRunner<T> runAndMergeWithTimelineChange(
+      final Query<T> query,
+      final UnaryOperator<TimelineLookup<String, ServerSelector>> timelineConverter
+  )
   {
-    return new QueryRunner<T>()
-    {
-      @Override
-      public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext)
-      {
-        return CachingClusteredClient.this.run(
+    final OptionalLong mergeBatch = QueryContexts.getIntermediateMergeBatchThreshold(query);
+
+    if (mergeBatch.isPresent()) {
+      final QueryRunnerFactory<T, Query<T>> queryRunnerFactory = conglomerate.findFactory(query);
+      final QueryToolChest<T, Query<T>> toolChest = queryRunnerFactory.getToolchest();
+      return (queryPlus, responseContext) -> {
+        final Stream<? extends Sequence<T>> sequences = run(
             queryPlus,
             responseContext,
-            timeline -> {
-              final VersionedIntervalTimeline<String, ServerSelector> timeline2 =
-                  new VersionedIntervalTimeline<>(Ordering.natural());
-              for (SegmentDescriptor spec : specs) {
-                final PartitionHolder<ServerSelector> entry = timeline.findEntry(spec.getInterval(), spec.getVersion());
-                if (entry != null) {
-                  final PartitionChunk<ServerSelector> chunk = entry.getChunk(spec.getPartitionNumber());
-                  if (chunk != null) {
-                    timeline2.add(spec.getInterval(), spec.getVersion(), chunk);
-                  }
-                }
-              }
-              return timeline2;
-            }
+            timelineConverter
+        );
+        return MergeWorkTask.parallelMerge(
+            sequences.parallel(),
+            sequenceStream ->
+                new FluentQueryRunnerBuilder<>(toolChest)
+                    .create(
+                        queryRunnerFactory.mergeRunners(
Review comment:
Also, I've just noticed that this breaks an assumption of groupBy v2.
In groupBy v2, the broker assumes that the intermediate aggregates are always
sorted by the grouping keys, so that it can perform merge-sorted aggregation.
However, calling `QueryRunnerFactory.mergeRunners()` here internally performs
hash aggregation (or array-based aggregation) and then sorts again, which is
inefficient. For groupBy v2, the merge-sorted aggregation itself should be
performed in parallel instead. Maybe we need to add a new method to
QueryToolChest that is distinct from both the merge in historicals and the
final merge in brokers. We've recently had a discussion about this on the dev
mailing list. See
https://lists.apache.org/thread.html/b4c1cbe0c97e52ae5a137f4315af6a202a24d3034f53ce92c0d30150@%3Cdev.druid.apache.org%3E
for more details.
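To illustrate the distinction the comment draws: when the inputs are already sorted by the grouping keys, a k-way merge can combine equal keys as they stream out and the result comes out both aggregated and sorted, with no hash step and no re-sort. The sketch below is a self-contained illustration of that merge-sorted aggregation idea, not Druid code; the names (`MergeSortedAggregate`, `Cursor`, `mergeSorted`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

public class MergeSortedAggregate
{
  /** One cursor over a key-sorted input list; {@code head} is the next unconsumed entry. */
  private static final class Cursor
  {
    final Iterator<Map.Entry<String, Long>> it;
    Map.Entry<String, Long> head;

    Cursor(Iterator<Map.Entry<String, Long>> it)
    {
      this.it = it;
      this.head = it.next(); // caller guarantees the input is non-empty
    }
  }

  /**
   * K-way merge of key-sorted (key, partial aggregate) lists. Equal keys are
   * combined as they stream out, so the result is fully aggregated and still
   * sorted -- unlike hash aggregation, which would need a sort afterwards.
   */
  static List<Map.Entry<String, Long>> mergeSorted(List<List<Map.Entry<String, Long>>> inputs)
  {
    final PriorityQueue<Cursor> pq =
        new PriorityQueue<>(Comparator.comparing((Cursor c) -> c.head.getKey()));
    for (List<Map.Entry<String, Long>> in : inputs) {
      if (!in.isEmpty()) {
        pq.add(new Cursor(in.iterator()));
      }
    }

    final List<Map.Entry<String, Long>> out = new ArrayList<>();
    while (!pq.isEmpty()) {
      final Cursor c = pq.poll();
      final Map.Entry<String, Long> e = c.head;
      final int lastIdx = out.size() - 1;
      if (lastIdx >= 0 && out.get(lastIdx).getKey().equals(e.getKey())) {
        // Same grouping key as the previous output row: fold the partial
        // aggregate in place instead of emitting a duplicate key.
        out.set(lastIdx, Map.entry(e.getKey(), out.get(lastIdx).getValue() + e.getValue()));
      } else {
        out.add(Map.entry(e.getKey(), e.getValue()));
      }
      if (c.it.hasNext()) {
        c.head = c.it.next();
        pq.add(c); // re-insert the cursor keyed on its new head
      }
    }
    return out;
  }

  public static void main(String[] args)
  {
    final List<Map.Entry<String, Long>> a = List.of(Map.entry("a", 1L), Map.entry("c", 2L));
    final List<Map.Entry<String, Long>> b = List.of(Map.entry("a", 3L), Map.entry("b", 4L));
    System.out.println(mergeSorted(List.of(a, b))); // [a=4, b=4, c=2]
  }
}
```

Because each input only needs to be read front-to-back, partitions of the inputs can be merged independently and the partial results merged again, which is what makes the operation parallelizable without losing the sort order.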
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services