adarshsanjeev commented on code in PR #15024:
URL: https://github.com/apache/druid/pull/15024#discussion_r1348454215
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1163,14 +1167,67 @@ private QueryKit makeQueryControllerToolKit()
private DataSegmentTimelineView makeDataSegmentTimelineView()
{
+ final SegmentSource includeSegmentSource = MultiStageQueryContext.getSegmentSources(
+ task.getQuerySpec()
+ .getQuery()
+ .context()
+ );
+
+ final boolean includeRealtime = SegmentSource.shouldQueryRealtimeServers(includeSegmentSource);
+
return (dataSource, intervals) -> {
- final Collection<DataSegment> dataSegments =
+ final Iterable<ImmutableSegmentLoadInfo> realtimeAndHistoricalSegments;
+
+ // Fetch the realtime segments first, so that we don't miss any segment if they get handed off between the two
+ // calls. Segments loaded on historicals are also returned here, we deduplicate it below.
+ if (includeRealtime) {
+ realtimeAndHistoricalSegments = context.coordinatorClient().fetchServerViewSegments(dataSource, intervals);
+ } else {
+ realtimeAndHistoricalSegments = ImmutableList.of();
+ }
+
+ // Fetch all published used segments from the metadata store.
+ final Collection<DataSegment> publishedUsedSegments =
FutureUtils.getUnchecked(context.coordinatorClient().fetchUsedSegments(dataSource, intervals), true);
- if (dataSegments.isEmpty()) {
+ int realtimeCount = 0;
+
+ // Deduplicate segments, giving preference to metadata store segments. We do this so that if any segments have been
+ // handed off in between the two metadata calls above, we directly fetch it from deep storage.
+ Set<DataSegment> unifiedSegmentView = new HashSet<>(publishedUsedSegments);
+ for (ImmutableSegmentLoadInfo segmentLoadInfo : realtimeAndHistoricalSegments) {
+ ImmutableSet<DruidServerMetadata> servers = segmentLoadInfo.getServers();
+ // Filter out only realtime servers. We don't want to query historicals for now, but we can in the future.
+ // This check can be modified then.
+ Set<DruidServerMetadata> realtimeServerMetadata
+ = servers.stream()
Review Comment:
We are adding to the unifiedSegmentView set, so a segment that is already present (for example, one already returned by the metadata store) will not be added again.
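To illustrate the set semantics the comment relies on, here is a minimal, self-contained sketch. `Segment` is a hypothetical stand-in for `DataSegment`, assuming equality is defined by segment identity only; the `realtimeCount` counter loosely mirrors the variable in the diff, but how the PR actually uses it is not shown in this hunk. Because the set is seeded with the published segments first, `Set.add` returns `false` for a realtime copy of an already-published segment and leaves the published element in place.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SegmentDedupSketch {
  // Hypothetical stand-in for DataSegment: equals/hashCode use only the id,
  // so the same segment seen from two sources compares equal.
  static final class Segment {
    final String id;
    final String source; // "metadata" or "realtime"; ignored by equals/hashCode

    Segment(String id, String source) {
      this.id = id;
      this.source = source;
    }

    @Override
    public boolean equals(Object o) {
      return o instanceof Segment && ((Segment) o).id.equals(id);
    }

    @Override
    public int hashCode() {
      return id.hashCode();
    }
  }

  // Unified view plus the number of segments that came only from realtime servers.
  static final class Unified {
    final Set<Segment> segments;
    final int realtimeCount;

    Unified(Set<Segment> segments, int realtimeCount) {
      this.segments = segments;
      this.realtimeCount = realtimeCount;
    }
  }

  static Unified unify(List<Segment> published, List<Segment> realtime) {
    // Seed with published segments first so they take precedence.
    Set<Segment> unified = new HashSet<>(published);
    int realtimeCount = 0;
    for (Segment s : realtime) {
      // add() returns false and keeps the existing element when an equal one is present.
      if (unified.add(s)) {
        realtimeCount++;
      }
    }
    return new Unified(unified, realtimeCount);
  }

  public static void main(String[] args) {
    List<Segment> published = List.of(new Segment("seg-1", "metadata"), new Segment("seg-2", "metadata"));
    List<Segment> served = List.of(new Segment("seg-2", "realtime"), new Segment("seg-3", "realtime"));
    Unified u = unify(published, served);
    // Prints "3 segments, 1 realtime-only"
    System.out.println(u.segments.size() + " segments, " + u.realtimeCount + " realtime-only");
  }
}
```

Here `seg-2` is reported by both sources, but only the metadata-store copy survives, which is what lets a segment handed off between the two calls be read from deep storage.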