kfaraz commented on code in PR #13967:
URL: https://github.com/apache/druid/pull/13967#discussion_r1170122183


##########
server/src/main/java/org/apache/druid/client/DataSourcesSnapshot.java:
##########
@@ -167,6 +326,112 @@ private List<DataSegment> determineOvershadowedSegments()
         }
       }
     }
-    return overshadowedSegments;
+    return ImmutableSet.copyOf(overshadowedSegments);
+  }
+
+  public static Set<SegmentWithOvershadowedStatus> 
getSegmentsWithOvershadowedStatus(
+      Collection<ImmutableDruidDataSource> segments,
+      Set<DataSegment> overshadowedSegments,
+      Map<String, Set<SegmentId>> handedOffState)
+  {
+
+    final Stream<DataSegment> usedSegments = segments
+        .stream()
+        .flatMap(t -> t.getSegments().stream());
+
+    return usedSegments
+        .map(segment -> new SegmentWithOvershadowedStatus(
+            segment,
+            overshadowedSegments.contains(segment),
+            getHandedOffStateForSegment(handedOffState, 
segment.getDataSource(), segment.getId())
+             )
+        )
+        .collect(Collectors.toSet());
+  }
+
+  private static boolean getHandedOffStateForSegment(
+      Map<String, Set<SegmentId>> handedOffState,
+      String dataSource, SegmentId segmentId
+  )
+  {
+    return handedOffState
+        .getOrDefault(dataSource, new HashSet<>())
+        .contains(segmentId);
+  }
+
+  private static 
CircularBuffer<ChangeRequestHistory.Holder<List<DataSegmentChange>>> 
computeChanges(
+      Set<SegmentWithOvershadowedStatus> oldSegments,
+      Set<SegmentWithOvershadowedStatus> currentSegments,
+      CircularBuffer<ChangeRequestHistory.Holder<List<DataSegmentChange>>> 
oldChanges
+  )
+  {
+    if (oldSegments.isEmpty()) {
+      return new CircularBuffer<>(CHANGES_QUEUE_MAX_SIZE);
+    }
+
+    // a segment is added to the change set, if following changes:
+    // segmentId
+    // overshadowed state
+    // handed off state
+
+    Map<SegmentId, SegmentWithOvershadowedStatus> oldSegmentsMap =
+        oldSegments
+            .stream()
+            .collect(Collectors.toMap(
+                segment -> segment.getDataSegment().getId(),
+                Function.identity()));
+
+    Map<SegmentId, SegmentWithOvershadowedStatus> currentSegmentsMap =
+        currentSegments
+            .stream()
+            .collect(Collectors.toMap(
+                segment -> segment.getDataSegment().getId(),
+                Function.identity()));
+
+    Set<SegmentWithOvershadowedStatus> segmentToBeRemoved = 
Sets.difference(oldSegments, currentSegments);
+    Set<SegmentWithOvershadowedStatus> segmentToBeAdded = 
Sets.difference(currentSegments, oldSegments);
+
+    List<DataSegmentChange> changeList = new ArrayList<>();
+
+    segmentToBeRemoved.forEach(segment -> {
+      SegmentWithOvershadowedStatus newSegment = 
currentSegmentsMap.get(segment.getDataSegment().getId());
+      if (null == newSegment) {
+        changeList.add(new DataSegmentChange(segment, false, 
DataSegmentChange.ChangeReason.SEGMENT_REMOVED));
+      }
+    });
+
+    segmentToBeAdded.forEach(segment -> {
+      SegmentWithOvershadowedStatus oldSegment = 
oldSegmentsMap.get(segment.getDataSegment().getId());
+      boolean handedOffStatusChanged = !Objects.equals(null != oldSegment && 
oldSegment.isHandedOff(), segment.isHandedOff());
+      boolean overshadowedStatusChanged = !Objects.equals(null != oldSegment 
&& oldSegment.isOvershadowed(), segment.isOvershadowed());
+      DataSegmentChange.ChangeReason changeReason;
+
+      if (null == oldSegment) {
+        changeReason = DataSegmentChange.ChangeReason.SEGMENT_ADDED;
+      } else if (handedOffStatusChanged && overshadowedStatusChanged) {
+        changeReason = 
DataSegmentChange.ChangeReason.SEGMENT_OVERSHADOWED_AND_HANDED_OFF;
+      } else if (handedOffStatusChanged) {
+        changeReason = DataSegmentChange.ChangeReason.SEGMENT_HANDED_OFF;
+      } else {
+        changeReason = DataSegmentChange.ChangeReason.SEGMENT_OVERSHADOWED;
+      }
+      changeList.add(new DataSegmentChange(segment, true, changeReason));
+    });
+
+    ChangeRequestHistory.Counter lastCounter = getLastCounter(oldChanges);
+
+    if (changeList.size() > 0) {
+      oldChanges.add(new ChangeRequestHistory.Holder<>(changeList, 
lastCounter.inc()));

Review Comment:
   The caller of this method should do this. This method should be responsible 
only for computing the latest set of changes, not for adding them to the buffer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to