leventov commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213871698
##########
File path: server/src/main/java/io/druid/client/CachingClusteredClient.java
##########
@@ -389,169 +461,248 @@ private String computeCurrentEtag(final Set<ServerToSegment> segments, @Nullable
     }
   }
 
-  private List<Pair<Interval, byte[]>> pruneSegmentsWithCachedResults(
+  private Pair<ServerToSegment, Optional<byte[]>> lookupInCache(
+      Pair<ServerToSegment, Cache.NamedKey> key,
+      Map<Cache.NamedKey, Optional<byte[]>> cache
+  )
+  {
+    final ServerToSegment segment = key.getLhs();
+    final Cache.NamedKey segmentCacheKey = key.getRhs();
+    final Interval segmentQueryInterval = segment.getSegmentDescriptor().getInterval();
+    final Optional<byte[]> cachedValue = Optional
+        .ofNullable(cache.get(segmentCacheKey))
+        // Shouldn't happen in practice, but can screw up unit tests where cache state is mutated in crazy
+        // ways when the cache returns null instead of an optional.
+        .orElse(Optional.empty());
+    if (!cachedValue.isPresent()) {
+      // if populating cache, add segment to list of segments to cache if it is not cached
+      final String segmentIdentifier = segment.getServer().getSegment().getIdentifier();
+      addCachePopulatorKey(segmentCacheKey, segmentIdentifier, segmentQueryInterval);
+    }
+    return Pair.of(segment, cachedValue);
+  }
+
+  /**
+   * This materializes the input segment stream in order to let the BulkGet stuff in the cache system work
+   *
+   * @param queryCacheKey The cache key that is for the query (not-segment) portion
+   * @param segments      The segments to check if they are in cache
+   *
+   * @return A stream of the server and segment combinations as well as an optional that is present
+   *         if a cached value was found
+   */
+  private Stream<Pair<ServerToSegment, Optional<byte[]>>> maybeFetchCacheResults(
       final byte[] queryCacheKey,
-      final Set<ServerToSegment> segments
+      final Stream<ServerToSegment> segments
   )
   {
     if (queryCacheKey == null) {
-      return Collections.emptyList();
+      return segments.map(s -> Pair.of(s, Optional.empty()));
     }
-    final List<Pair<Interval, byte[]>> alreadyCachedResults = Lists.newArrayList();
-    Map<ServerToSegment, Cache.NamedKey> perSegmentCacheKeys = computePerSegmentCacheKeys(segments, queryCacheKey);
-    // Pull cached segments from cache and remove from set of segments to query
-    final Map<Cache.NamedKey, byte[]> cachedValues = computeCachedValues(perSegmentCacheKeys);
-
-    perSegmentCacheKeys.forEach((segment, segmentCacheKey) -> {
-      final Interval segmentQueryInterval = segment.getSegmentDescriptor().getInterval();
-
-      final byte[] cachedValue = cachedValues.get(segmentCacheKey);
-      if (cachedValue != null) {
-        // remove cached segment from set of segments to query
-        segments.remove(segment);
-        alreadyCachedResults.add(Pair.of(segmentQueryInterval, cachedValue));
-      } else if (populateCache) {
-        // otherwise, if populating cache, add segment to list of segments to cache
-        final String segmentIdentifier = segment.getServer().getSegment().getIdentifier();
-        addCachePopulatorKey(segmentCacheKey, segmentIdentifier, segmentQueryInterval);
-      }
-    });
-    return alreadyCachedResults;
+    // We materialize the stream here in order to have the bulk cache fetching work as expected
+    final List<Pair<ServerToSegment, Cache.NamedKey>> materializedKeyList = computePerSegmentCacheKeys(
+        segments,
+        queryCacheKey
+    ).collect(Collectors.toList());
+
+    // Do bulk fetch
+    final Map<Cache.NamedKey, Optional<byte[]>> cachedValues = computeCachedValues(materializedKeyList.stream())
+        .collect(Pair.mapCollector());
+
+    // A limitation of the cache system is that the cached values are returned without passing through the original
+    // objects. This hash join is a way to get the ServerToSegment and Optional<byte[]> matched up again
+    return materializedKeyList
+        .stream()
+        .map(serializedPairSegmentAndKey -> lookupInCache(serializedPairSegmentAndKey, cachedValues));
   }
 
-  private Map<ServerToSegment, Cache.NamedKey> computePerSegmentCacheKeys(
-      Set<ServerToSegment> segments,
+  private Stream<Pair<ServerToSegment, Cache.NamedKey>> computePerSegmentCacheKeys(
+      Stream<ServerToSegment> segments,
       byte[] queryCacheKey
   )
   {
-    // cacheKeys map must preserve segment ordering, in order for shards to always be combined in the same order
-    Map<ServerToSegment, Cache.NamedKey> cacheKeys = Maps.newLinkedHashMap();
-    for (ServerToSegment serverToSegment : segments) {
-      final Cache.NamedKey segmentCacheKey = CacheUtil.computeSegmentCacheKey(
-          serverToSegment.getServer().getSegment().getIdentifier(),
-          serverToSegment.getSegmentDescriptor(),
-          queryCacheKey
-      );
-      cacheKeys.put(serverToSegment, segmentCacheKey);
-    }
-    return cacheKeys;
+    return segments
+        .map(serverToSegment -> {
+          // cacheKeys map must preserve segment ordering, in order for shards to always be combined in the same order
+          final Cache.NamedKey segmentCacheKey = CacheUtil.computeSegmentCacheKey(
+              serverToSegment.getServer().getSegment().getIdentifier(),
+              serverToSegment.getSegmentDescriptor(),
+              queryCacheKey
+          );
+          return Pair.of(serverToSegment, segmentCacheKey);
+        });
   }
 
-  private Map<Cache.NamedKey, byte[]> computeCachedValues(Map<ServerToSegment, Cache.NamedKey> cacheKeys)
+  private Stream<Pair<Cache.NamedKey, Optional<byte[]>>> computeCachedValues(

Review comment:
   Same
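For context, the pattern the hunk above introduces in maybeFetchCacheResults is: materialize the key stream, issue one bulk cache fetch, then hash-join the fetched values back onto the originating objects. A minimal self-contained sketch of that pattern follows, using plain String keys and a java.util.Map standing in for Druid's Cache and ServerToSegment types; the class and method names here are hypothetical stand-ins, not the real Druid API.

import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class BulkFetchHashJoinSketch
{
  /**
   * Materializes the key stream (a bulk fetch needs the full key set up front),
   * does one bulk lookup, then hash-joins the results back onto the original keys.
   */
  static Stream<Entry<String, Optional<byte[]>>> lookupAll(
      Stream<String> items,
      Map<String, byte[]> backingCache
  )
  {
    // Step 1: materialize; a bulk fetch cannot be issued one element at a time.
    final List<String> keys = items.collect(Collectors.toList());

    // Step 2: single bulk fetch; misses simply have no entry in the result map.
    final Map<String, byte[]> hits = new HashMap<>();
    for (String key : keys) {
      final byte[] value = backingCache.get(key);
      if (value != null) {
        hits.put(key, value);
      }
    }

    // Step 3: hash join the bulk result back to the original, ordered key list,
    // wrapping misses in Optional.empty() so callers see one uniform stream.
    return keys.stream()
               .map(key -> new SimpleImmutableEntry<>(key, Optional.ofNullable(hits.get(key))));
  }

  public static void main(String[] args)
  {
    final Map<String, byte[]> cache = new HashMap<>();
    cache.put("segment-1", new byte[]{1});

    lookupAll(Stream.of("segment-1", "segment-2"), cache)
        .forEach(e -> System.out.println(e.getKey() + " cached=" + e.getValue().isPresent()));
    // prints: segment-1 cached=true
    //         segment-2 cached=false
  }
}

Wrapping misses in Optional.empty() rather than dropping them keeps the output stream the same length and order as the input, which is what lets the caller preserve segment ordering when combining shard results downstream.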