This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 8ddb8476588 Fix message when skipping compaction (#15460)
8ddb8476588 is described below
commit 8ddb847658813ee6445f86cfce51aae4f65d3d15
Author: Kashif Faraz <[email protected]>
AuthorDate: Thu Nov 30 14:57:13 2023 +0530
Fix message when skipping compaction (#15460)
---
.../compact/NewestSegmentFirstIterator.java | 77 ++++++++++++++--------
1 file changed, 49 insertions(+), 28 deletions(-)
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstIterator.java
b/server/src/main/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstIterator.java
index f9059dca67b..2c888ad7170 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstIterator.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstIterator.java
@@ -36,7 +36,6 @@ import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.Partitions;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.TimelineObjectHolder;
-import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.NumberedPartitionChunk;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.PartitionChunk;
@@ -71,7 +70,7 @@ public class NewestSegmentFirstIterator implements
CompactionSegmentIterator
private final Map<String, CompactionStatistics> compactedSegmentStats = new
HashMap<>();
private final Map<String, CompactionStatistics> skippedSegmentStats = new
HashMap<>();
- private final Map<String, CompactibleTimelineObjectHolderCursor>
timelineIterators;
+ private final Map<String, CompactibleSegmentIterator> timelineIterators;
// This is needed for datasource that has segmentGranularity configured
// If configured segmentGranularity in config is finer than current
segmentGranularity, the same set of segments
@@ -149,10 +148,18 @@ public class NewestSegmentFirstIterator implements
CompactionSegmentIterator
originalTimeline = timeline;
timeline = timelineWithConfiguredSegmentGranularity;
}
- final List<Interval> searchIntervals =
- findInitialSearchInterval(dataSource, timeline,
config.getSkipOffsetFromLatest(), configuredSegmentGranularity,
skipIntervals.get(dataSource));
+ final List<Interval> searchIntervals = findInitialSearchInterval(
+ dataSource,
+ timeline,
+ config.getSkipOffsetFromLatest(),
+ configuredSegmentGranularity,
+ skipIntervals.get(dataSource)
+ );
if (!searchIntervals.isEmpty()) {
- timelineIterators.put(dataSource, new
CompactibleTimelineObjectHolderCursor(timeline, searchIntervals,
originalTimeline));
+ timelineIterators.put(
+ dataSource,
+ new CompactibleSegmentIterator(timeline, searchIntervals,
originalTimeline)
+ );
}
}
});
@@ -218,41 +225,46 @@ public class NewestSegmentFirstIterator implements
CompactionSegmentIterator
}
/**
- * Iterates the given {@link VersionedIntervalTimeline}. Only compactible
{@link TimelineObjectHolder}s are returned,
- * which means the holder always has at least one {@link DataSegment}.
+ * Iterates compactible segments in a {@link SegmentTimeline}.
*/
- private static class CompactibleTimelineObjectHolderCursor implements
Iterator<List<DataSegment>>
+ private static class CompactibleSegmentIterator implements
Iterator<List<DataSegment>>
{
private final List<TimelineObjectHolder<String, DataSegment>> holders;
@Nullable
private final SegmentTimeline originalTimeline;
- CompactibleTimelineObjectHolderCursor(
+ CompactibleSegmentIterator(
SegmentTimeline timeline,
List<Interval> totalIntervalsToSearch,
// originalTimeline can be null if timeline was not modified
@Nullable SegmentTimeline originalTimeline
)
{
- this.holders = totalIntervalsToSearch
- .stream()
- .flatMap(interval -> timeline
+ this.holders = totalIntervalsToSearch.stream().flatMap(
+ interval -> timeline
.lookup(interval)
.stream()
.filter(holder -> isCompactibleHolder(interval, holder))
- )
- .collect(Collectors.toList());
+ ).collect(Collectors.toList());
this.originalTimeline = originalTimeline;
}
- private boolean isCompactibleHolder(Interval interval,
TimelineObjectHolder<String, DataSegment> holder)
+ /**
+ * Checks if the {@link TimelineObjectHolder} satisfies the following:
+ * <ul>
+ * <li>It has at least one segment.</li>
+ * <li>The interval of the segments is contained in the
searchInterval.</li>
+ * <li>The total bytes across all the segments is positive.</li>
+ * </ul>
+ */
+ private boolean isCompactibleHolder(Interval searchInterval,
TimelineObjectHolder<String, DataSegment> holder)
{
final Iterator<PartitionChunk<DataSegment>> chunks =
holder.getObject().iterator();
if (!chunks.hasNext()) {
- return false; // There should be at least one chunk for a holder to be
compactible.
+ return false;
}
PartitionChunk<DataSegment> firstChunk = chunks.next();
- if (!interval.contains(firstChunk.getObject().getInterval())) {
+ if (!searchInterval.contains(firstChunk.getObject().getInterval())) {
return false;
}
long partitionBytes = firstChunk.getObject().getSize();
@@ -268,10 +280,19 @@ public class NewestSegmentFirstIterator implements
CompactionSegmentIterator
return !holders.isEmpty();
}
+ /**
+ * Returns the next list of compactible segments in the datasource
timeline.
+ * The returned list satisfies the following conditions:
+ * <ul>
+ * <li>The list is non-null and non-empty.</li>
+ * <li>The segments are present in the search interval.</li>
+ * <li>Total bytes of segments in the list is greater than zero.</li>
+ * </ul>
+ */
@Override
public List<DataSegment> next()
{
- if (holders.isEmpty()) {
+ if (!hasNext()) {
throw new NoSuchElementException();
}
TimelineObjectHolder<String, DataSegment> timelineObjectHolder =
holders.remove(holders.size() - 1);
@@ -302,20 +323,20 @@ public class NewestSegmentFirstIterator implements
CompactionSegmentIterator
final DataSourceCompactionConfig config
)
{
- final CompactibleTimelineObjectHolderCursor
compactibleTimelineObjectHolderCursor
+ final CompactibleSegmentIterator compactibleSegmentIterator
= timelineIterators.get(dataSourceName);
- if (compactibleTimelineObjectHolderCursor == null) {
- log.warn("Skipping dataSource[%s] as there is no timeline for it.",
dataSourceName);
+ if (compactibleSegmentIterator == null) {
+ log.warn(
+ "Skipping compaction for datasource[%s] as there is no compactible
segment in its timeline.",
+ dataSourceName
+ );
return SegmentsToCompact.empty();
}
final long inputSegmentSize = config.getInputSegmentSizeBytes();
- while (compactibleTimelineObjectHolderCursor.hasNext()) {
- List<DataSegment> segments =
compactibleTimelineObjectHolderCursor.next();
- if (segments.isEmpty()) {
- throw new ISE("No segment is found?");
- }
+ while (compactibleSegmentIterator.hasNext()) {
+ List<DataSegment> segments = compactibleSegmentIterator.next();
final SegmentsToCompact candidates = SegmentsToCompact.from(segments);
final Interval interval = candidates.getUmbrellaInterval();
@@ -353,7 +374,7 @@ public class NewestSegmentFirstIterator implements
CompactionSegmentIterator
}
}
- log.debug("All segments look good! Nothing to compact");
+ log.debug("No more segments to compact for datasource[%s].",
dataSourceName);
return SegmentsToCompact.empty();
}
@@ -377,7 +398,7 @@ public class NewestSegmentFirstIterator implements
CompactionSegmentIterator
*/
private List<Interval> findInitialSearchInterval(
String dataSourceName,
- VersionedIntervalTimeline<String, DataSegment> timeline,
+ SegmentTimeline timeline,
Period skipOffset,
Granularity configuredSegmentGranularity,
@Nullable List<Interval> skipIntervals
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]