This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 36f3413913d Add new compaction policy to prioritize fragmented 
intervals (#18802)
36f3413913d is described below

commit 36f3413913d6ab34e409983d87664d9157fb4893
Author: Kashif Faraz <[email protected]>
AuthorDate: Fri Dec 5 08:43:54 2025 +0530

    Add new compaction policy to prioritize fragmented intervals (#18802)
---
 .../compaction/BaseCandidateSearchPolicy.java      |   5 +-
 .../server/compaction/CompactionCandidate.java     |  14 ++
 .../CompactionCandidateSearchPolicy.java           |  75 ++++++-
 .../server/compaction/CompactionStatistics.java    |  10 +
 .../druid/server/compaction/CompactionStatus.java  | 243 +++++++++++++++++----
 .../server/compaction/CompactionStatusTracker.java |  12 +-
 .../compaction/FixedIntervalOrderPolicy.java       |   7 +-
 .../MostFragmentedIntervalFirstPolicy.java         | 180 +++++++++++++++
 .../compaction/CompactionStatusTrackerTest.java    |   2 +-
 .../MostFragmentedIntervalFirstPolicyTest.java     | 214 ++++++++++++++++++
 10 files changed, 697 insertions(+), 65 deletions(-)

diff --git 
a/server/src/main/java/org/apache/druid/server/compaction/BaseCandidateSearchPolicy.java
 
b/server/src/main/java/org/apache/druid/server/compaction/BaseCandidateSearchPolicy.java
index 7d7d117f08f..0a68f6a51c9 100644
--- 
a/server/src/main/java/org/apache/druid/server/compaction/BaseCandidateSearchPolicy.java
+++ 
b/server/src/main/java/org/apache/druid/server/compaction/BaseCandidateSearchPolicy.java
@@ -64,13 +64,12 @@ public abstract class BaseCandidateSearchPolicy implements 
CompactionCandidateSe
   }
 
   @Override
-  public boolean isEligibleForCompaction(
+  public Eligibility checkEligibilityForCompaction(
       CompactionCandidate candidate,
-      CompactionStatus currentCompactionStatus,
       CompactionTaskStatus latestTaskStatus
   )
   {
-    return true;
+    return Eligibility.OK;
   }
 
   /**
diff --git 
a/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidate.java
 
b/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidate.java
index f936f3d49a9..af8b32ebe6d 100644
--- 
a/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidate.java
+++ 
b/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidate.java
@@ -137,6 +137,20 @@ public class CompactionCandidate
     return CompactionStatistics.create(totalBytes, numSegments(), 
numIntervals);
   }
 
+  @Nullable
+  public CompactionStatistics getCompactedStats()
+  {
+    return (currentStatus == null || currentStatus.getCompactedStats() == null)
+           ? null : currentStatus.getCompactedStats();
+  }
+
+  @Nullable
+  public CompactionStatistics getUncompactedStats()
+  {
+    return (currentStatus == null || currentStatus.getUncompactedStats() == 
null)
+           ? null : currentStatus.getUncompactedStats();
+  }
+
   /**
    * Current compaction status of the time chunk corresponding to this 
candidate.
    */
diff --git 
a/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java
 
b/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java
index cc99e03bf21..bfb69787dd8 100644
--- 
a/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java
+++ 
b/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java
@@ -21,15 +21,19 @@ package org.apache.druid.server.compaction;
 
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.server.coordinator.duty.CompactSegments;
 
+import java.util.Objects;
+
 /**
  * Policy used by {@link CompactSegments} duty to pick segments for compaction.
  */
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
 @JsonSubTypes(value = {
     @JsonSubTypes.Type(name = "newestSegmentFirst", value = 
NewestSegmentFirstPolicy.class),
-    @JsonSubTypes.Type(name = "fixedIntervalOrder", value = 
FixedIntervalOrderPolicy.class)
+    @JsonSubTypes.Type(name = "fixedIntervalOrder", value = 
FixedIntervalOrderPolicy.class),
+    @JsonSubTypes.Type(name = "mostFragmentedFirst", value = 
MostFragmentedIntervalFirstPolicy.class)
 })
 public interface CompactionCandidateSearchPolicy
 {
@@ -37,8 +41,8 @@ public interface CompactionCandidateSearchPolicy
    * Compares between two compaction candidates. Used to determine the
    * order in which segments and intervals should be picked for compaction.
    *
-   * @return A positive value if {@code candidateA} should be picked first, a
-   * negative value if {@code candidateB} should be picked first or zero if the
+   * @return A negative value if {@code candidateA} should be picked first, a
+   * positive value if {@code candidateB} should be picked first or zero if the
    * order does not matter.
    */
   int compareCandidates(CompactionCandidate candidateA, CompactionCandidate 
candidateB);
@@ -47,10 +51,71 @@ public interface CompactionCandidateSearchPolicy
    * Checks if the given {@link CompactionCandidate} is eligible for compaction
    * in the current iteration. A policy may implement this method to skip
    * compacting intervals or segments that do not fulfil some required 
criteria.
+   *
+   * @return {@link Eligibility#OK} only if eligible.
    */
-  boolean isEligibleForCompaction(
+  Eligibility checkEligibilityForCompaction(
       CompactionCandidate candidate,
-      CompactionStatus currentCompactionStatus,
       CompactionTaskStatus latestTaskStatus
   );
+
+  /**
+   * Describes the eligibility of an interval for compaction.
+   */
+  class Eligibility
+  {
+    public static final Eligibility OK = new Eligibility(true, null);
+
+    private final boolean eligible;
+    private final String reason;
+
+    private Eligibility(boolean eligible, String reason)
+    {
+      this.eligible = eligible;
+      this.reason = reason;
+    }
+
+    public boolean isEligible()
+    {
+      return eligible;
+    }
+
+    public String getReason()
+    {
+      return reason;
+    }
+
+    public static Eligibility fail(String messageFormat, Object... args)
+    {
+      return new Eligibility(false, StringUtils.format(messageFormat, args));
+    }
+
+    @Override
+    public boolean equals(Object object)
+    {
+      if (this == object) {
+        return true;
+      }
+      if (object == null || getClass() != object.getClass()) {
+        return false;
+      }
+      Eligibility that = (Eligibility) object;
+      return eligible == that.eligible && Objects.equals(reason, that.reason);
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects.hash(eligible, reason);
+    }
+
+    @Override
+    public String toString()
+    {
+      return "Eligibility{" +
+             "eligible=" + eligible +
+             ", reason='" + reason + '\'' +
+             '}';
+    }
+  }
 }
diff --git 
a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatistics.java
 
b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatistics.java
index d7e51655861..7d43a09aed8 100644
--- 
a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatistics.java
+++ 
b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatistics.java
@@ -65,4 +65,14 @@ public class CompactionStatistics
     numIntervals -= other.getNumIntervals();
     numSegments -= other.getNumSegments();
   }
+
+  @Override
+  public String toString()
+  {
+    return "CompactionStatistics{" +
+           "totalBytes=" + totalBytes +
+           ", numSegments=" + numSegments +
+           ", numIntervals=" + numIntervals +
+           '}';
+  }
 }
diff --git 
a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java 
b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java
index be4acd00e21..cc52513b16c 100644
--- 
a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java
+++ 
b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java
@@ -36,11 +36,17 @@ import 
org.apache.druid.segment.transform.CompactionTransformSpec;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
 import org.apache.druid.timeline.CompactionState;
+import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.utils.CollectionUtils;
+import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -49,7 +55,7 @@ import java.util.stream.Collectors;
  */
 public class CompactionStatus
 {
-  private static final CompactionStatus COMPLETE = new 
CompactionStatus(State.COMPLETE, null);
+  private static final CompactionStatus COMPLETE = new 
CompactionStatus(State.COMPLETE, null, null, null);
 
   public enum State
   {
@@ -62,9 +68,7 @@ public class CompactionStatus
    * The order of the checks must be honored while evaluating them.
    */
   private static final List<Function<Evaluator, CompactionStatus>> CHECKS = 
Arrays.asList(
-      Evaluator::inputBytesAreWithinLimit,
       Evaluator::segmentsHaveBeenCompactedAtLeastOnce,
-      Evaluator::allCandidatesHaveSameCompactionState,
       Evaluator::partitionsSpecIsUpToDate,
       Evaluator::indexSpecIsUpToDate,
       Evaluator::segmentGranularityIsUpToDate,
@@ -78,11 +82,20 @@ public class CompactionStatus
 
   private final State state;
   private final String reason;
-
-  private CompactionStatus(State state, String reason)
+  private final CompactionStatistics compactedStats;
+  private final CompactionStatistics uncompactedStats;
+
+  private CompactionStatus(
+      State state,
+      String reason,
+      CompactionStatistics compactedStats,
+      CompactionStatistics uncompactedStats
+  )
   {
     this.state = state;
     this.reason = reason;
+    this.compactedStats = compactedStats;
+    this.uncompactedStats = uncompactedStats;
   }
 
   public boolean isComplete()
@@ -105,18 +118,45 @@ public class CompactionStatus
     return state;
   }
 
+  public CompactionStatistics getCompactedStats()
+  {
+    return compactedStats;
+  }
+
+  public CompactionStatistics getUncompactedStats()
+  {
+    return uncompactedStats;
+  }
+
   @Override
   public String toString()
   {
     return "CompactionStatus{" +
            "state=" + state +
            ", reason=" + reason +
+           ", compactedStats=" + compactedStats +
+           ", uncompactedStats=" + uncompactedStats +
            '}';
   }
 
   public static CompactionStatus pending(String reasonFormat, Object... args)
   {
-    return new CompactionStatus(State.PENDING, 
StringUtils.format(reasonFormat, args));
+    return new CompactionStatus(State.PENDING, 
StringUtils.format(reasonFormat, args), null, null);
+  }
+
+  public static CompactionStatus pending(
+      CompactionStatistics compactedStats,
+      CompactionStatistics uncompactedStats,
+      String reasonFormat,
+      Object... args
+  )
+  {
+    return new CompactionStatus(
+        State.PENDING,
+        StringUtils.format(reasonFormat, args),
+        compactedStats,
+        uncompactedStats
+    );
   }
 
   /**
@@ -193,34 +233,26 @@ public class CompactionStatus
 
   public static CompactionStatus skipped(String reasonFormat, Object... args)
   {
-    return new CompactionStatus(State.SKIPPED, 
StringUtils.format(reasonFormat, args));
+    return new CompactionStatus(State.SKIPPED, 
StringUtils.format(reasonFormat, args), null, null);
   }
 
   public static CompactionStatus running(String message)
   {
-    return new CompactionStatus(State.RUNNING, message);
-  }
-
-  public static CompactionStatus complete(String message)
-  {
-    return new CompactionStatus(State.COMPLETE, message);
+    return new CompactionStatus(State.RUNNING, message, null, null);
   }
 
   /**
    * Determines the CompactionStatus of the given candidate segments by 
evaluating
    * the {@link #CHECKS} one by one. If any check returns an incomplete status,
-   * further checks are not performed and the incomplete status is returned.
+   * further checks are still performed to determine the number of uncompacted
+   * segments but only the first incomplete status is returned.
    */
   static CompactionStatus compute(
       CompactionCandidate candidateSegments,
       DataSourceCompactionConfig config
   )
   {
-    final Evaluator evaluator = new Evaluator(candidateSegments, config);
-    return CHECKS.stream()
-                 .map(f -> f.apply(evaluator))
-                 .filter(status -> !status.isComplete())
-                 .findFirst().orElse(COMPLETE);
+    return new Evaluator(candidateSegments, config).evaluate();
   }
 
   @Nullable
@@ -288,58 +320,127 @@ public class CompactionStatus
   }
 
   /**
-   * Evaluates {@link #CHECKS} to determine the compaction status.
+   * Evaluates {@link #CHECKS} to determine the compaction status of a
+   * {@link CompactionCandidate}.
    */
   private static class Evaluator
   {
     private final DataSourceCompactionConfig compactionConfig;
     private final CompactionCandidate candidateSegments;
-    private final CompactionState lastCompactionState;
     private final ClientCompactionTaskQueryTuningConfig tuningConfig;
-    private final UserCompactionTaskGranularityConfig existingGranularitySpec;
     private final UserCompactionTaskGranularityConfig 
configuredGranularitySpec;
 
+    private final List<DataSegment> uncompactedSegments = new ArrayList<>();
+    private final Map<CompactionState, List<DataSegment>> 
unknownStateToSegments = new HashMap<>();
+
     private Evaluator(
         CompactionCandidate candidateSegments,
         DataSourceCompactionConfig compactionConfig
     )
     {
       this.candidateSegments = candidateSegments;
-      this.lastCompactionState = 
candidateSegments.getSegments().get(0).getLastCompactionState();
       this.compactionConfig = compactionConfig;
       this.tuningConfig = 
ClientCompactionTaskQueryTuningConfig.from(compactionConfig);
       this.configuredGranularitySpec = compactionConfig.getGranularitySpec();
-      if (lastCompactionState == null) {
-        this.existingGranularitySpec = null;
+    }
+
+    private CompactionStatus evaluate()
+    {
+      final CompactionStatus inputBytesCheck = inputBytesAreWithinLimit();
+      if (inputBytesCheck.isSkipped()) {
+        return inputBytesCheck;
+      }
+
+      final List<String> reasonsForCompaction =
+          CHECKS.stream()
+                .map(f -> f.apply(this))
+                .filter(status -> !status.isComplete())
+                .map(CompactionStatus::getReason)
+                .collect(Collectors.toList());
+
+      // Consider segments which have passed all checks to be compacted
+      final List<DataSegment> compactedSegments = unknownStateToSegments
+          .values()
+          .stream()
+          .flatMap(List::stream)
+          .collect(Collectors.toList());
+
+      if (reasonsForCompaction.isEmpty()) {
+        return COMPLETE;
       } else {
-        this.existingGranularitySpec = 
UserCompactionTaskGranularityConfig.from(
-            lastCompactionState.getGranularitySpec()
+        return CompactionStatus.pending(
+            createStats(compactedSegments),
+            createStats(uncompactedSegments),
+            reasonsForCompaction.get(0)
         );
       }
     }
 
     private CompactionStatus segmentsHaveBeenCompactedAtLeastOnce()
     {
-      if (lastCompactionState == null) {
-        return CompactionStatus.pending("not compacted yet");
-      } else {
-        return COMPLETE;
+      // Identify the compaction states of all the segments
+      for (DataSegment segment : candidateSegments.getSegments()) {
+        final CompactionState segmentState = segment.getLastCompactionState();
+        if (segmentState == null) {
+          uncompactedSegments.add(segment);
+        } else {
+          unknownStateToSegments.computeIfAbsent(segmentState, s -> new 
ArrayList<>()).add(segment);
+        }
       }
-    }
 
-    private CompactionStatus allCandidatesHaveSameCompactionState()
-    {
-      final boolean allHaveSameCompactionState = 
candidateSegments.getSegments().stream().allMatch(
-          segment -> 
lastCompactionState.equals(segment.getLastCompactionState())
-      );
-      if (allHaveSameCompactionState) {
+      if (uncompactedSegments.isEmpty()) {
         return COMPLETE;
       } else {
-        return CompactionStatus.pending("segments have different last 
compaction states");
+        return CompactionStatus.pending("not compacted yet");
       }
     }
 
     private CompactionStatus partitionsSpecIsUpToDate()
+    {
+      return evaluateForAllCompactionStates(this::partitionsSpecIsUpToDate);
+    }
+
+    private CompactionStatus indexSpecIsUpToDate()
+    {
+      return evaluateForAllCompactionStates(this::indexSpecIsUpToDate);
+    }
+
+    private CompactionStatus projectionsAreUpToDate()
+    {
+      return evaluateForAllCompactionStates(this::projectionsAreUpToDate);
+    }
+
+    private CompactionStatus segmentGranularityIsUpToDate()
+    {
+      return 
evaluateForAllCompactionStates(this::segmentGranularityIsUpToDate);
+    }
+
+    private CompactionStatus rollupIsUpToDate()
+    {
+      return evaluateForAllCompactionStates(this::rollupIsUpToDate);
+    }
+
+    private CompactionStatus queryGranularityIsUpToDate()
+    {
+      return evaluateForAllCompactionStates(this::queryGranularityIsUpToDate);
+    }
+
+    private CompactionStatus dimensionsSpecIsUpToDate()
+    {
+      return evaluateForAllCompactionStates(this::dimensionsSpecIsUpToDate);
+    }
+
+    private CompactionStatus metricsSpecIsUpToDate()
+    {
+      return evaluateForAllCompactionStates(this::metricsSpecIsUpToDate);
+    }
+
+    private CompactionStatus transformSpecFilterIsUpToDate()
+    {
+      return 
evaluateForAllCompactionStates(this::transformSpecFilterIsUpToDate);
+    }
+
+    private CompactionStatus partitionsSpecIsUpToDate(CompactionState 
lastCompactionState)
     {
       PartitionsSpec existingPartionsSpec = 
lastCompactionState.getPartitionsSpec();
       if (existingPartionsSpec instanceof DimensionRangePartitionsSpec) {
@@ -357,7 +458,7 @@ public class CompactionStatus
       );
     }
 
-    private CompactionStatus indexSpecIsUpToDate()
+    private CompactionStatus indexSpecIsUpToDate(CompactionState 
lastCompactionState)
     {
       return CompactionStatus.completeIfNullOrEqual(
           "indexSpec",
@@ -367,7 +468,7 @@ public class CompactionStatus
       );
     }
 
-    private CompactionStatus projectionsAreUpToDate()
+    private CompactionStatus projectionsAreUpToDate(CompactionState 
lastCompactionState)
     {
       return CompactionStatus.completeIfNullOrEqual(
           "projections",
@@ -390,7 +491,7 @@ public class CompactionStatus
       }
     }
 
-    private CompactionStatus segmentGranularityIsUpToDate()
+    private CompactionStatus segmentGranularityIsUpToDate(CompactionState 
lastCompactionState)
     {
       if (configuredGranularitySpec == null
           || configuredGranularitySpec.getSegmentGranularity() == null) {
@@ -398,6 +499,7 @@ public class CompactionStatus
       }
 
       final Granularity configuredSegmentGranularity = 
configuredGranularitySpec.getSegmentGranularity();
+      final UserCompactionTaskGranularityConfig existingGranularitySpec = 
getGranularitySpec(lastCompactionState);
       final Granularity existingSegmentGranularity
           = existingGranularitySpec == null ? null : 
existingGranularitySpec.getSegmentGranularity();
 
@@ -406,7 +508,8 @@ public class CompactionStatus
       } else if (existingSegmentGranularity == null) {
         // Candidate segments were compacted without segment granularity 
specified
         // Check if the segments already have the desired segment granularity
-        boolean needsCompaction = 
candidateSegments.getSegments().stream().anyMatch(
+        final List<DataSegment> segmentsForState = 
unknownStateToSegments.get(lastCompactionState);
+        boolean needsCompaction = segmentsForState.stream().anyMatch(
             segment -> 
!configuredSegmentGranularity.isAligned(segment.getInterval())
         );
         if (needsCompaction) {
@@ -427,11 +530,13 @@ public class CompactionStatus
       return COMPLETE;
     }
 
-    private CompactionStatus rollupIsUpToDate()
+    private CompactionStatus rollupIsUpToDate(CompactionState 
lastCompactionState)
     {
       if (configuredGranularitySpec == null) {
         return COMPLETE;
       } else {
+        final UserCompactionTaskGranularityConfig existingGranularitySpec
+            = getGranularitySpec(lastCompactionState);
         return CompactionStatus.completeIfNullOrEqual(
             "rollup",
             configuredGranularitySpec.isRollup(),
@@ -441,11 +546,13 @@ public class CompactionStatus
       }
     }
 
-    private CompactionStatus queryGranularityIsUpToDate()
+    private CompactionStatus queryGranularityIsUpToDate(CompactionState 
lastCompactionState)
     {
       if (configuredGranularitySpec == null) {
         return COMPLETE;
       } else {
+        final UserCompactionTaskGranularityConfig existingGranularitySpec
+            = getGranularitySpec(lastCompactionState);
         return CompactionStatus.completeIfNullOrEqual(
             "queryGranularity",
             configuredGranularitySpec.getQueryGranularity(),
@@ -460,7 +567,7 @@ public class CompactionStatus
      * which can create a mismatch between expected and actual order of 
dimensions. Partition dimensions are separately
      * covered in {@link Evaluator#partitionsSpecIsUpToDate()} check.
      */
-    private CompactionStatus dimensionsSpecIsUpToDate()
+    private CompactionStatus dimensionsSpecIsUpToDate(CompactionState 
lastCompactionState)
     {
       if (compactionConfig.getDimensionsSpec() == null) {
         return COMPLETE;
@@ -488,7 +595,7 @@ public class CompactionStatus
       }
     }
 
-    private CompactionStatus metricsSpecIsUpToDate()
+    private CompactionStatus metricsSpecIsUpToDate(CompactionState 
lastCompactionState)
     {
       final AggregatorFactory[] configuredMetricsSpec = 
compactionConfig.getMetricsSpec();
       if (ArrayUtils.isEmpty(configuredMetricsSpec)) {
@@ -512,7 +619,7 @@ public class CompactionStatus
       }
     }
 
-    private CompactionStatus transformSpecFilterIsUpToDate()
+    private CompactionStatus transformSpecFilterIsUpToDate(CompactionState 
lastCompactionState)
     {
       if (compactionConfig.getTransformSpec() == null) {
         return COMPLETE;
@@ -526,5 +633,45 @@ public class CompactionStatus
           String::valueOf
       );
     }
+
+    /**
+     * Evaluates the given check for each entry in the {@link 
#unknownStateToSegments}.
+     * If any entry fails the given check by returning a status which is not
+     * COMPLETE, all the segments with that state are moved to {@link 
#uncompactedSegments}.
+     *
+     * @return The first status which is not COMPLETE, or COMPLETE if all
+     * the states pass the given check.
+     */
+    private CompactionStatus evaluateForAllCompactionStates(
+        Function<CompactionState, CompactionStatus> check
+    )
+    {
+      CompactionStatus firstIncompleteStatus = null;
+      for (CompactionState state : 
List.copyOf(unknownStateToSegments.keySet())) {
+        final CompactionStatus status = check.apply(state);
+        if (!status.isComplete()) {
+          uncompactedSegments.addAll(unknownStateToSegments.remove(state));
+          if (firstIncompleteStatus == null) {
+            firstIncompleteStatus = status;
+          }
+        }
+      }
+
+      return firstIncompleteStatus == null ? COMPLETE : firstIncompleteStatus;
+    }
+
+    private static UserCompactionTaskGranularityConfig getGranularitySpec(
+        CompactionState compactionState
+    )
+    {
+      return 
UserCompactionTaskGranularityConfig.from(compactionState.getGranularitySpec());
+    }
+
+    private static CompactionStatistics createStats(List<DataSegment> segments)
+    {
+      final Set<Interval> segmentIntervals =
+          
segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet());
+      final long totalBytes = 
segments.stream().mapToLong(DataSegment::getSize).sum();
+      return CompactionStatistics.create(totalBytes, segments.size(), 
segmentIntervals.size());
+    }
   }
 }
diff --git 
a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java
 
b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java
index 401f413e7fd..1dc409e7361 100644
--- 
a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java
+++ 
b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java
@@ -96,17 +96,19 @@ public class CompactionStatusTracker
     if (lastTaskStatus != null
         && lastTaskStatus.getState() == TaskState.SUCCESS
         && snapshotTime != null && 
snapshotTime.isBefore(lastTaskStatus.getUpdatedTime())) {
-      return CompactionStatus.complete(
+      return CompactionStatus.skipped(
           "Segment timeline not updated since last compaction task succeeded"
       );
     }
 
     // Skip intervals that have been filtered out by the policy
-    if (!searchPolicy.isEligibleForCompaction(candidate, 
CompactionStatus.pending(""), lastTaskStatus)) {
-      return CompactionStatus.skipped("Rejected by search policy");
+    final CompactionCandidateSearchPolicy.Eligibility eligibility
+        = searchPolicy.checkEligibilityForCompaction(candidate, 
lastTaskStatus);
+    if (eligibility.isEligible()) {
+      return CompactionStatus.pending("Not compacted yet");
+    } else {
+      return CompactionStatus.skipped("Rejected by search policy: %s", 
eligibility.getReason());
     }
-
-    return CompactionStatus.pending("Not compacted yet");
   }
 
   /**
diff --git 
a/server/src/main/java/org/apache/druid/server/compaction/FixedIntervalOrderPolicy.java
 
b/server/src/main/java/org/apache/druid/server/compaction/FixedIntervalOrderPolicy.java
index 3e8726471b1..24a2f001afe 100644
--- 
a/server/src/main/java/org/apache/druid/server/compaction/FixedIntervalOrderPolicy.java
+++ 
b/server/src/main/java/org/apache/druid/server/compaction/FixedIntervalOrderPolicy.java
@@ -56,13 +56,14 @@ public class FixedIntervalOrderPolicy implements 
CompactionCandidateSearchPolicy
   }
 
   @Override
-  public boolean isEligibleForCompaction(
+  public Eligibility checkEligibilityForCompaction(
       CompactionCandidate candidate,
-      CompactionStatus currentCompactionStatus,
       CompactionTaskStatus latestTaskStatus
   )
   {
-    return findIndex(candidate) < Integer.MAX_VALUE;
+    return findIndex(candidate) < Integer.MAX_VALUE
+        ? Eligibility.OK
+        : Eligibility.fail("Datasource/Interval is not in the list of 
'eligibleCandidates'");
   }
 
   private int findIndex(CompactionCandidate candidate)
diff --git 
a/server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java
 
b/server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java
new file mode 100644
index 00000000000..38e534c8273
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.compaction;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.common.config.Configs;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.guice.annotations.UnstableApi;
+import org.apache.druid.java.util.common.HumanReadableBytes;
+
+import javax.annotation.Nullable;
+import java.util.Comparator;
+
+/**
+ * Experimental {@link CompactionCandidateSearchPolicy} which prioritizes 
compaction
+ * of intervals with the largest number of small uncompacted segments.
+ * <p>
+ * This policy favors cluster stability (by prioritizing reduction of segment
+ * count) over performance of queries on newer intervals. For the latter, use
+ * {@link NewestSegmentFirstPolicy}.
+ */
+@UnstableApi
+public class MostFragmentedIntervalFirstPolicy extends 
BaseCandidateSearchPolicy
+{
+  private static final HumanReadableBytes SIZE_2_GB = new 
HumanReadableBytes("2GiB");
+  private static final HumanReadableBytes SIZE_10_MB = new 
HumanReadableBytes("10MiB");
+
+  private final int minUncompactedCount;
+  private final HumanReadableBytes minUncompactedBytes;
+  private final HumanReadableBytes maxAverageUncompactedBytesPerSegment;
+
+  @JsonCreator
+  public MostFragmentedIntervalFirstPolicy(
+      @JsonProperty("minUncompactedCount") @Nullable Integer 
minUncompactedCount,
+      @JsonProperty("minUncompactedBytes") @Nullable HumanReadableBytes 
minUncompactedBytes,
+      @JsonProperty("maxAverageUncompactedBytesPerSegment") @Nullable
+      HumanReadableBytes maxAverageUncompactedBytesPerSegment,
+      @JsonProperty("priorityDatasource") @Nullable String priorityDatasource
+  )
+  {
+    super(priorityDatasource);
+
+    InvalidInput.conditionalException(
+        minUncompactedCount == null || minUncompactedCount > 0,
+        "'minUncompactedCount'[%s] must be greater than 0",
+        minUncompactedCount
+    );
+    InvalidInput.conditionalException(
+        maxAverageUncompactedBytesPerSegment == null || 
maxAverageUncompactedBytesPerSegment.getBytes() > 0,
+        "'maxAverageUncompactedBytesPerSegment'[%s] must be greater than 0",
+        maxAverageUncompactedBytesPerSegment
+    );
+
+    this.minUncompactedCount = Configs.valueOrDefault(minUncompactedCount, 
100);
+    this.minUncompactedBytes = Configs.valueOrDefault(minUncompactedBytes, 
SIZE_10_MB);
+    this.maxAverageUncompactedBytesPerSegment
+        = Configs.valueOrDefault(maxAverageUncompactedBytesPerSegment, 
SIZE_2_GB);
+  }
+
+  /**
+   * Minimum number of uncompacted segments that must be present in an interval
+   * to make it eligible for compaction.
+   */
+  @JsonProperty
+  public int getMinUncompactedCount()
+  {
+    return minUncompactedCount;
+  }
+
+  /**
+   * Minimum total bytes of uncompacted segments that must be present in an
+   * interval to make it eligible for compaction. Default value is {@link 
#SIZE_10_MB}.
+   */
+  @JsonProperty
+  public HumanReadableBytes getMinUncompactedBytes()
+  {
+    return minUncompactedBytes;
+  }
+
+  /**
+   * Maximum average size of uncompacted segments in an interval eligible for
+   * compaction. Default value is {@link #SIZE_2_GB}.
+   */
+  @JsonProperty
+  public HumanReadableBytes getMaxAverageUncompactedBytesPerSegment()
+  {
+    return maxAverageUncompactedBytesPerSegment;
+  }
+
+  @Override
+  protected Comparator<CompactionCandidate> getSegmentComparator()
+  {
+    return this::compare;
+  }
+
+  private int compare(CompactionCandidate candidateA, CompactionCandidate 
candidateB)
+  {
+    final double fragmentationDiff
+        = computeFragmentationIndex(candidateB) - 
computeFragmentationIndex(candidateA);
+    return (int) fragmentationDiff;
+  }
+
+  @Override
+  public Eligibility checkEligibilityForCompaction(
+      CompactionCandidate candidate,
+      CompactionTaskStatus latestTaskStatus
+  )
+  {
+    final CompactionStatistics uncompacted = candidate.getUncompactedStats();
+    if (uncompacted == null) {
+      return Eligibility.OK;
+    } else if (uncompacted.getNumSegments() < 1) {
+      return Eligibility.fail("No uncompacted segments in interval");
+    } else if (uncompacted.getNumSegments() < minUncompactedCount) {
+      return Eligibility.fail(
+          "Uncompacted segments[%,d] in interval must be at least [%,d]",
+          uncompacted.getNumSegments(), minUncompactedCount
+      );
+    } else if (uncompacted.getTotalBytes() < minUncompactedBytes.getBytes()) {
+      return Eligibility.fail(
+          "Uncompacted bytes[%,d] in interval must be at least [%,d]",
+          uncompacted.getTotalBytes(), minUncompactedBytes.getBytes()
+      );
+    }
+
+    final long avgSegmentSize = (uncompacted.getTotalBytes() / 
uncompacted.getNumSegments());
+    if (avgSegmentSize > maxAverageUncompactedBytesPerSegment.getBytes()) {
+      return Eligibility.fail(
+          "Average size[%,d] of uncompacted segments in interval must be at 
most [%,d]",
+          avgSegmentSize, maxAverageUncompactedBytesPerSegment.getBytes()
+      );
+    } else {
+      return Eligibility.OK;
+    }
+  }
+
+  /**
+   * Computes the degree of fragmentation in the interval of the given 
compaction
+   * candidate. Calculated as the number of uncompacted segments plus an 
additional
+   * term that captures the "smallness" of segments in that interval.
+   * A higher fragmentation index causes the candidate to be higher in priority
+   * for compaction.
+   */
+  private double computeFragmentationIndex(CompactionCandidate candidate)
+  {
+    final CompactionStatistics uncompacted = candidate.getUncompactedStats();
+    if (uncompacted == null || uncompacted.getNumSegments() < 1 || 
uncompacted.getTotalBytes() < 1) {
+      return 0;
+    }
+
+    final long avgUncompactedSize = Math.max(1, uncompacted.getTotalBytes() / 
uncompacted.getNumSegments());
+
+    // Fragmentation index increases as uncompacted segment count increases
+    double segmentCountTerm = uncompacted.getNumSegments();
+
+    // Fragmentation index increases as avg uncompacted segment size decreases
+    double segmentSizeTerm =
+        (1.0f * minUncompactedCount * 
maxAverageUncompactedBytesPerSegment.getBytes()) / avgUncompactedSize;
+
+    return segmentCountTerm + segmentSizeTerm;
+  }
+}
diff --git 
a/server/src/test/java/org/apache/druid/server/compaction/CompactionStatusTrackerTest.java
 
b/server/src/test/java/org/apache/druid/server/compaction/CompactionStatusTrackerTest.java
index c0496b0705c..1314a1a0bc7 100644
--- 
a/server/src/test/java/org/apache/druid/server/compaction/CompactionStatusTrackerTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/compaction/CompactionStatusTrackerTest.java
@@ -120,7 +120,7 @@ public class CompactionStatusTrackerTest
     statusTracker.onTaskFinished("task1", TaskStatus.success("task1"));
 
     status = statusTracker.computeCompactionStatus(candidateSegments, policy);
-    Assert.assertEquals(CompactionStatus.State.COMPLETE, status.getState());
+    Assert.assertEquals(CompactionStatus.State.SKIPPED, status.getState());
     Assert.assertEquals(
         "Segment timeline not updated since last compaction task succeeded",
         status.getReason()
diff --git 
a/server/src/test/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicyTest.java
 
b/server/src/test/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicyTest.java
new file mode 100644
index 00000000000..594fe91020b
--- /dev/null
+++ 
b/server/src/test/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicyTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.compaction;
+
+import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.segment.TestDataSource;
+import org.apache.druid.server.coordinator.CreateDataSegments;
+import org.apache.druid.timeline.DataSegment;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+
+import java.util.List;
+
public class MostFragmentedIntervalFirstPolicyTest
{
  // A single dummy segment used for every candidate. Only the segment list is
  // needed to build a CompactionCandidate; the stats that the policy actually
  // inspects are injected separately via withCurrentStatus() in createCandidate().
  private static final DataSegment SEGMENT =
      CreateDataSegments.ofDatasource(TestDataSource.WIKI).eachOfSizeInMb(100).get(0);

  // Verifies the documented defaults: count=100, bytes=10MiB, avg size=2GiB,
  // no priority datasource.
  @Test
  public void test_thresholdValues_ofDefaultPolicy()
  {
    final MostFragmentedIntervalFirstPolicy policy = new MostFragmentedIntervalFirstPolicy(null, null, null, null);
    Assertions.assertEquals(100, policy.getMinUncompactedCount());
    Assertions.assertEquals(new HumanReadableBytes("10MiB"), policy.getMinUncompactedBytes());
    Assertions.assertEquals(new HumanReadableBytes("2GiB"), policy.getMaxAverageUncompactedBytesPerSegment());
    Assertions.assertNull(policy.getPriorityDatasource());
  }

  // Eligibility must fail (with the exact message) when the interval has fewer
  // uncompacted segments than minUncompactedCount, and pass once it has more.
  @Test
  public void test_checkEligibilityForCompaction_fails_ifUncompactedCountLessThanCutoff()
  {
    final int minUncompactedCount = 10_000;
    final MostFragmentedIntervalFirstPolicy policy = new MostFragmentedIntervalFirstPolicy(
        minUncompactedCount,
        HumanReadableBytes.valueOf(1),
        HumanReadableBytes.valueOf(10_000),
        null
    );

    Assertions.assertEquals(
        CompactionCandidateSearchPolicy.Eligibility.fail(
            "Uncompacted segments[1] in interval must be at least [10,000]"
        ),
        policy.checkEligibilityForCompaction(createCandidate(1, 100L), null)
    );
    Assertions.assertEquals(
        CompactionCandidateSearchPolicy.Eligibility.OK,
        policy.checkEligibilityForCompaction(createCandidate(10_001, 100L), null)
    );
  }

  // Eligibility must fail when total uncompacted bytes fall below
  // minUncompactedBytes, and pass once the total reaches the cutoff.
  @Test
  public void test_checkEligibilityForCompaction_fails_ifUncompactedBytesLessThanCutoff()
  {
    final HumanReadableBytes minUncompactedBytes = HumanReadableBytes.valueOf(10_000);
    final MostFragmentedIntervalFirstPolicy policy = new MostFragmentedIntervalFirstPolicy(
        1,
        minUncompactedBytes,
        HumanReadableBytes.valueOf(10_000),
        null
    );

    Assertions.assertEquals(
        CompactionCandidateSearchPolicy.Eligibility.fail(
            "Uncompacted bytes[100] in interval must be at least [10,000]"
        ),
        policy.checkEligibilityForCompaction(createCandidate(1, 100L), null)
    );
    Assertions.assertEquals(
        CompactionCandidateSearchPolicy.Eligibility.OK,
        policy.checkEligibilityForCompaction(createCandidate(100, 10_000L), null)
    );
  }

  // Eligibility must fail when the average uncompacted segment size exceeds
  // maxAverageUncompactedBytesPerSegment (segments too large to be "fragmented").
  @Test
  public void test_checkEligibilityForCompaction_fails_ifAvgSegmentSizeGreaterThanCutoff()
  {
    final HumanReadableBytes maxAvgSegmentSize = HumanReadableBytes.valueOf(100);
    final MostFragmentedIntervalFirstPolicy policy = new MostFragmentedIntervalFirstPolicy(
        1,
        HumanReadableBytes.valueOf(100),
        maxAvgSegmentSize,
        null
    );

    Assertions.assertEquals(
        CompactionCandidateSearchPolicy.Eligibility.fail(
            "Average size[10,000] of uncompacted segments in interval must be at most [100]"
        ),
        policy.checkEligibilityForCompaction(createCandidate(1, 10_000L), null)
    );
    Assertions.assertEquals(
        CompactionCandidateSearchPolicy.Eligibility.OK,
        policy.checkEligibilityForCompaction(createCandidate(1, 100L), null)
    );
  }

  // Same total bytes (1 x 1000 vs 2 x 500): the interval with more (and smaller)
  // segments must rank first, i.e. candidateB sorts before candidateA.
  @Test
  public void test_policy_favorsIntervalWithMoreUncompactedSegments_ifTotalBytesIsEqual()
  {
    final MostFragmentedIntervalFirstPolicy policy = new MostFragmentedIntervalFirstPolicy(
        1,
        HumanReadableBytes.valueOf(1),
        HumanReadableBytes.valueOf(10_000),
        null
    );

    final CompactionCandidate candidateA = createCandidate(1, 1000L);
    final CompactionCandidate candidateB = createCandidate(2, 500L);

    verifyCandidateIsEligible(candidateA, policy);
    verifyCandidateIsEligible(candidateB, policy);

    Assertions.assertTrue(policy.compareCandidates(candidateA, candidateB) > 0);
    Assertions.assertTrue(policy.compareCandidates(candidateB, candidateA) < 0);
  }

  // Same average segment size (1000): the interval with more segments must rank first.
  @Test
  public void test_policy_favorsIntervalWithMoreUncompactedSegments_ifAverageSizeIsEqual()
  {
    final MostFragmentedIntervalFirstPolicy policy = new MostFragmentedIntervalFirstPolicy(
        1,
        HumanReadableBytes.valueOf(1),
        HumanReadableBytes.valueOf(10_000),
        null
    );

    final CompactionCandidate candidateA = createCandidate(1, 1000L);
    final CompactionCandidate candidateB = createCandidate(2, 1000L);

    verifyCandidateIsEligible(candidateA, policy);
    verifyCandidateIsEligible(candidateB, policy);

    Assertions.assertTrue(policy.compareCandidates(candidateA, candidateB) > 0);
    Assertions.assertTrue(policy.compareCandidates(candidateB, candidateA) < 0);
  }

  // Same segment count (10): the interval with the smaller average segment size
  // must rank first, i.e. candidateA (avg 500) sorts before candidateB (avg 1000).
  @Test
  public void test_policy_favorsIntervalWithSmallerSegments_ifCountIsEqual()
  {
    final MostFragmentedIntervalFirstPolicy policy = new MostFragmentedIntervalFirstPolicy(
        1,
        HumanReadableBytes.valueOf(1),
        HumanReadableBytes.valueOf(10_000),
        null
    );

    final CompactionCandidate candidateA = createCandidate(10, 500L);
    final CompactionCandidate candidateB = createCandidate(10, 1000L);

    verifyCandidateIsEligible(candidateA, policy);
    verifyCandidateIsEligible(candidateB, policy);

    Assertions.assertTrue(policy.compareCandidates(candidateA, candidateB) < 0);
    Assertions.assertTrue(policy.compareCandidates(candidateB, candidateA) > 0);
  }

  // Both candidates have fragmentation index 100*100/25 + 100 = 400 + 100 = 500
  // (count term + size term), so the comparison must be a tie in both directions.
  @Test
  public void test_compareCandidates_returnsZeroIfSegmentCountAndAvgSizeScaleEquivalently()
  {
    final MostFragmentedIntervalFirstPolicy policy = new MostFragmentedIntervalFirstPolicy(
        100,
        HumanReadableBytes.valueOf(1),
        HumanReadableBytes.valueOf(100),
        null
    );

    final CompactionCandidate candidateA = createCandidate(100, 25);
    final CompactionCandidate candidateB = createCandidate(400, 100);

    verifyCandidateIsEligible(candidateA, policy);
    verifyCandidateIsEligible(candidateB, policy);

    Assertions.assertEquals(0, policy.compareCandidates(candidateA, candidateB));
    Assertions.assertEquals(0, policy.compareCandidates(candidateB, candidateA));
  }

  // Builds a candidate whose uncompacted stats report the given segment count and
  // total bytes = numSegments * avgSizeBytes; compacted stats are dummy values.
  private CompactionCandidate createCandidate(int numSegments, long avgSizeBytes)
  {
    final CompactionStatistics dummyCompactedStats = CompactionStatistics.create(1L, 1L, 1L);
    final CompactionStatistics uncompactedStats = CompactionStatistics.create(
        avgSizeBytes * numSegments,
        numSegments,
        1L
    );
    return CompactionCandidate.from(List.of(SEGMENT), null)
        .withCurrentStatus(CompactionStatus.pending(dummyCompactedStats, uncompactedStats, ""));
  }

  // Sanity check that a candidate passes eligibility before its ordering is asserted.
  private void verifyCandidateIsEligible(CompactionCandidate candidate, MostFragmentedIntervalFirstPolicy policy)
  {
    Assertions.assertEquals(
        CompactionCandidateSearchPolicy.Eligibility.OK,
        policy.checkEligibilityForCompaction(candidate, null)
    );
  }
}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to