fjy commented on a change in pull request #6407: Add support
keepSegmentGranularity for automatic compaction
URL: https://github.com/apache/incubator-druid/pull/6407#discussion_r223196863
##########
File path:
server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java
##########
@@ -209,179 +234,97 @@ private void updateQueue(String dataSourceName,
DataSourceCompactionConfig confi
* looked up for the last one day of the given intervalToSearch, and the
next day is searched again if the size of
* found segments are not enough to compact. This is repeated until enough
amount of segments are found.
*
- * @param timeline timeline of a dataSource
- * @param intervalToSearch interval to search
- * @param searchEnd the end of the whole searchInterval
+ * @param compactibleTimelineObjectHolderCursor timeline iterator
* @param config compaction config
*
- * @return a pair of the reduced interval of (intervalToSearch - interval of
found segments) and segments to compact
+ * @return segments to compact
*/
- @VisibleForTesting
- static Pair<Interval, SegmentsToCompact> findSegmentsToCompact(
- final VersionedIntervalTimeline<String, DataSegment> timeline,
- final Interval intervalToSearch,
- final DateTime searchEnd,
+ private static SegmentsToCompact findSegmentsToCompact(
+ final CompactibleTimelineObjectHolderCursor
compactibleTimelineObjectHolderCursor,
final DataSourceCompactionConfig config
)
{
- final long targetCompactionSize = config.getTargetCompactionSizeBytes();
- final int numTargetSegments = config.getNumTargetCompactionSegments();
+ final boolean keepSegmentGranularity = config.isKeepSegmentGranularity();
+ final long inputSegmentSize = config.getInputSegmentSizeBytes();
+ final int maxNumSegmentsToCompact = config.getMaxNumSegmentsToCompact();
final List<DataSegment> segmentsToCompact = new ArrayList<>();
- Interval searchInterval = intervalToSearch;
long totalSegmentsToCompactBytes = 0;
- // Finds segments to compact together while iterating searchInterval from
latest to oldest
- while (!Intervals.isEmpty(searchInterval)
- && totalSegmentsToCompactBytes < targetCompactionSize
- && segmentsToCompact.size() < numTargetSegments) {
- final Interval lookupInterval =
SegmentCompactorUtil.getNextLoopupInterval(searchInterval);
- // holders are sorted by their interval
- final List<TimelineObjectHolder<String, DataSegment>> holders =
timeline.lookup(lookupInterval);
-
- if (holders.isEmpty()) {
- // We found nothing. Continue to the next interval.
- searchInterval =
SegmentCompactorUtil.removeIntervalFromEnd(searchInterval, lookupInterval);
- continue;
- }
-
- for (int i = holders.size() - 1; i >= 0; i--) {
- final TimelineObjectHolder<String, DataSegment> holder =
holders.get(i);
- final List<PartitionChunk<DataSegment>> chunks =
Lists.newArrayList(holder.getObject().iterator());
- final long partitionBytes = chunks.stream().mapToLong(chunk ->
chunk.getObject().getSize()).sum();
- if (chunks.size() == 0 || partitionBytes == 0) {
- log.warn("Skip empty shard[%s]", holder);
- continue;
- }
-
- if
(!intervalToSearch.contains(chunks.get(0).getObject().getInterval())) {
- searchInterval = SegmentCompactorUtil.removeIntervalFromEnd(
- searchInterval,
- new Interval(chunks.get(0).getObject().getInterval().getStart(),
searchInterval.getEnd())
- );
- continue;
- }
-
- // Addition of the segments of a partition should be atomic.
- if (SegmentCompactorUtil.isCompactible(targetCompactionSize,
totalSegmentsToCompactBytes, partitionBytes) &&
- segmentsToCompact.size() + chunks.size() <= numTargetSegments) {
- chunks.forEach(chunk -> segmentsToCompact.add(chunk.getObject()));
- totalSegmentsToCompactBytes += partitionBytes;
+ // Finds segments to compact together while iterating timeline from latest
to oldest
+ while (compactibleTimelineObjectHolderCursor.hasNext()
+ && totalSegmentsToCompactBytes < inputSegmentSize
+ && segmentsToCompact.size() < maxNumSegmentsToCompact) {
+ final TimelineObjectHolder<String, DataSegment> timeChunkHolder =
Preconditions.checkNotNull(
+ compactibleTimelineObjectHolderCursor.get(),
+ "timelineObjectHolder"
+ );
+ final List<PartitionChunk<DataSegment>> chunks =
Lists.newArrayList(timeChunkHolder.getObject().iterator());
+ final long timeChunkSizeBytes = chunks.stream().mapToLong(chunk ->
chunk.getObject().getSize()).sum();
+
+ // The segments in a holder should be added all together or not.
+ if (SegmentCompactorUtil.isCompactibleSize(inputSegmentSize,
totalSegmentsToCompactBytes, timeChunkSizeBytes)
+ && SegmentCompactorUtil.isCompactibleNum(maxNumSegmentsToCompact,
segmentsToCompact.size(), chunks.size())
+ && (!keepSegmentGranularity || segmentsToCompact.size() == 0)) {
+ chunks.forEach(chunk -> segmentsToCompact.add(chunk.getObject()));
+ totalSegmentsToCompactBytes += timeChunkSizeBytes;
+ } else {
+ if (segmentsToCompact.size() > 1) {
+ // We found some segmens to compact and cannot add more. End here.
+ return new SegmentsToCompact(segmentsToCompact);
} else {
- if (segmentsToCompact.size() > 1) {
- // We found some segmens to compact and cannot add more. End here.
- return checkCompactableSizeForLastSegmentOrReturn(
- segmentsToCompact,
- totalSegmentsToCompactBytes,
- timeline,
- searchInterval,
- searchEnd,
- config
+ // (*) Discard segments found so far because we can't compact them
anyway.
+ final int numSegmentsToCompact = segmentsToCompact.size();
+ segmentsToCompact.clear();
+
+ if (!SegmentCompactorUtil.isCompactibleSize(inputSegmentSize, 0,
timeChunkSizeBytes)) {
+ final DataSegment segment = chunks.get(0).getObject();
+ log.warn(
+ "shardSize[%d] for dataSource[%s] and interval[%s] is larger
than inputSegmentSize[%d]."
+ + " Contitnue to the next shard.",
Review comment:
Continue*
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]