drcrallen commented on a change in pull request #5913: Move Caching Cluster
Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213757238
##########
File path: server/src/main/java/io/druid/client/CachingClusteredClient.java
##########
@@ -242,74 +305,93 @@ public CachingClusteredClient(
contextBuilder.put(CacheConfig.POPULATE_CACHE, false);
contextBuilder.put("bySegment", true);
}
- return contextBuilder.build();
+ return Collections.unmodifiableMap(contextBuilder);
}
- Sequence<T> run(final UnaryOperator<TimelineLookup<String, ServerSelector>> timelineConverter)
+ Stream<Sequence<T>> run(final UnaryOperator<TimelineLookup<String, ServerSelector>> timelineConverter)
{
@Nullable
TimelineLookup<String, ServerSelector> timeline =
serverView.getTimeline(query.getDataSource());
if (timeline == null) {
- return Sequences.empty();
+ return Stream.empty();
}
timeline = timelineConverter.apply(timeline);
if (uncoveredIntervalsLimit > 0) {
computeUncoveredIntervals(timeline);
}
- final Set<ServerToSegment> segments = computeSegmentsToQuery(timeline);
+ Stream<ServerToSegment> segments = computeSegmentsToQuery(timeline);
@Nullable
final byte[] queryCacheKey = computeQueryCacheKey();
if (query.getContext().get(QueryResource.HEADER_IF_NONE_MATCH) != null) {
+ // Materialize then re-stream
+ final List<ServerToSegment> materializedSegments = segments.collect(Collectors.toList());
+ segments = materializedSegments.stream();
+
@Nullable
final String prevEtag = (String) query.getContext().get(QueryResource.HEADER_IF_NONE_MATCH);
@Nullable
- final String currentEtag = computeCurrentEtag(segments, queryCacheKey);
- final String currentEtag = computeCurrentEtag(materializedSegments, queryCacheKey);
if (currentEtag != null && currentEtag.equals(prevEtag)) {
- return Sequences.empty();
+ return Stream.empty();
}
}
- final List<Pair<Interval, byte[]>> alreadyCachedResults = pruneSegmentsWithCachedResults(queryCacheKey, segments);
- final SortedMap<DruidServer, List<SegmentDescriptor>> segmentsByServer = groupSegmentsByServer(segments);
- return new LazySequence<>(() -> {
- List<Sequence<T>> sequencesByInterval = new ArrayList<>(alreadyCachedResults.size() + segmentsByServer.size());
- addSequencesFromCache(sequencesByInterval, alreadyCachedResults);
- addSequencesFromServer(sequencesByInterval, segmentsByServer);
- return Sequences
- .simple(sequencesByInterval)
- .flatMerge(seq -> seq, query.getResultOrdering());
- });
+ // This pipeline follows a few general steps:
+ // 1. Fetch cache results - Unfortunately this is an eager operation so that the non cached items can
Review comment:
The problem here is that our cluster has enough nodes that it is common for a small datasource to have one segment or fewer per historical node. In such a scenario there will be a large number of cache requests (one per server) that would have been better to batch at the beginning.
Basically, I expect an increase in load on the cache system if such an approach were taken, because it loses the ability to batch-fetch cache results. That is a significant change in workflow compared to the implementation in `/master`, where the cached results are fetched in bulk first, with a limit on the quantity of results that can be fetched at the broker per call.
As a bit of context: a limit on the quantity of results per batch call at the broker level lets us avoid even trying to fetch, say, 1M results when we know our cache system can probably only return 10k results within the given timeout.
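A rough sketch of the batched behavior described above, under stated assumptions: all names here (`fetchWithLimit`, the `bulkFetch` callback) are hypothetical and are not Druid's actual cache API. The point is only the shape of the access pattern: keys are grouped into bulk calls capped at a per-call result limit, rather than issuing one cache request per server.

```java
import java.util.*;
import java.util.function.Function;
import java.util.stream.*;

// Hypothetical illustration of batched cache fetching with a per-call
// result limit at the broker. Not Druid code; names are made up.
public class BatchedCacheFetchSketch
{
  // Partition the keys into batches no larger than resultLimit, then issue
  // one bulk fetch per batch instead of one fetch per server.
  static Map<String, byte[]> fetchWithLimit(
      List<String> keys,
      int resultLimit,
      Function<List<String>, Map<String, byte[]>> bulkFetch
  )
  {
    final Map<String, byte[]> results = new HashMap<>();
    for (int i = 0; i < keys.size(); i += resultLimit) {
      final List<String> batch = keys.subList(i, Math.min(i + resultLimit, keys.size()));
      results.putAll(bulkFetch.apply(batch));
    }
    return results;
  }

  public static void main(String[] args)
  {
    final List<String> keys = IntStream.range(0, 25)
        .mapToObj(i -> "segment-" + i)
        .collect(Collectors.toList());
    final List<Integer> batchSizes = new ArrayList<>();
    // Stub bulk fetch: records each batch size and returns empty payloads.
    final Map<String, byte[]> out = fetchWithLimit(keys, 10, batch -> {
      batchSizes.add(batch.size());
      return batch.stream().collect(Collectors.toMap(k -> k, k -> new byte[0]));
    });
    System.out.println(out.size());   // 25
    System.out.println(batchSizes);   // [10, 10, 5]
  }
}
```

With 25 keys and a limit of 10, the sketch issues three bulk calls instead of up to 25 per-server calls, which is the load difference the comment is pointing at.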
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]