kfaraz commented on code in PR #13967:
URL: https://github.com/apache/druid/pull/13967#discussion_r1170127658
##########
server/src/main/java/org/apache/druid/client/DataSourcesSnapshot.java:
##########
@@ -167,6 +326,112 @@ private List<DataSegment> determineOvershadowedSegments()
}
}
}
- return overshadowedSegments;
+ return ImmutableSet.copyOf(overshadowedSegments);
+ }
+
+ public static Set<SegmentWithOvershadowedStatus>
getSegmentsWithOvershadowedStatus(
+ Collection<ImmutableDruidDataSource> segments,
+ Set<DataSegment> overshadowedSegments,
+ Map<String, Set<SegmentId>> handedOffState)
+ {
+
+ final Stream<DataSegment> usedSegments = segments
+ .stream()
+ .flatMap(t -> t.getSegments().stream());
+
+ return usedSegments
+ .map(segment -> new SegmentWithOvershadowedStatus(
+ segment,
+ overshadowedSegments.contains(segment),
+ getHandedOffStateForSegment(handedOffState,
segment.getDataSource(), segment.getId())
+ )
+ )
+ .collect(Collectors.toSet());
+ }
+
+ private static boolean getHandedOffStateForSegment(
+ Map<String, Set<SegmentId>> handedOffState,
+ String dataSource, SegmentId segmentId
+ )
+ {
+ return handedOffState
+ .getOrDefault(dataSource, new HashSet<>())
+ .contains(segmentId);
+ }
+
+ private static
CircularBuffer<ChangeRequestHistory.Holder<List<DataSegmentChange>>>
computeChanges(
+ Set<SegmentWithOvershadowedStatus> oldSegments,
+ Set<SegmentWithOvershadowedStatus> currentSegments,
+ CircularBuffer<ChangeRequestHistory.Holder<List<DataSegmentChange>>>
oldChanges
+ )
+ {
+ if (oldSegments.isEmpty()) {
+ return new CircularBuffer<>(CHANGES_QUEUE_MAX_SIZE);
+ }
+
+ // a segment is added to the change set, if following changes:
+ // segmentId
+ // overshadowed state
+ // handed off state
+
+ Map<SegmentId, SegmentWithOvershadowedStatus> oldSegmentsMap =
+ oldSegments
+ .stream()
+ .collect(Collectors.toMap(
+ segment -> segment.getDataSegment().getId(),
+ Function.identity()));
+
+ Map<SegmentId, SegmentWithOvershadowedStatus> currentSegmentsMap =
+ currentSegments
+ .stream()
+ .collect(Collectors.toMap(
+ segment -> segment.getDataSegment().getId(),
+ Function.identity()));
+
+ Set<SegmentWithOvershadowedStatus> segmentToBeRemoved =
Sets.difference(oldSegments, currentSegments);
+ Set<SegmentWithOvershadowedStatus> segmentToBeAdded =
Sets.difference(currentSegments, oldSegments);
+
+ List<DataSegmentChange> changeList = new ArrayList<>();
+
+ segmentToBeRemoved.forEach(segment -> {
+ SegmentWithOvershadowedStatus newSegment =
currentSegmentsMap.get(segment.getDataSegment().getId());
+ if (null == newSegment) {
+ changeList.add(new DataSegmentChange(segment, false,
DataSegmentChange.ChangeReason.SEGMENT_REMOVED));
+ }
+ });
+
+ segmentToBeAdded.forEach(segment -> {
+ SegmentWithOvershadowedStatus oldSegment =
oldSegmentsMap.get(segment.getDataSegment().getId());
+ boolean handedOffStatusChanged = !Objects.equals(null != oldSegment &&
oldSegment.isHandedOff(), segment.isHandedOff());
Review Comment:
Avoid these repeated null checks to improve readability. Instead, you can
return early after line 410 and handle the remaining cases separately. See the
suggestion at the discussion URL above (the suggested code is not included in this email).
##########
server/src/main/java/org/apache/druid/client/DataSourcesSnapshot.java:
##########
@@ -69,32 +127,67 @@ public static DataSourcesSnapshot
fromUsedSegmentsTimelines(
dataSourcesWithAllUsedSegments.put(dataSourceName,
dataSource.toImmutableDruidDataSource());
}
);
- return new DataSourcesSnapshot(dataSourcesWithAllUsedSegments,
usedSegmentsTimelinesPerDataSource);
+ return new DataSourcesSnapshot(
+ dataSourcesWithAllUsedSegments,
+ usedSegmentsTimelinesPerDataSource,
+ ImmutableMap.of(),
+ new CircularBuffer<>(CHANGES_QUEUE_MAX_SIZE)
+ );
}
+ private static final int CHANGES_QUEUE_MAX_SIZE = 10;
private final Map<String, ImmutableDruidDataSource>
dataSourcesWithAllUsedSegments;
private final Map<String, SegmentTimeline>
usedSegmentsTimelinesPerDataSource;
private final ImmutableSet<DataSegment> overshadowedSegments;
- public DataSourcesSnapshot(Map<String, ImmutableDruidDataSource>
dataSourcesWithAllUsedSegments)
+ private final Map<String, Set<SegmentId>> handedOffStatePerDataSource;
+
+ private final
CircularBuffer<ChangeRequestHistory.Holder<List<DataSegmentChange>>> changes;
Review Comment:
This is probably not the best place to maintain the list of changes. The
snapshot represents the state at a single point in time and need not be aware
of the changes that came before it. The change list would be better placed inside
`SqlSegmentsMetadataManager` itself, since it already houses the
`DataSourcesSnapshot`. That would also help clean up some of the logic in this class.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]