drcrallen commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213757238
########## File path: server/src/main/java/io/druid/client/CachingClusteredClient.java ########## @@ -242,74 +305,93 @@ public CachingClusteredClient( contextBuilder.put(CacheConfig.POPULATE_CACHE, false); contextBuilder.put("bySegment", true); } - return contextBuilder.build(); + return Collections.unmodifiableMap(contextBuilder); } - Sequence<T> run(final UnaryOperator<TimelineLookup<String, ServerSelector>> timelineConverter) + Stream<Sequence<T>> run(final UnaryOperator<TimelineLookup<String, ServerSelector>> timelineConverter) { @Nullable TimelineLookup<String, ServerSelector> timeline = serverView.getTimeline(query.getDataSource()); if (timeline == null) { - return Sequences.empty(); + return Stream.empty(); } timeline = timelineConverter.apply(timeline); if (uncoveredIntervalsLimit > 0) { computeUncoveredIntervals(timeline); } - final Set<ServerToSegment> segments = computeSegmentsToQuery(timeline); + Stream<ServerToSegment> segments = computeSegmentsToQuery(timeline); @Nullable final byte[] queryCacheKey = computeQueryCacheKey(); if (query.getContext().get(QueryResource.HEADER_IF_NONE_MATCH) != null) { + // Materialize then re-stream + final List<ServerToSegment> materializedSegments = segments.collect(Collectors.toList()); + segments = materializedSegments.stream(); + @Nullable final String prevEtag = (String) query.getContext().get(QueryResource.HEADER_IF_NONE_MATCH); @Nullable - final String currentEtag = computeCurrentEtag(segments, queryCacheKey); + final String currentEtag = computeCurrentEtag(materializedSegments, queryCacheKey); if (currentEtag != null && currentEtag.equals(prevEtag)) { - return Sequences.empty(); + return Stream.empty(); } } - final List<Pair<Interval, byte[]>> alreadyCachedResults = pruneSegmentsWithCachedResults(queryCacheKey, segments); - final SortedMap<DruidServer, List<SegmentDescriptor>> segmentsByServer = groupSegmentsByServer(segments); - return new LazySequence<>(() -> { - List<Sequence<T>> sequencesByInterval = new 
ArrayList<>(alreadyCachedResults.size() + segmentsByServer.size()); - addSequencesFromCache(sequencesByInterval, alreadyCachedResults); - addSequencesFromServer(sequencesByInterval, segmentsByServer); - return Sequences - .simple(sequencesByInterval) - .flatMerge(seq -> seq, query.getResultOrdering()); - }); + // This pipeline follows a few general steps: + // 1. Fetch cache results - Unfortunately this is an eager operation so that the non cached items can Review comment: the problem here is that in our cluster we have enough nodes that it is reasonable for a small datasource to have one segment or fewer per historical node. In such a scenario there will be a large quantity of cache requests (one per server) that would have been better to batch at the beginning. Basically, I expect an increase in load on the cache system due to the inability to batch-fetch cache results if such an approach were taken. That is a significant change in workflow compared to the implementation in `/master`, where the cached results are fetched in bulk first, with a limit on the quantity of results that can be cached at the broker per call. As a bit of context: by allowing a limit on the quantity of results per batch call at the broker level, we avoid even trying to fetch, say, 1M results when we know our cache system can probably only return 10k results within the given timeout limit. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org