This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ddb8476588 Fix message when skipping compaction (#15460)
8ddb8476588 is described below

commit 8ddb847658813ee6445f86cfce51aae4f65d3d15
Author: Kashif Faraz <[email protected]>
AuthorDate: Thu Nov 30 14:57:13 2023 +0530

    Fix message when skipping compaction (#15460)
---
 .../compact/NewestSegmentFirstIterator.java        | 77 ++++++++++++++--------
 1 file changed, 49 insertions(+), 28 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstIterator.java
index f9059dca67b..2c888ad7170 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstIterator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstIterator.java
@@ -36,7 +36,6 @@ import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.Partitions;
 import org.apache.druid.timeline.SegmentTimeline;
 import org.apache.druid.timeline.TimelineObjectHolder;
-import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.apache.druid.timeline.partition.NumberedPartitionChunk;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.apache.druid.timeline.partition.PartitionChunk;
@@ -71,7 +70,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
   private final Map<String, CompactionStatistics> compactedSegmentStats = new HashMap<>();
   private final Map<String, CompactionStatistics> skippedSegmentStats = new HashMap<>();
 
-  private final Map<String, CompactibleTimelineObjectHolderCursor> timelineIterators;
+  private final Map<String, CompactibleSegmentIterator> timelineIterators;
 
   // This is needed for datasource that has segmentGranularity configured
   // If configured segmentGranularity in config is finer than current segmentGranularity, the same set of segments
@@ -149,10 +148,18 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
           originalTimeline = timeline;
           timeline = timelineWithConfiguredSegmentGranularity;
         }
-        final List<Interval> searchIntervals =
-            findInitialSearchInterval(dataSource, timeline, config.getSkipOffsetFromLatest(), configuredSegmentGranularity, skipIntervals.get(dataSource));
+        final List<Interval> searchIntervals = findInitialSearchInterval(
+            dataSource,
+            timeline,
+            config.getSkipOffsetFromLatest(),
+            configuredSegmentGranularity,
+            skipIntervals.get(dataSource)
+        );
         if (!searchIntervals.isEmpty()) {
-          timelineIterators.put(dataSource, new CompactibleTimelineObjectHolderCursor(timeline, searchIntervals, originalTimeline));
+          timelineIterators.put(
+              dataSource,
+              new CompactibleSegmentIterator(timeline, searchIntervals, originalTimeline)
+          );
         }
       }
     });
@@ -218,41 +225,46 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
   }
 
   /**
-   * Iterates the given {@link VersionedIntervalTimeline}. Only compactible {@link TimelineObjectHolder}s are returned,
-   * which means the holder always has at least one {@link DataSegment}.
+   * Iterates compactible segments in a {@link SegmentTimeline}.
    */
-  private static class CompactibleTimelineObjectHolderCursor implements Iterator<List<DataSegment>>
+  private static class CompactibleSegmentIterator implements Iterator<List<DataSegment>>
   {
     private final List<TimelineObjectHolder<String, DataSegment>> holders;
     @Nullable
     private final SegmentTimeline originalTimeline;
 
-    CompactibleTimelineObjectHolderCursor(
+    CompactibleSegmentIterator(
         SegmentTimeline timeline,
         List<Interval> totalIntervalsToSearch,
         // originalTimeline can be null if timeline was not modified
         @Nullable SegmentTimeline originalTimeline
     )
     {
-      this.holders = totalIntervalsToSearch
-          .stream()
-          .flatMap(interval -> timeline
+      this.holders = totalIntervalsToSearch.stream().flatMap(
+          interval -> timeline
               .lookup(interval)
               .stream()
               .filter(holder -> isCompactibleHolder(interval, holder))
-          )
-          .collect(Collectors.toList());
+      ).collect(Collectors.toList());
       this.originalTimeline = originalTimeline;
     }
 
-    private boolean isCompactibleHolder(Interval interval, TimelineObjectHolder<String, DataSegment> holder)
+    /**
+     * Checks if the {@link TimelineObjectHolder} satisfies the following:
+     * <ul>
+     * <li>It has atleast one segment.</li>
+     * <li>The interval of the segments is contained in the searchInterval.</li>
+     * <li>The total bytes across all the segments is positive.</li>
+     * </ul>
+     */
+    private boolean isCompactibleHolder(Interval searchInterval, TimelineObjectHolder<String, DataSegment> holder)
     {
       final Iterator<PartitionChunk<DataSegment>> chunks = holder.getObject().iterator();
       if (!chunks.hasNext()) {
-        return false; // There should be at least one chunk for a holder to be compactible.
+        return false;
       }
       PartitionChunk<DataSegment> firstChunk = chunks.next();
-      if (!interval.contains(firstChunk.getObject().getInterval())) {
+      if (!searchInterval.contains(firstChunk.getObject().getInterval())) {
         return false;
       }
       long partitionBytes = firstChunk.getObject().getSize();
@@ -268,10 +280,19 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
       return !holders.isEmpty();
     }
 
+    /**
+     * Returns the next list of compactible segments in the datasource timeline.
+     * The returned list satisfies the following conditions:
+     * <ul>
+     * <li>The list is non-null and non-empty.</li>
+     * <li>The segments are present in the search interval.</li>
+     * <li>Total bytes of segments in the list is greater than zero.</li>
+     * </ul>
+     */
     @Override
     public List<DataSegment> next()
     {
-      if (holders.isEmpty()) {
+      if (!hasNext()) {
         throw new NoSuchElementException();
       }
       TimelineObjectHolder<String, DataSegment> timelineObjectHolder = holders.remove(holders.size() - 1);
@@ -302,20 +323,20 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
       final DataSourceCompactionConfig config
   )
   {
-    final CompactibleTimelineObjectHolderCursor compactibleTimelineObjectHolderCursor
+    final CompactibleSegmentIterator compactibleSegmentIterator
         = timelineIterators.get(dataSourceName);
-    if (compactibleTimelineObjectHolderCursor == null) {
-      log.warn("Skipping dataSource[%s] as there is no timeline for it.", dataSourceName);
+    if (compactibleSegmentIterator == null) {
+      log.warn(
+          "Skipping compaction for datasource[%s] as there is no compactible segment in its timeline.",
+          dataSourceName
+      );
       return SegmentsToCompact.empty();
     }
 
     final long inputSegmentSize = config.getInputSegmentSizeBytes();
 
-    while (compactibleTimelineObjectHolderCursor.hasNext()) {
-      List<DataSegment> segments = compactibleTimelineObjectHolderCursor.next();
-      if (segments.isEmpty()) {
-        throw new ISE("No segment is found?");
-      }
+    while (compactibleSegmentIterator.hasNext()) {
+      List<DataSegment> segments = compactibleSegmentIterator.next();
 
       final SegmentsToCompact candidates = SegmentsToCompact.from(segments);
       final Interval interval = candidates.getUmbrellaInterval();
@@ -353,7 +374,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
       }
     }
 
-    log.debug("All segments look good! Nothing to compact");
+    log.debug("No more segments to compact for datasource[%s].", dataSourceName);
     return SegmentsToCompact.empty();
   }
 
@@ -377,7 +398,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
    */
   private List<Interval> findInitialSearchInterval(
       String dataSourceName,
-      VersionedIntervalTimeline<String, DataSegment> timeline,
+      SegmentTimeline timeline,
       Period skipOffset,
       Granularity configuredSegmentGranularity,
       @Nullable List<Interval> skipIntervals


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to