jihoonson commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r214216033
 
 

 ##########
 File path: server/src/main/java/io/druid/client/CachingClusteredClient.java
 ##########
 @@ -162,34 +184,75 @@ public CachingClusteredClient(
     return new SpecificQueryRunnable<>(queryPlus, responseContext).run(timelineConverter);
   }
 
-  @Override
-  public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final Iterable<SegmentDescriptor> specs)
+  private <T> QueryRunner<T> runAndMergeWithTimelineChange(
+      final Query<T> query,
+      final UnaryOperator<TimelineLookup<String, ServerSelector>> timelineConverter
+  )
   {
-    return new QueryRunner<T>()
-    {
-      @Override
-      public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext)
-      {
-        return CachingClusteredClient.this.run(
+    final OptionalLong mergeBatch = QueryContexts.getIntermediateMergeBatchThreshold(query);
+
+    if (mergeBatch.isPresent()) {
+      final QueryRunnerFactory<T, Query<T>> queryRunnerFactory = conglomerate.findFactory(query);
+      final QueryToolChest<T, Query<T>> toolChest = queryRunnerFactory.getToolchest();
+      return (queryPlus, responseContext) -> {
+        final Stream<? extends Sequence<T>> sequences = run(
             queryPlus,
             responseContext,
-            timeline -> {
-              final VersionedIntervalTimeline<String, ServerSelector> timeline2 =
-                  new VersionedIntervalTimeline<>(Ordering.natural());
-              for (SegmentDescriptor spec : specs) {
-                final PartitionHolder<ServerSelector> entry = timeline.findEntry(spec.getInterval(), spec.getVersion());
-                if (entry != null) {
-                  final PartitionChunk<ServerSelector> chunk = entry.getChunk(spec.getPartitionNumber());
-                  if (chunk != null) {
-                    timeline2.add(spec.getInterval(), spec.getVersion(), chunk);
-                  }
-                }
-              }
-              return timeline2;
-            }
+            timelineConverter
+        );
+        return MergeWorkTask.parallelMerge(
+            sequences.parallel(),
+            sequenceStream ->
+                new FluentQueryRunnerBuilder<>(toolChest)
+                    .create(
+                        queryRunnerFactory.mergeRunners(
 
 Review comment:
   Also, I've just noticed that this breaks an assumption of groupBy v2.
In groupBy v2, the broker assumes that the intermediate aggregates are always
sorted by the grouping keys, so that it can perform merge-sorted aggregation.
However, calling `QueryRunnerFactory.mergeRunners()` internally performs hash
aggregation (or array-based aggregation) and then sorts the results again,
which is inefficient. For groupBy v2, the merge-sorted aggregation should be
performed in parallel. Maybe we need to add a new method to `QueryToolChest`
that is different from both the merge in historicals and the final merge in
brokers.
   
   We've recently had a discussion about this on the dev mailing list. See
https://lists.apache.org/thread.html/b4c1cbe0c97e52ae5a137f4315af6a202a24d3034f53ce92c0d30150@%3Cdev.druid.apache.org%3E
for more details.
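
   To illustrate what merge-sorted aggregation means here, below is a minimal standalone sketch. It is not Druid code: the class `MergeSortedAggregation`, the methods `row` and `mergeAndAggregate`, and the modeling of a row as a `(String key, Long sum)` pair are all assumptions made for the example. Each input is assumed to be already sorted by grouping key, so equal keys come out of the k-way merge next to each other and can be combined without a hash table or a second sort.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical illustration only; none of these names exist in Druid.
public class MergeSortedAggregation
{
  // One cursor per sorted input: the current row plus the rest of that input.
  private static final class Cursor
  {
    Map.Entry<String, Long> head;
    final Iterator<Map.Entry<String, Long>> rest;

    Cursor(Iterator<Map.Entry<String, Long>> rest)
    {
      this.rest = rest;
      this.head = rest.next();
    }
  }

  // Convenience factory for a (grouping key, partial sum) row.
  public static Map.Entry<String, Long> row(String key, long value)
  {
    return new SimpleEntry<>(key, value);
  }

  // Merges inputs that are each sorted by key, summing values of equal keys.
  // The output is sorted by key as well, so it can feed another merge step.
  public static List<Map.Entry<String, Long>> mergeAndAggregate(
      List<List<Map.Entry<String, Long>>> sortedInputs
  )
  {
    PriorityQueue<Cursor> heap = new PriorityQueue<>(
        Comparator.comparing((Cursor c) -> c.head.getKey())
    );
    for (List<Map.Entry<String, Long>> input : sortedInputs) {
      if (!input.isEmpty()) {
        heap.add(new Cursor(input.iterator()));
      }
    }

    List<Map.Entry<String, Long>> result = new ArrayList<>();
    while (!heap.isEmpty()) {
      // The smallest remaining key across all inputs comes out first.
      Cursor cursor = heap.poll();
      Map.Entry<String, Long> current = cursor.head;
      int last = result.size() - 1;
      if (last >= 0 && result.get(last).getKey().equals(current.getKey())) {
        // Same key as the previous output row: fold into it.
        long sum = result.get(last).getValue() + current.getValue();
        result.set(last, row(current.getKey(), sum));
      } else {
        result.add(row(current.getKey(), current.getValue()));
      }
      // Advance this input and put it back if it still has rows.
      if (cursor.rest.hasNext()) {
        cursor.head = cursor.rest.next();
        heap.add(cursor);
      }
    }
    return result;
  }
}
```

   Because the output of `mergeAndAggregate` stays sorted by key, batches of inputs could in principle be merged independently (for example on separate threads) and their results merged again with the same routine, which is the property an intermediate parallel merge would need to preserve for groupBy v2.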
