drcrallen commented on a change in pull request #5913: Move Caching Cluster Client to java streams and allow parallel intermediate merges
URL: https://github.com/apache/incubator-druid/pull/5913#discussion_r213755919
##########
File path: server/src/main/java/io/druid/client/CachingClusteredClient.java
##########
@@ -389,169 +471,249 @@ private String computeCurrentEtag(final Set<ServerToSegment> segments, @Nullable
}
}
- private List<Pair<Interval, byte[]>> pruneSegmentsWithCachedResults(
+ private Pair<ServerToSegment, Optional<byte[]>> lookupInCache(
+ Pair<ServerToSegment, Cache.NamedKey> key,
+ Map<Cache.NamedKey, Optional<byte[]>> cache
+ )
+ {
+ final ServerToSegment segment = key.getLhs();
+ final Cache.NamedKey segmentCacheKey = key.getRhs();
+ final Interval segmentQueryInterval = segment.getSegmentDescriptor().getInterval();
+ final Optional<byte[]> cachedValue = Optional
+ .ofNullable(cache.get(segmentCacheKey))
+ // Shouldn't happen in practice, but can screw up unit tests where cache state is mutated in crazy
+ // ways when the cache returns null instead of an optional.
+ .orElse(Optional.empty());
+ if (!cachedValue.isPresent()) {
+ // if populating cache, add segment to list of segments to cache if it is not cached
+ final String segmentIdentifier = segment.getServer().getSegment().getIdentifier();
+ addCachePopulatorKey(segmentCacheKey, segmentIdentifier, segmentQueryInterval);
+ }
+ return Pair.of(segment, cachedValue);
+ }
+
+ /**
+ * This materializes the input segment stream in order to let the BulkGet stuff in the cache system work
+ *
+ * @param queryCacheKey The cache key that is for the query (not-segment) portion
+ * @param segments The segments to check if they are in cache
+ *
+ * @return A stream of the server and segment combinations as well as an optional that is present
+ * if a cached value was found
+ */
+ private Stream<Pair<ServerToSegment, Optional<byte[]>>> maybeFetchCacheResults(
final byte[] queryCacheKey,
- final Set<ServerToSegment> segments
+ final Stream<ServerToSegment> segments
)
{
if (queryCacheKey == null) {
- return Collections.emptyList();
+ return segments.map(s -> Pair.of(s, Optional.empty()));
}
- final List<Pair<Interval, byte[]>> alreadyCachedResults = Lists.newArrayList();
- Map<ServerToSegment, Cache.NamedKey> perSegmentCacheKeys = computePerSegmentCacheKeys(segments, queryCacheKey);
- // Pull cached segments from cache and remove from set of segments to query
- final Map<Cache.NamedKey, byte[]> cachedValues = computeCachedValues(perSegmentCacheKeys);
-
- perSegmentCacheKeys.forEach((segment, segmentCacheKey) -> {
- final Interval segmentQueryInterval = segment.getSegmentDescriptor().getInterval();
-
- final byte[] cachedValue = cachedValues.get(segmentCacheKey);
- if (cachedValue != null) {
- // remove cached segment from set of segments to query
- segments.remove(segment);
- alreadyCachedResults.add(Pair.of(segmentQueryInterval, cachedValue));
- } else if (populateCache) {
- // otherwise, if populating cache, add segment to list of segments to cache
- final String segmentIdentifier = segment.getServer().getSegment().getIdentifier();
- addCachePopulatorKey(segmentCacheKey, segmentIdentifier, segmentQueryInterval);
- }
- });
- return alreadyCachedResults;
+ // We materialize the stream here in order to have the bulk cache fetching work as expected
+ final List<Pair<ServerToSegment, Cache.NamedKey>> materializedKeyList = computePerSegmentCacheKeys(
+ segments,
+ queryCacheKey
+ ).collect(Collectors.toList());
+
+ // Do bulk fetch
+ final Map<Cache.NamedKey, Optional<byte[]>> cachedValues = computeCachedValues(materializedKeyList.stream())
+ .collect(Pair.mapCollector());
+
+ // A limitation of the cache system is that the cached values are returned without passing through the original
+ // objects. This hash join is a way to get the ServerToSegment and Optional<byte[]> matched up again
+ return materializedKeyList
+ .stream()
+ .map(serializedPairSegmentAndKey -> lookupInCache(serializedPairSegmentAndKey, cachedValues));
}
- private Map<ServerToSegment, Cache.NamedKey> computePerSegmentCacheKeys(
- Set<ServerToSegment> segments,
+ private Stream<Pair<ServerToSegment, Cache.NamedKey>> computePerSegmentCacheKeys(
+ Stream<ServerToSegment> segments,
byte[] queryCacheKey
)
{
- // cacheKeys map must preserve segment ordering, in order for shards to always be combined in the same order
- Map<ServerToSegment, Cache.NamedKey> cacheKeys = Maps.newLinkedHashMap();
- for (ServerToSegment serverToSegment : segments) {
- final Cache.NamedKey segmentCacheKey = CacheUtil.computeSegmentCacheKey(
- serverToSegment.getServer().getSegment().getIdentifier(),
- serverToSegment.getSegmentDescriptor(),
- queryCacheKey
- );
- cacheKeys.put(serverToSegment, segmentCacheKey);
- }
- return cacheKeys;
+ return segments
+ .map(serverToSegment -> {
+ // cacheKeys map must preserve segment ordering, in order for shards to always be combined in the same order
+ final Cache.NamedKey segmentCacheKey = CacheUtil.computeSegmentCacheKey(
+ serverToSegment.getServer().getSegment().getIdentifier(),
+ serverToSegment.getSegmentDescriptor(),
+ queryCacheKey
+ );
+ return Pair.of(serverToSegment, segmentCacheKey);
+ });
}
- private Map<Cache.NamedKey, byte[]> computeCachedValues(Map<ServerToSegment, Cache.NamedKey> cacheKeys)
+ private Stream<Pair<Cache.NamedKey, Optional<byte[]>>> computeCachedValues(
+ Stream<Pair<ServerToSegment, Cache.NamedKey>> cacheKeys
+ )
{
if (useCache) {
- return cache.getBulk(Iterables.limit(cacheKeys.values(), cacheConfig.getCacheBulkMergeLimit()));
+ return cache.getBulk(cacheKeys.limit(cacheConfig.getCacheBulkMergeLimit()).map(Pair::getRhs));
} else {
- return ImmutableMap.of();
+ return Stream.empty();
}
}
+ private String cacheKey(String segmentId, Interval segmentInterval)
+ {
+ return StringUtils.format("%s_%s", segmentId, segmentInterval);
+ }
+
private void addCachePopulatorKey(
Cache.NamedKey segmentCacheKey,
String segmentIdentifier,
Interval segmentQueryInterval
)
{
- cachePopulatorKeyMap.put(
- StringUtils.format("%s_%s", segmentIdentifier, segmentQueryInterval),
- segmentCacheKey
- );
+ cachePopulatorKeyMap.put(cacheKey(segmentIdentifier, segmentQueryInterval), segmentCacheKey);
}
@Nullable
private Cache.NamedKey getCachePopulatorKey(String segmentId, Interval segmentInterval)
{
- return cachePopulatorKeyMap.get(StringUtils.format("%s_%s", segmentId, segmentInterval));
+ return cachePopulatorKeyMap.get(cacheKey(segmentId, segmentInterval));
}
- private SortedMap<DruidServer, List<SegmentDescriptor>> groupSegmentsByServer(Set<ServerToSegment> segments)
+ /**
+ * Check the input stream to see what was cached and what was not. For the ones that were cached, merge the results
+ * and return the merged sequence. For the ones that were NOT cached, get the server result sequence queued up into
+ * the stream response
+ *
+ * @param segmentOrResult A list that is traversed in order to determine what should be sent back. All segments
+ * should be on the same server.
+ *
+ * @return A sequence of either the merged cached results, or the server results from any particular server
+ */
+ private Sequence<T> runOnServer(List<ServerMaybeSegmentMaybeCache<T>> segmentOrResult)
{
- final SortedMap<DruidServer, List<SegmentDescriptor>> serverSegments = Maps.newTreeMap();
- for (ServerToSegment serverToSegment : segments) {
- final QueryableDruidServer queryableDruidServer = serverToSegment.getServer().pick();
-
- if (queryableDruidServer == null) {
- log.makeAlert(
- "No servers found for SegmentDescriptor[%s] for DataSource[%s]?! How can this be?!",
- serverToSegment.getSegmentDescriptor(),
- query.getDataSource()
- ).emit();
- } else {
- final DruidServer server = queryableDruidServer.getServer();
- serverSegments.computeIfAbsent(server, s -> new ArrayList<>()).add(serverToSegment.getSegmentDescriptor());
- }
+ final List<SegmentDescriptor> segmentsOfServer = segmentOrResult
+ .stream()
+ .map(ServerMaybeSegmentMaybeCache::getSegmentDescriptor)
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList());
+
+ // We should only ever have cache or queries to run, not both. So if we have no segments, try caches
+ if (segmentsOfServer.isEmpty()) {
+ // Have a special sequence for the cache results so the merge doesn't go all crazy.
+ // See io.druid.java.util.common.guava.MergeSequenceTest.testScrewsUpOnOutOfOrder for an example
+ // With zero results actually being found (no segments no caches) this should essentially return a no-op
+ // merge sequence
+ return new MergeSequence<>(query.getResultOrdering(), Sequences.fromStream(
+ segmentOrResult
+ .stream()
+ .map(ServerMaybeSegmentMaybeCache::getCachedValue)
+ .filter(Objects::nonNull)
+ .map(Collections::singletonList)
+ .map(Sequences::simple)
+ ));
}
- return serverSegments;
+
+ final DruidServer server = segmentOrResult.get(0).getServer();
+ final QueryRunner serverRunner = serverView.getQueryRunner(server);
+
+ if (serverRunner == null) {
+ log.error("Server[%s] doesn't have a query runner", server);
+ return Sequences.empty();
+ }
+
+ final MultipleSpecificSegmentSpec segmentsOfServerSpec = new MultipleSpecificSegmentSpec(segmentsOfServer);
+
+ final Sequence<T> serverResults;
+ if (isBySegment) {
+ serverResults = getBySegmentServerResults(serverRunner, segmentsOfServerSpec);
+ } else if (!server.segmentReplicatable() || !populateCache) {
+ serverResults = getSimpleServerResults(serverRunner, segmentsOfServerSpec);
+ } else {
+ serverResults = getAndCacheServerResults(serverRunner, segmentsOfServerSpec);
+ }
+ return serverResults;
}
- private void addSequencesFromCache(
- final List<Sequence<T>> listOfSequences,
- final List<Pair<Interval, byte[]>> cachedResults
- )
+ private ServerMaybeSegmentMaybeCache<T> pickServer(Pair<ServerToSegment, Optional<T>> tuple)
{
- if (strategy == null) {
- return;
+ final Optional<T> maybeResult = tuple.getRhs();
+ if (maybeResult.isPresent()) {
+ return new ServerMaybeSegmentMaybeCache<>(ALREADY_CACHED_SERVER, null, maybeResult.get());
}
-
- final Function<Object, T> pullFromCacheFunction = strategy.pullFromSegmentLevelCache();
- final TypeReference<Object> cacheObjectClazz = strategy.getCacheObjectClazz();
- for (Pair<Interval, byte[]> cachedResultPair : cachedResults) {
- final byte[] cachedResult = cachedResultPair.rhs;
- Sequence<Object> cachedSequence = new BaseSequence<>(
- new BaseSequence.IteratorMaker<Object, Iterator<Object>>()
- {
- @Override
- public Iterator<Object> make()
- {
- try {
- if (cachedResult.length == 0) {
- return Collections.emptyIterator();
- }
-
- return objectMapper.readValues(
- objectMapper.getFactory().createParser(cachedResult),
- cacheObjectClazz
- );
- }
- catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void cleanup(Iterator<Object> iterFromMake)
- {
- }
- }
- );
- listOfSequences.add(Sequences.map(cachedSequence, pullFromCacheFunction));
+ final ServerToSegment serverToSegment = tuple.getLhs();
+ final QueryableDruidServer queryableDruidServer = serverToSegment.getServer().pick();
+ if (queryableDruidServer == null) {
+ log.makeAlert(
+ "No servers found for SegmentDescriptor[%s] for DataSource[%s]?! How can this be?!",
+ serverToSegment.getSegmentDescriptor(),
+ query.getDataSource()
+ ).emit();
+ return new ServerMaybeSegmentMaybeCache<>(ALREADY_CACHED_SERVER, null, null);
}
+ final DruidServer server = queryableDruidServer.getServer();
+ return new ServerMaybeSegmentMaybeCache<>(
+ server,
+ serverToSegment.getSegmentDescriptor(),
+ null
+ );
}
- private void addSequencesFromServer(
- final List<Sequence<T>> listOfSequences,
- final SortedMap<DruidServer, List<SegmentDescriptor>> segmentsByServer
+ /**
+ * This materializes the input stream in order to group it by server. This method takes in the stream of cache
+ * resolved items and will group all the items by server. Each entry in the output stream contains a list whose
+ * entries' getServer is the same. Each entry will either have a present segment descriptor or a present result,
+ * but not both. Downstream consumers should check each and handle appropriately.
+ *
+ * @param cacheResolvedStream A stream of the cached results for different segment queries
+ *
+ * @return A stream of potentially cached results per server
+ */
+
+ private Stream<List<ServerMaybeSegmentMaybeCache<T>>> groupCachedResultsByServer(
+ Stream<Pair<ServerToSegment, Optional<T>>> cacheResolvedStream
)
{
- segmentsByServer.forEach((server, segmentsOfServer) -> {
- final QueryRunner serverRunner = serverView.getQueryRunner(server);
+ return cacheResolvedStream
+ .map(this::pickServer)
+ .collect(Collectors.groupingBy(ServerMaybeSegmentMaybeCache::getServer))
+ .values()
+ // At this point we have the segments per server, and a special entry for the pre-cached results.
+ // As of the time of this writing, this results in a java.util.HashMap.ValueSpliterator which
+ // does not have great properties for splitting in parallel since it does not have total size awareness
+ // yet. I hope future implementations of the grouping collector can handle such a scenario where the
+ // grouping result is immutable and can be split very easily into parallel spliterators
+ .stream()
+ .filter(l -> !l.isEmpty())
+ // Get rid of any alerted conditions missing queryableDruidServer
+ .filter(l -> l.get(0).getCachedValue() != null || l.get(0).getSegmentDescriptor() != null);
+ }
- if (serverRunner == null) {
- log.error("Server[%s] doesn't have a query runner", server);
- return;
+ private Stream<Pair<ServerToSegment, Optional<T>>> deserializeFromCache(
+ final Stream<Pair<ServerToSegment, Optional<byte[]>>> cachedResults
+ )
+ {
+ if (strategy == null) {
+ return cachedResults.map(s -> Pair.of(s.getLhs(), Optional.empty()));
+ }
+ final Function<Object, T> pullFromCacheFunction = strategy.pullFromSegmentLevelCache()::apply;
+ final TypeReference<Object> cacheObjectClazz = strategy.getCacheObjectClazz();
+ return cachedResults.flatMap(cachedResultPair -> {
+ if (!cachedResultPair.getRhs().isPresent()) {
+ return Stream.of(Pair.of(cachedResultPair.getLhs(), Optional.empty()));
Review comment:
A conversion from `Optional<byte[]>` to `Optional<T>` is required here. I either have to hard-cast it or just make a new object. IMHO just making a new object is safer, even though `Optional.empty()` should be castable to whatever type is needed.
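
For illustration, here is a minimal sketch (not the PR's code) of the two options being weighed: an unchecked hard-cast of the empty `Optional` versus constructing a fresh one. The generic parameter `T` and the `deserialize` helper are hypothetical stand-ins for the segment-level cache deserialization:

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;

// Sketch only: T and deserialize() are hypothetical stand-ins for the real
// pull-from-segment-level-cache conversion discussed in this thread.
class OptionalConversionSketch
{
  // Option 1: hard-cast the empty Optional<byte[]> to Optional<T>.
  // Compiles only as an unchecked cast, since the two types are unrelated.
  @SuppressWarnings("unchecked")
  static <T> Optional<T> viaCast(Optional<byte[]> cached)
  {
    if (cached.isPresent()) {
      return Optional.of(OptionalConversionSketch.<T>deserialize(cached.get()));
    }
    return (Optional<T>) cached; // unchecked, but safe because it holds no value
  }

  // Option 2: just make a new object. Optional.empty() hands back the shared
  // empty instance, so this is effectively free and needs no cast.
  static <T> Optional<T> viaNewObject(Optional<byte[]> cached)
  {
    if (cached.isPresent()) {
      return Optional.of(OptionalConversionSketch.<T>deserialize(cached.get()));
    }
    return Optional.empty();
  }

  @SuppressWarnings("unchecked")
  private static <T> T deserialize(byte[] bytes)
  {
    // placeholder for the real cache deserialization
    return (T) new String(bytes, StandardCharsets.UTF_8);
  }
}
```

Since `Optional.empty()` just returns the shared empty instance in the reference implementation, option 2 costs essentially nothing at runtime and avoids the unchecked cast.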