drcrallen commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r215460635
 
 

 ##########
 File path: server/src/main/java/io/druid/client/CachingClusteredClient.java
 ##########
 @@ -162,34 +184,75 @@ public CachingClusteredClient(
     return new SpecificQueryRunnable<>(queryPlus, responseContext).run(timelineConverter);
   }
 
-  @Override
-  public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final Iterable<SegmentDescriptor> specs)
+  private <T> QueryRunner<T> runAndMergeWithTimelineChange(
+      final Query<T> query,
+      final UnaryOperator<TimelineLookup<String, ServerSelector>> timelineConverter
+  )
   {
-    return new QueryRunner<T>()
-    {
-      @Override
-      public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext)
-      {
-        return CachingClusteredClient.this.run(
+    final OptionalLong mergeBatch = QueryContexts.getIntermediateMergeBatchThreshold(query);
+
+    if (mergeBatch.isPresent()) {
+      final QueryRunnerFactory<T, Query<T>> queryRunnerFactory = conglomerate.findFactory(query);
+      final QueryToolChest<T, Query<T>> toolChest = queryRunnerFactory.getToolchest();
+      return (queryPlus, responseContext) -> {
+        final Stream<? extends Sequence<T>> sequences = run(
             queryPlus,
             responseContext,
-            timeline -> {
-              final VersionedIntervalTimeline<String, ServerSelector> timeline2 =
-                  new VersionedIntervalTimeline<>(Ordering.natural());
-              for (SegmentDescriptor spec : specs) {
-                final PartitionHolder<ServerSelector> entry = timeline.findEntry(spec.getInterval(), spec.getVersion());
-                if (entry != null) {
-                  final PartitionChunk<ServerSelector> chunk = entry.getChunk(spec.getPartitionNumber());
-                  if (chunk != null) {
-                    timeline2.add(spec.getInterval(), spec.getVersion(), chunk);
-                  }
-                }
-              }
-              return timeline2;
-            }
+            timelineConverter
+        );
+        return MergeWorkTask.parallelMerge(
+            sequences.parallel(),
+            sequenceStream ->
+                new FluentQueryRunnerBuilder<>(toolChest)
+                    .create(
+                        queryRunnerFactory.mergeRunners(
 
 Review comment:
   I'll comment more later, but to briefly address the last points:
   
   There wasn't as much of an effect on memory usage as I expected once this got into the wild. I think that is partly because the merges are threaded in as soon as results are ready from the nodes, so if the skew in query times across your nodes is comparable to the broker's parallel merging capacity, not much accumulates on the brokers. For example, if a broker can merge 100 nodes' results in 0.5 seconds, and node response times on your historicals swing between 0.25s and 0.75s, then by the time the 0.25s responses have been merged in, the 0.75s ones are only just starting to stack up for merging. I don't have data to prove this hypothesis, though.
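   This "merge as results arrive" behavior isn't specific to the PR's `MergeWorkTask`; the idea can be sketched with a plain `CompletionService`, which hands back partial results in completion order so fast responders are folded in while the stragglers are still running. All names below are mine for illustration, not code from this PR:

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MergeAsReady {
  /** Sums partial results in completion order: each "node" response is merged
   *  as soon as it arrives, so only the slow responders accumulate. */
  public static int mergeInCompletionOrder(List<Callable<Integer>> partials) throws Exception {
    ExecutorService exec = Executors.newFixedThreadPool(4);
    try {
      CompletionService<Integer> completed = new ExecutorCompletionService<>(exec);
      partials.forEach(completed::submit);
      int merged = 0;
      for (int i = 0; i < partials.size(); i++) {
        // take() blocks only until the *next* finished result, not until all are done
        merged += completed.take().get();
      }
      return merged;
    } finally {
      exec.shutdown();
    }
  }

  public static void main(String[] args) throws Exception {
    List<Callable<Integer>> nodes = List.of(
        () -> { Thread.sleep(250); return 1; },  // fast "historical"
        () -> { Thread.sleep(750); return 2; },  // slow "historical"
        () -> { Thread.sleep(500); return 3; }
    );
    System.out.println(mergeInCompletionOrder(nodes)); // prints 6
  }
}
```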
   
   Starvation of the mergeFJP is a very real concern; that is why `ForkJoinPool.managedBlock` is needed. When too many worker threads are blocked, the FJP detects it and spawns compensation threads so the pool can keep making progress. I don't have strong examples of group-by queries on brokers using this approach, so I haven't exercised that code path yet, but the topN code path seems to be working as expected without running into thread starvation.
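   For reference, the `ForkJoinPool.managedBlock` pattern looks roughly like this, following the `ManagedBlocker` example in the JDK javadoc; `takeManaged` is a hypothetical standalone helper, not code from this PR:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

public class ManagedBlockDemo {
  /** Wraps a blocking queue take in ForkJoinPool.managedBlock so the pool can
   *  compensate with extra worker threads instead of starving. */
  public static <T> T takeManaged(BlockingQueue<T> queue) throws InterruptedException {
    final Object[] result = new Object[1];
    ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
      @Override
      public boolean block() throws InterruptedException {
        result[0] = queue.take();  // the actual blocking call
        return true;               // no further blocking needed
      }

      @Override
      public boolean isReleasable() {
        // Try a non-blocking poll first; if it yields a value, block() is skipped.
        return (result[0] = queue.poll()) != null;
      }
    });
    @SuppressWarnings("unchecked")
    T value = (T) result[0];
    return value;
  }

  public static void main(String[] args) throws Exception {
    BlockingQueue<String> q = new ArrayBlockingQueue<>(1);
    ForkJoinPool pool = new ForkJoinPool(2);
    ForkJoinTask<String> future = pool.submit(() -> takeManaged(q));
    q.put("merged-result");
    System.out.println(future.get()); // prints "merged-result"
  }
}
```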
   
   I'll definitely check out the `ChainedExecutionQueryRunner` approach.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
