kfaraz commented on code in PR #14407:
URL: https://github.com/apache/druid/pull/14407#discussion_r1349610004


##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -977,6 +1045,175 @@ private void insertPendingSegmentIntoMetastore(
           .execute();
   }
 
+  /**
+   * Allocates and returns any extra versions that need to be committed for the
+   * given append segments.
+   * <p>
+   * This is typically needed when a REPLACE task started and finished after
+   * these append segments had already been allocated. As such,
+   * there would be some used segments in the DB with versions higher than these
+   * append segments.
+   */
+  private Set<DataSegment> getSegmentsToUpgradeOnAppend(
+      Handle handle,
+      String dataSource,
+      Set<DataSegment> segmentsToAppend
+  ) throws IOException
+  {
+    if (segmentsToAppend.isEmpty()) {
+      return Collections.emptySet();
+    }
+
+    final Set<Interval> appendIntervals = new HashSet<>();
+    final TreeMap<String, Set<DataSegment>> appendVersionToSegments = new TreeMap<>();
+    for (DataSegment segment : segmentsToAppend) {
+      appendIntervals.add(segment.getInterval());
+      appendVersionToSegments.computeIfAbsent(segment.getVersion(), v -> new HashSet<>())
+                             .add(segment);
+    }
+
+    // Fetch all used segments that overlap with any of the append intervals
+    final Collection<DataSegment> overlappingSegments = retrieveUsedSegmentsForIntervals(
+        dataSource,
+        new ArrayList<>(appendIntervals),
+        Segments.INCLUDING_OVERSHADOWED
+    );
+
+    final Map<String, Set<Interval>> committedVersionToIntervals = new HashMap<>();
+    final Map<Interval, Set<DataSegment>> committedIntervalToSegments = new HashMap<>();
+    for (DataSegment segment : overlappingSegments) {
+      committedVersionToIntervals.computeIfAbsent(segment.getVersion(), v -> new HashSet<>())
+                                 .add(segment.getInterval());
+      committedIntervalToSegments.computeIfAbsent(segment.getInterval(), i -> new HashSet<>())
+                                 .add(segment);
+    }
+
+    final Set<DataSegment> upgradedSegments = new HashSet<>();
+    for (Map.Entry<String, Set<Interval>> entry : committedVersionToIntervals.entrySet()) {
+      final String upgradeVersion = entry.getKey();
+      Map<Interval, Set<DataSegment>> segmentsToUpgrade = getSegmentsWithVersionLowerThan(

Review Comment:
   Do you mean that the returned value should not be empty?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

