drcrallen commented on a change in pull request #5913: Move Caching Cluster 
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213809885
 
 

 ##########
 File path: server/src/main/java/io/druid/client/CachingClusteredClient.java
 ##########
 @@ -169,34 +186,64 @@ public CachingClusteredClient(
     return new SpecificQueryRunnable<>(queryPlus, 
responseContext).run(timelineConverter);
   }
 
+  private <T> Sequence<T> runAndMergeWithTimelineChange(
+      final Query<T> query,
+      final QueryPlus<T> queryPlus,
+      final Map<String, Object> responseContext,
+      final UnaryOperator<TimelineLookup<String, ServerSelector>> 
timelineConverter
+  )
+  {
+    final Stream<? extends Sequence<T>> sequences = run(
+        queryPlus,
+        responseContext,
+        timelineConverter
+    );
+    final OptionalLong mergeBatch = 
QueryContexts.getIntermediateMergeBatchThreshold(query);
+    if (mergeBatch.isPresent()) {
+      return MergeWorkTask.parallelMerge(
+          query.getResultOrdering(),
+          sequences.parallel(),
+          mergeBatch.getAsLong(),
+          mergeFjp
+      );
+    } else {
+      return new MergeSequence<>(
+          query.getResultOrdering(),
+          Sequences.simple(sequences)
+      );
+    }
+  }
+
   @Override
   public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, 
final Iterable<SegmentDescriptor> specs)
   {
-    return new QueryRunner<T>()
-    {
-      @Override
-      public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, 
Object> responseContext)
-      {
-        return CachingClusteredClient.this.run(
-            queryPlus,
-            responseContext,
-            timeline -> {
-              final VersionedIntervalTimeline<String, ServerSelector> 
timeline2 =
-                  new VersionedIntervalTimeline<>(Ordering.natural());
-              for (SegmentDescriptor spec : specs) {
-                final PartitionHolder<ServerSelector> entry = 
timeline.findEntry(spec.getInterval(), spec.getVersion());
-                if (entry != null) {
-                  final PartitionChunk<ServerSelector> chunk = 
entry.getChunk(spec.getPartitionNumber());
-                  if (chunk != null) {
-                    timeline2.add(spec.getInterval(), spec.getVersion(), 
chunk);
-                  }
-                }
+    return (queryPlus, responseContext) -> runAndMergeWithTimelineChange(
+        query,
+        queryPlus,
+        responseContext,
+        timeline -> {
+          final VersionedIntervalTimeline<String, ServerSelector> timeline2 =
+              new VersionedIntervalTimeline<>(Ordering.natural());
+          for (SegmentDescriptor spec : specs) {
+            final PartitionHolder<ServerSelector> entry = timeline.findEntry(
+                spec.getInterval(),
+                spec.getVersion()
+            );
+            if (entry != null) {
+              final PartitionChunk<ServerSelector> chunk = entry.getChunk(
+                  spec.getPartitionNumber());
+              if (chunk != null) {
+                timeline2.add(
+                    spec.getInterval(),
 
 Review comment:
   This has been fixed. Personally, I find a bunch of long lines harder to 
read. I've attempted to strike a reasonable balance in this class.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to