drcrallen commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213809885
########## File path: server/src/main/java/io/druid/client/CachingClusteredClient.java ########## @@ -169,34 +186,64 @@ public CachingClusteredClient( return new SpecificQueryRunnable<>(queryPlus, responseContext).run(timelineConverter); } + private <T> Sequence<T> runAndMergeWithTimelineChange( + final Query<T> query, + final QueryPlus<T> queryPlus, + final Map<String, Object> responseContext, + final UnaryOperator<TimelineLookup<String, ServerSelector>> timelineConverter + ) + { + final Stream<? extends Sequence<T>> sequences = run( + queryPlus, + responseContext, + timelineConverter + ); + final OptionalLong mergeBatch = QueryContexts.getIntermediateMergeBatchThreshold(query); + if (mergeBatch.isPresent()) { + return MergeWorkTask.parallelMerge( + query.getResultOrdering(), + sequences.parallel(), + mergeBatch.getAsLong(), + mergeFjp + ); + } else { + return new MergeSequence<>( + query.getResultOrdering(), + Sequences.simple(sequences) + ); + } + } + @Override public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final Iterable<SegmentDescriptor> specs) { - return new QueryRunner<T>() - { - @Override - public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext) - { - return CachingClusteredClient.this.run( - queryPlus, - responseContext, - timeline -> { - final VersionedIntervalTimeline<String, ServerSelector> timeline2 = - new VersionedIntervalTimeline<>(Ordering.natural()); - for (SegmentDescriptor spec : specs) { - final PartitionHolder<ServerSelector> entry = timeline.findEntry(spec.getInterval(), spec.getVersion()); - if (entry != null) { - final PartitionChunk<ServerSelector> chunk = entry.getChunk(spec.getPartitionNumber()); - if (chunk != null) { - timeline2.add(spec.getInterval(), spec.getVersion(), chunk); - } - } + return (queryPlus, responseContext) -> runAndMergeWithTimelineChange( + query, + queryPlus, + responseContext, + timeline -> { + final VersionedIntervalTimeline<String, 
ServerSelector> timeline2 = + new VersionedIntervalTimeline<>(Ordering.natural()); + for (SegmentDescriptor spec : specs) { + final PartitionHolder<ServerSelector> entry = timeline.findEntry( + spec.getInterval(), + spec.getVersion() + ); + if (entry != null) { + final PartitionChunk<ServerSelector> chunk = entry.getChunk( + spec.getPartitionNumber()); + if (chunk != null) { + timeline2.add( + spec.getInterval(), Review comment: This has been fixed. Personally, I find a bunch of long lines harder to read. I've attempted to strike a reasonable balance in this class. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org