kfaraz commented on code in PR #18968:
URL: https://github.com/apache/druid/pull/18968#discussion_r2762688447


##########
server/src/main/java/org/apache/druid/server/compaction/CompactionCandidate.java:
##########
@@ -160,21 +169,122 @@ public CompactionStatus getCurrentStatus()
     return currentStatus;
   }
 
+  @Nullable
+  public CompactionCandidateSearchPolicy.Eligibility getPolicyEligibility()
+  {
+    return policyEligibility;
+  }
+
   /**
    * Creates a copy of this CompactionCandidate object with the given status.
    */
   public CompactionCandidate withCurrentStatus(CompactionStatus status)
   {
-    return new CompactionCandidate(segments, umbrellaInterval, compactionInterval, numIntervals, status);
+    return new CompactionCandidate(
+        segments,
+        umbrellaInterval,
+        compactionInterval,
+        numIntervals,
+        policyEligibility,
+        status
+    );
+  }
+
+  public CompactionCandidate withPolicyEligibility(CompactionCandidateSearchPolicy.Eligibility eligibility)
+  {
+    return new CompactionCandidate(
+        segments,
+        umbrellaInterval,
+        compactionInterval,
+        numIntervals,
+        eligibility,
+        currentStatus
+    );
+  }
+
+  /**
+   * Evaluates this candidate for compaction eligibility based on the provided
+   * compaction configuration and search policy.
+   * <p>
+   * This method first evaluates the candidate against the compaction configuration
+   * using a {@link CompactionStatus.Evaluator} to determine if any segments need
+   * compaction. If segments are pending compaction, the search policy is consulted
+   * to determine the type of compaction:
+   * <ul>
+   * <li><b>NOT_ELIGIBLE</b>: Returns a candidate with status SKIPPED, indicating
+   *     the policy decided compaction should not occur at this time</li>
+   * <li><b>FULL_COMPACTION</b>: Returns this candidate with status PENDING,
+   *     indicating all segments should be compacted</li>
+   * <li><b>INCREMENTAL_COMPACTION</b>: Returns a new candidate containing only
+   *     the uncompacted segments (as determined by the evaluator), with status
+   *     PENDING for incremental compaction</li>
+   * </ul>
+   *
+   * @param config       the compaction configuration for the datasource
+   * @param searchPolicy the policy used to determine compaction eligibility
+   * @return a CompactionCandidate with updated status and potentially filtered segments
+   */
+  public CompactionCandidate evaluate(

Review Comment:
   This method should not live here. The computation of the eligibility should be done in `CompactionStatusTracker`.
   
   The overall workflow would be like this:
   - identify a `CompactionCandidate`
   - determine its `CompactionStatus`
   - determine the eligibility via `CompactionStatusTracker.computeEligibility()`
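   
   A minimal sketch of that flow, assuming a new `computeEligibility()` method on the tracker (the exact signature and variable names are illustrative):
   
   ```java
   // Step 1: identify the candidate (unchanged)
   final CompactionCandidate candidate =
       CompactionCandidate.from(segments, config.getSegmentGranularity());
   
   // Step 2: determine its CompactionStatus, a pure function of segments + config
   final CompactionStatus status =
       CompactionStatus.compute(candidate, config, fingerprintMapper);
   
   // Step 3: determine the eligibility separately, via the tracker (proposed method)
   final CompactionCandidateSearchPolicy.Eligibility eligibility =
       statusTracker.computeEligibility(candidate.withCurrentStatus(status), searchPolicy);
   ```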



##########
server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpec.java:
##########
@@ -38,18 +42,37 @@ public class ClientCompactionIntervalSpec
 
   private final Interval interval;
   @Nullable
+  private final List<SegmentDescriptor> uncompactedSegments;
+  @Nullable
   private final String sha256OfSortedSegmentIds;
 
   @JsonCreator
   public ClientCompactionIntervalSpec(
       @JsonProperty("interval") Interval interval,
+      @JsonProperty("uncompactedSegments") @Nullable List<SegmentDescriptor> 
uncompactedSegments,

Review Comment:
   It is a little confusing to add segments here since there is already a 
`SpecificSegmentsSpec` which specifies the set of segments that should be 
compacted.
   
   I feel we should add a new type of `CompactionInputSpec` which will be used 
for incremental compaction only. It would have both a non-null interval as well 
as the set of segment IDs to compact incrementally.
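   
   As a rough sketch of what that could look like (the class and property names here are hypothetical):
   
   ```java
   // Hypothetical input spec used for incremental compaction only: it carries a
   // non-null interval plus the explicit set of segment IDs to compact.
   public class IncrementalCompactionInputSpec implements CompactionInputSpec
   {
     private final Interval interval;
     private final Set<String> segmentIds;
   
     @JsonCreator
     public IncrementalCompactionInputSpec(
         @JsonProperty("interval") Interval interval,
         @JsonProperty("segmentIds") Set<String> segmentIds
     )
     {
       this.interval = Preconditions.checkNotNull(interval, "interval");
       this.segmentIds = Preconditions.checkNotNull(segmentIds, "segmentIds");
     }
   
     // getters and the CompactionInputSpec validation methods are omitted in this sketch
   }
   ```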



##########
server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java:
##########
@@ -64,18 +62,27 @@ Eligibility checkEligibilityForCompaction(
    */
   class Eligibility
   {
-    public static final Eligibility OK = new Eligibility(true, null);
+    public enum PolicyEligibility

Review Comment:
   Maybe we should call this enum `Status` (since this represents the eligibility status) to avoid confusion with the `Eligibility` class itself.
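   
   e.g. something like the following, so that call sites read `Eligibility.Status` instead of `Eligibility.PolicyEligibility`:
   
   ```java
   class Eligibility
   {
     public enum Status
     {
       FULL_COMPACTION,
       INCREMENTAL_COMPACTION,
       NOT_ELIGIBLE,
       NOT_APPLICABLE
     }
   }
   ```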



##########
server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java:
##########
@@ -64,18 +62,27 @@ Eligibility checkEligibilityForCompaction(
    */
   class Eligibility

Review Comment:
   Since this class is going to have a wider usage now, let's move it out into 
a separate file of its own.



##########
server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java:
##########
@@ -357,29 +348,52 @@ private static class Evaluator
     private final Map<CompactionState, List<DataSegment>> unknownStateToSegments = new HashMap<>();
 
     @Nullable
-    private final String targetFingerprint;
     private final IndexingStateFingerprintMapper fingerprintMapper;
+    @Nullable
+    private final String targetFingerprint;
 
-    private Evaluator(
+    Evaluator(
         CompactionCandidate candidateSegments,
         DataSourceCompactionConfig compactionConfig,
-        @Nullable String targetFingerprint,
         @Nullable IndexingStateFingerprintMapper fingerprintMapper
     )
     {
       this.candidateSegments = candidateSegments;
       this.compactionConfig = compactionConfig;
       this.tuningConfig = ClientCompactionTaskQueryTuningConfig.from(compactionConfig);
       this.configuredGranularitySpec = compactionConfig.getGranularitySpec();
-      this.targetFingerprint = targetFingerprint;
       this.fingerprintMapper = fingerprintMapper;
+      if (fingerprintMapper == null) {
+        targetFingerprint = null;
+      } else {
+        targetFingerprint = fingerprintMapper.generateFingerprint(
+            compactionConfig.getDataSource(),
+            compactionConfig.toCompactionState()
+        );
+      }
+    }
+
+    List<DataSegment> getUncompactedSegments()
+    {
+      return uncompactedSegments;
     }
 
-    private CompactionStatus evaluate()
+    /**
+     * Evaluates the compaction status of candidate segments through a multi-step process:
+     * <ol>
+     *   <li>Validates input bytes are within limits</li>
+     *   <li>Categorizes segments by compaction state (fingerprinted, uncompacted, or unknown)</li>
+     *   <li>Performs fingerprint-based validation if available (fast path)</li>
+     *   <li>Runs detailed checks against unknown states via {@link #CHECKS}</li>
+     * </ol>
+     *
+     * @return Pair of eligibility status and compaction status with reason for first failed check
+     */
+    Pair<CompactionCandidateSearchPolicy.Eligibility, CompactionStatus> evaluate()
     {
-      final CompactionStatus inputBytesCheck = inputBytesAreWithinLimit();
-      if (inputBytesCheck.isSkipped()) {
-        return inputBytesCheck;
+      final CompactionCandidateSearchPolicy.Eligibility inputBytesCheck = inputBytesAreWithinLimit();
+      if (inputBytesCheck != null) {

Review Comment:
   I think the input bytes check should now be moved out of this class and used 
in `CompactionStatusTracker.computeEligibility()` instead.
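   
   Roughly like this (a sketch only; `computeEligibility()` and `Eligibility.notEligible()` are proposed/illustrative names):
   
   ```java
   // In CompactionStatusTracker: gate on total input bytes before
   // consulting the search policy at all.
   public Eligibility computeEligibility(
       CompactionCandidate candidate,
       CompactionCandidateSearchPolicy searchPolicy
   )
   {
     final long maxInputBytes = config.getInputSegmentSizeBytes(); // datasource compaction config
     if (candidate.getTotalBytes() > maxInputBytes) {
       return Eligibility.notEligible(
           StringUtils.format("Total input bytes[%,d] exceed the limit[%,d]", candidate.getTotalBytes(), maxInputBytes)
       );
     }
     return searchPolicy.checkEligibilityForCompaction(candidate);
   }
   ```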



##########
server/src/main/java/org/apache/druid/server/compaction/DataSourceCompactibleSegmentIterator.java:
##########
@@ -329,17 +331,17 @@ private void findAndEnqueueSegmentsToCompact(CompactibleSegmentIterator compacti
         continue;
       }
 
-      final CompactionCandidate candidates = CompactionCandidate.from(segments, config.getSegmentGranularity());
-      final CompactionStatus compactionStatus = CompactionStatus.compute(candidates, config, fingerprintMapper);
-      final CompactionCandidate candidatesWithStatus = candidates.withCurrentStatus(compactionStatus);
+      final CompactionCandidate candidatesWithStatus =

Review Comment:
   I feel this class should remain untouched since we are only computing the `CompactionStatus` here. The eligibility computation should happen later.



##########
server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java:
##########
@@ -64,18 +62,27 @@ Eligibility checkEligibilityForCompaction(
    */
   class Eligibility
   {
-    public static final Eligibility OK = new Eligibility(true, null);
+    public enum PolicyEligibility
+    {
+      FULL_COMPACTION,
+      INCREMENTAL_COMPACTION,
+      NOT_ELIGIBLE,
+      NOT_APPLICABLE

Review Comment:
   How is "not applicable" different from "not eligible"?
   I think eligibility should have only 3 possible states - not eligible, 
incremental and full.
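   
   i.e. just (a sketch, using the `Status` name suggested above):
   
   ```java
   public enum Status
   {
     NOT_ELIGIBLE,           // policy decided not to compact this candidate right now
     INCREMENTAL_COMPACTION, // compact only the uncompacted segments
     FULL_COMPACTION         // compact all segments in the interval
   }
   ```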



##########
server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java:
##########
@@ -185,8 +215,18 @@ public Eligibility checkEligibilityForCompaction(
           "Average size[%,d] of uncompacted segments in interval must be at 
most [%,d]",
           avgSegmentSize, maxAverageUncompactedBytesPerSegment.getBytes()
       );
+    }
+
+    final double uncompactedBytesRatio = (double) uncompacted.getTotalBytes() /
+                                         (uncompacted.getTotalBytes() + candidate.getCompactedStats().getTotalBytes());
+    if (uncompactedBytesRatio < incrementalCompactionUncompactedBytesRatioThreshold) {
+      return Eligibility.incrementalCompaction(

Review Comment:
   Let's not add this config or return incremental compaction here since 
incremental compaction is not implemented yet. If a user configures this field 
by mistake, compaction might misbehave.



##########
server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java:
##########
@@ -357,29 +348,52 @@ private static class Evaluator
     private final Map<CompactionState, List<DataSegment>> unknownStateToSegments = new HashMap<>();
 
     @Nullable
-    private final String targetFingerprint;
     private final IndexingStateFingerprintMapper fingerprintMapper;
+    @Nullable
+    private final String targetFingerprint;
 
-    private Evaluator(
+    Evaluator(
         CompactionCandidate candidateSegments,
         DataSourceCompactionConfig compactionConfig,
-        @Nullable String targetFingerprint,
         @Nullable IndexingStateFingerprintMapper fingerprintMapper
     )
     {
       this.candidateSegments = candidateSegments;
       this.compactionConfig = compactionConfig;
       this.tuningConfig = ClientCompactionTaskQueryTuningConfig.from(compactionConfig);
       this.configuredGranularitySpec = compactionConfig.getGranularitySpec();
-      this.targetFingerprint = targetFingerprint;
       this.fingerprintMapper = fingerprintMapper;
+      if (fingerprintMapper == null) {
+        targetFingerprint = null;
+      } else {
+        targetFingerprint = fingerprintMapper.generateFingerprint(
+            compactionConfig.getDataSource(),
+            compactionConfig.toCompactionState()
+        );
+      }
+    }
+
+    List<DataSegment> getUncompactedSegments()
+    {
+      return uncompactedSegments;
     }
 
-    private CompactionStatus evaluate()
+    /**
+     * Evaluates the compaction status of candidate segments through a multi-step process:
+     * <ol>
+     *   <li>Validates input bytes are within limits</li>
+     *   <li>Categorizes segments by compaction state (fingerprinted, uncompacted, or unknown)</li>
+     *   <li>Performs fingerprint-based validation if available (fast path)</li>
+     *   <li>Runs detailed checks against unknown states via {@link #CHECKS}</li>
+     * </ol>
+     *
+     * @return Pair of eligibility status and compaction status with reason for first failed check
+     */
+    Pair<CompactionCandidateSearchPolicy.Eligibility, CompactionStatus> evaluate()

Review Comment:
   Since the `CompactionStatus` and eligibility are going to be two separate 
things now, this class should not deal with eligibility at all. We can retain 
the old code in this class.
   
   Once the `CompactionStatus` has been computed, that should be passed to `CompactionStatusTracker.computeEligibility()` to determine the eligibility.



##########
server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java:
##########
@@ -47,13 +47,16 @@ public class MostFragmentedIntervalFirstPolicy extends BaseCandidateSearchPolicy
   private final int minUncompactedCount;
   private final HumanReadableBytes minUncompactedBytes;
   private final HumanReadableBytes maxAverageUncompactedBytesPerSegment;
+  private final double incrementalCompactionUncompactedBytesRatioThreshold;
 
   @JsonCreator
   public MostFragmentedIntervalFirstPolicy(
       @JsonProperty("minUncompactedCount") @Nullable Integer 
minUncompactedCount,
       @JsonProperty("minUncompactedBytes") @Nullable HumanReadableBytes 
minUncompactedBytes,
       @JsonProperty("maxAverageUncompactedBytesPerSegment") @Nullable
       HumanReadableBytes maxAverageUncompactedBytesPerSegment,
+      @JsonProperty("incrementalCompactionUncompactedBytesRatioThreshold") 
@Nullable

Review Comment:
   Let's not add this user-facing config until incremental compaction has been 
implemented.



##########
server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java:
##########
@@ -344,7 +335,7 @@ static DimensionRangePartitionsSpec getEffectiveRangePartitionsSpec(DimensionRan
    * Evaluates {@link #CHECKS} to determine the compaction status of a
    * {@link CompactionCandidate}.
    */
-  private static class Evaluator
+  static class Evaluator

Review Comment:
   Let's keep the evaluator private as before.



##########
server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java:
##########
@@ -80,10 +80,7 @@ public Set<String> getSubmittedTaskIds()
    * This method assumes that the given candidate is eligible for compaction
    * based on the current compaction config/supervisor of the datasource.
    */
-  public CompactionStatus computeCompactionStatus(
-      CompactionCandidate candidate,
-      CompactionCandidateSearchPolicy searchPolicy
-  )
+  public CompactionStatus computeCompactionStatus(CompactionCandidate candidate)

Review Comment:
   Since we are now decoupling eligibility (i.e. whether or not to run 
compaction) from the current compaction status (i.e. current degree of 
compaction in the interval), it would make sense to do the following:
   - Repurpose this method to compute the eligibility since the eligibility can 
be affected by the `lastTaskStatus` but the `CompactionStatus` should not 
depend on it.
   - `CompactionStatus` need not have the `RUNNING` state anymore since it is 
really associated with the running state of a compaction task.
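   
   A sketch of the repurposed method (names illustrative; it assumes the tracker already tracks the latest task status per candidate):
   
   ```java
   // Eligibility may depend on lastTaskStatus, while CompactionStatus no longer
   // does, so the CompactionStatus.RUNNING state becomes unnecessary.
   public Eligibility computeEligibility(
       CompactionCandidate candidate,
       CompactionCandidateSearchPolicy searchPolicy
   )
   {
     final CompactionTaskStatus lastTaskStatus = getLatestTaskStatus(candidate);
     if (lastTaskStatus != null && lastTaskStatus.getState() == TaskState.RUNNING) {
       return Eligibility.notEligible("A compaction task for this interval is already running");
     }
     return searchPolicy.checkEligibilityForCompaction(candidate);
   }
   ```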



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java:
##########
@@ -460,14 +462,27 @@ private static ClientCompactionTaskQuery compactSegments(
     context.put("priority", compactionTaskPriority);
 
     final String taskId = IdUtils.newTaskId(TASK_ID_PREFIX, ClientCompactionTaskQuery.TYPE, dataSource, null);
+    final ClientCompactionIntervalSpec clientCompactionIntervalSpec;
+    Preconditions.checkArgument(entry.getPolicyEligibility() != null, "Must have a policy eligibility");
+    switch (entry.getPolicyEligibility().getPolicyEligibility()) {
+      case FULL_COMPACTION:
+        clientCompactionIntervalSpec = new ClientCompactionIntervalSpec(entry.getCompactionInterval(), null, null);
+        break;
+      case INCREMENTAL_COMPACTION:
+        break;
+      case INCREMENTAL_COMPACTION:

Review Comment:
   Let's update this part only when incremental compaction has been implemented 
on the task side. Also, we should support incremental compaction with 
`CompactionJobQueue` (i.e. Overlord-based compaction) only.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

