drcrallen commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r202768459
##########
File path: server/src/main/java/io/druid/client/CachingClusteredClient.java
##########
@@ -396,60 +457,80 @@ private String computeCurrentEtag(final Set<ServerToSegment> segments, @Nullable
       }
     }

-    private List<Pair<Interval, byte[]>> pruneSegmentsWithCachedResults(
+    /**
+     * This materializes the input segment stream in order to let the BulkGet machinery in the cache system work.
+     *
+     * @param queryCacheKey The cache key for the query (non-segment) portion
+     * @param segments      The segments to check if they are in cache
+     *
+     * @return A stream of the server and segment combinations as well as an Optional that is present
+     * if a cached value was found
+     */
+    private Stream<SerializablePair<ServerToSegment, Optional<byte[]>>> maybeFetchCacheResults(
         final byte[] queryCacheKey,
-        final Set<ServerToSegment> segments
+        final Stream<ServerToSegment> segments
     )
     {
       if (queryCacheKey == null) {
-        return Collections.emptyList();
+        return segments.map(s -> new SerializablePair<>(s, Optional.empty()));
       }
-      final List<Pair<Interval, byte[]>> alreadyCachedResults = Lists.newArrayList();
-      Map<ServerToSegment, Cache.NamedKey> perSegmentCacheKeys = computePerSegmentCacheKeys(segments, queryCacheKey);
-      // Pull cached segments from cache and remove from set of segments to query
-      final Map<Cache.NamedKey, byte[]> cachedValues = computeCachedValues(perSegmentCacheKeys);
-
-      perSegmentCacheKeys.forEach((segment, segmentCacheKey) -> {
-        final Interval segmentQueryInterval = segment.getSegmentDescriptor().getInterval();
-
-        final byte[] cachedValue = cachedValues.get(segmentCacheKey);
-        if (cachedValue != null) {
-          // remove cached segment from set of segments to query
-          segments.remove(segment);
-          alreadyCachedResults.add(Pair.of(segmentQueryInterval, cachedValue));
-        } else if (populateCache) {
-          // otherwise, if populating cache, add segment to list of segments to cache
-          final String segmentIdentifier = segment.getServer().getSegment().getIdentifier();
-          addCachePopulator(segmentCacheKey, segmentIdentifier, segmentQueryInterval);
-        }
-      });
-      return alreadyCachedResults;
+      // We materialize the stream here in order to have the bulk cache fetching work as expected
+      final List<SerializablePair<ServerToSegment, Cache.NamedKey>> materializedKeyList = computePerSegmentCacheKeys(
+          segments,
+          queryCacheKey
+      ).collect(Collectors.toList());
+
+      // Do bulk fetch
+      final Map<Cache.NamedKey, Optional<byte[]>> cachedValues = computeCachedValues(
+          materializedKeyList.stream()
+      ).collect(
+          Collectors.toMap(
+              SerializablePair::getLhs,
+              SerializablePair::getRhs
+          )
+      );
+      // Hash join to return the stream
+      return materializedKeyList.stream().map(
+          psck -> {
+            final ServerToSegment segment = psck.getLhs();
+            final Cache.NamedKey segmentCacheKey = psck.getRhs();
+            final Interval segmentQueryInterval = segment.getSegmentDescriptor().getInterval();
+            final Optional<byte[]> cachedValue = Optional.ofNullable(cachedValues.get(segmentCacheKey))
+                .orElse(Optional.empty()); // Shouldn't happen in practice, but can screw up unit tests
Review comment:
Added some more descriptions
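
   For anyone skimming the diff above, the shape of the change is "materialize, bulk fetch, hash join": collect the stream so a single bulk cache call can see every key, then stream the same list again and join each key against the fetched map. Below is a minimal, self-contained sketch of that pattern; the `BulkCache` interface, `bulkGet` method, and `maybeFetch` name are hypothetical stand-ins for illustration, not Druid's actual `Cache` API:

   ```java
   import java.util.AbstractMap.SimpleEntry;
   import java.util.List;
   import java.util.Map;
   import java.util.Optional;
   import java.util.stream.Collectors;
   import java.util.stream.Stream;

   public class BulkCacheJoinSketch
   {
     // Hypothetical bulk-capable cache: one round trip for many keys,
     // returning entries only for the keys that were found.
     interface BulkCache
     {
       Map<String, byte[]> bulkGet(List<String> keys);
     }

     static Stream<SimpleEntry<String, Optional<byte[]>>> maybeFetch(Stream<String> keys, BulkCache cache)
     {
       // 1) Materialize the stream so the bulk fetch can see every key at once.
       final List<String> materialized = keys.collect(Collectors.toList());

       // 2) One bulk round trip instead of one point lookup per key.
       final Map<String, byte[]> cached = cache.bulkGet(materialized);

       // 3) Hash join: walk the materialized keys again and pair each one with
       //    its cached value, recording misses as Optional.empty().
       return materialized.stream().map(
           key -> new SimpleEntry<>(key, Optional.ofNullable(cached.get(key)))
       );
     }
   }
   ```

   In the real diff the map values are themselves `Optional<byte[]>`, so a map miss produces an `Optional<Optional<byte[]>>` that has to be flattened with `.orElse(Optional.empty())`; the sketch stores raw `byte[]` values, which avoids that wrinkle.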
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]