kfaraz commented on code in PR #13967:
URL: https://github.com/apache/druid/pull/13967#discussion_r1170084249


##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -295,6 +295,31 @@ tableName, getPayloadType(), getQuoteString(), 
getCollation()
     );
   }
 
+  private void alterSegmentsTable(final String tableName)
+  {
+    try {
+      retryWithHandle(
+          new HandleCallback<Void>()
+          {
+            @Override
+            public Void withHandle(Handle handle)
+            {
+              final Batch batch = handle.createBatch();
+              if (!tableContainsColumn(handle, tableName, "handed_off")) {
+                log.info("Adding column: handed_off to table[%s]", tableName);

Review Comment:
   Nit: 
   ```suggestion
                   log.info("Adding column [handed_off] to table[%s]", 
tableName);
   ```



##########
server/src/main/java/org/apache/druid/client/DataSourcesSnapshot.java:
##########
@@ -167,6 +326,112 @@ private List<DataSegment> determineOvershadowedSegments()
         }
       }
     }
-    return overshadowedSegments;
+    return ImmutableSet.copyOf(overshadowedSegments);
+  }
+
+  public static Set<SegmentWithOvershadowedStatus> 
getSegmentsWithOvershadowedStatus(
+      Collection<ImmutableDruidDataSource> segments,
+      Set<DataSegment> overshadowedSegments,
+      Map<String, Set<SegmentId>> handedOffState)
+  {
+
+    final Stream<DataSegment> usedSegments = segments
+        .stream()
+        .flatMap(t -> t.getSegments().stream());
+
+    return usedSegments
+        .map(segment -> new SegmentWithOvershadowedStatus(
+            segment,
+            overshadowedSegments.contains(segment),
+            getHandedOffStateForSegment(handedOffState, 
segment.getDataSource(), segment.getId())
+             )
+        )
+        .collect(Collectors.toSet());
+  }
+
+  private static boolean getHandedOffStateForSegment(
+      Map<String, Set<SegmentId>> handedOffState,
+      String dataSource, SegmentId segmentId
+  )
+  {
+    return handedOffState
+        .getOrDefault(dataSource, new HashSet<>())
+        .contains(segmentId);
+  }
+
+  private static 
CircularBuffer<ChangeRequestHistory.Holder<List<DataSegmentChange>>> 
computeChanges(
+      Set<SegmentWithOvershadowedStatus> oldSegments,
+      Set<SegmentWithOvershadowedStatus> currentSegments,
+      CircularBuffer<ChangeRequestHistory.Holder<List<DataSegmentChange>>> 
oldChanges
+  )
+  {
+    if (oldSegments.isEmpty()) {
+      return new CircularBuffer<>(CHANGES_QUEUE_MAX_SIZE);
+    }
+
+    // a segment is added to the change set, if following changes:
+    // segmentId
+    // overshadowed state
+    // handed off state
+
+    Map<SegmentId, SegmentWithOvershadowedStatus> oldSegmentsMap =
+        oldSegments
+            .stream()
+            .collect(Collectors.toMap(
+                segment -> segment.getDataSegment().getId(),
+                Function.identity()));
+
+    Map<SegmentId, SegmentWithOvershadowedStatus> currentSegmentsMap =
+        currentSegments
+            .stream()
+            .collect(Collectors.toMap(
+                segment -> segment.getDataSegment().getId(),
+                Function.identity()));
+
+    Set<SegmentWithOvershadowedStatus> segmentToBeRemoved = 
Sets.difference(oldSegments, currentSegments);
+    Set<SegmentWithOvershadowedStatus> segmentToBeAdded = 
Sets.difference(currentSegments, oldSegments);
+
+    List<DataSegmentChange> changeList = new ArrayList<>();
+
+    segmentToBeRemoved.forEach(segment -> {
+      SegmentWithOvershadowedStatus newSegment = 
currentSegmentsMap.get(segment.getDataSegment().getId());
+      if (null == newSegment) {
+        changeList.add(new DataSegmentChange(segment, false, 
DataSegmentChange.ChangeReason.SEGMENT_REMOVED));
+      }
+    });
+
+    segmentToBeAdded.forEach(segment -> {
+      SegmentWithOvershadowedStatus oldSegment = 
oldSegmentsMap.get(segment.getDataSegment().getId());
+      boolean handedOffStatusChanged = !Objects.equals(null != oldSegment && 
oldSegment.isHandedOff(), segment.isHandedOff());
+      boolean overshadowedStatusChanged = !Objects.equals(null != oldSegment 
&& oldSegment.isOvershadowed(), segment.isOvershadowed());
+      DataSegmentChange.ChangeReason changeReason;
+
+      if (null == oldSegment) {
+        changeReason = DataSegmentChange.ChangeReason.SEGMENT_ADDED;
+      } else if (handedOffStatusChanged && overshadowedStatusChanged) {
+        changeReason = 
DataSegmentChange.ChangeReason.SEGMENT_OVERSHADOWED_AND_HANDED_OFF;
+      } else if (handedOffStatusChanged) {
+        changeReason = DataSegmentChange.ChangeReason.SEGMENT_HANDED_OFF;
+      } else {
+        changeReason = DataSegmentChange.ChangeReason.SEGMENT_OVERSHADOWED;
+      }

Review Comment:
   ```suggestion
         if (null == oldSegment) {
           changeReason = DataSegmentChange.ChangeReason.SEGMENT_ADDED;
           return;
         }
         
         final boolean handoffStatusChanged = oldSegment.isHandedOff() != 
newSegment.isHandedOff();
         final boolean overshadowStatusChanged = oldSegment.isOvershadowed() != 
newSegment.isOvershadowed();
         if (handoffStatusChanged && overshadowStatusChanged) {
           changeReason = 
DataSegmentChange.ChangeReason.SEGMENT_OVERSHADOWED_AND_HANDED_OFF;
         } else if (handoffStatusChanged) {
           changeReason = DataSegmentChange.ChangeReason.SEGMENT_HANDED_OFF;
         } else {
           changeReason = DataSegmentChange.ChangeReason.SEGMENT_OVERSHADOWED;
         }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to