kfaraz commented on code in PR #18968:
URL: https://github.com/apache/druid/pull/18968#discussion_r2795122751


##########
server/src/main/java/org/apache/druid/server/compaction/CompactionCandidate.java:
##########
@@ -37,76 +39,200 @@
  */
 public class CompactionCandidate
 {
-  private final List<DataSegment> segments;
-  private final Interval umbrellaInterval;
-  private final Interval compactionInterval;
-  private final String dataSource;
-  private final long totalBytes;
-  private final int numIntervals;
-
-  private final CompactionStatus currentStatus;
-
-  public static CompactionCandidate from(
-      List<DataSegment> segments,
-      @Nullable Granularity targetSegmentGranularity
-  )
+  /**
+   * Non-empty list of segments of a datasource being proposed for compaction.
+   * A proposed compaction typically contains all the segments of a single time chunk.
+   */
+  public static class ProposedCompaction

Review Comment:
   What we need is a way to distinguish between a candidate before and after 
the status has been computed, so that we can avoid the null checks on 
`currentStatus`. I don't think this change fully achieves that.
   
   Also, the name `CompactionCandidate` captures the intent better as it 
defines a potential "candidate" for compaction. So let's stick with it.
   
   Instead of this change, you may consider adding a new class `CompactionCandidateAndStatus` which will be used by both methods of the search policy. This class will contain only a `candidate` field and a `status` field.
   
   I don't think we will need this class to contain the `Eligibility` or the `CompactionMode`, since those will be computed and used only by the `CompactionJobQueue`.



##########
server/src/main/java/org/apache/druid/server/compaction/CompactionMode.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.compaction;
+
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.StringUtils;
+
+import javax.annotation.Nullable;
+
+public enum CompactionMode
+{
+  FULL_COMPACTION {
+    @Override
+    public CompactionCandidate createCandidate(
+        CompactionCandidate.ProposedCompaction proposedCompaction,
+        CompactionStatus eligibility,
+        @Nullable String policyNote
+    )
+    {
+      return new CompactionCandidate(proposedCompaction, eligibility, policyNote, this);
+    }
+  },
+  NOT_APPLICABLE;
+
+  public CompactionCandidate createCandidate(
+      CompactionCandidate.ProposedCompaction proposedCompaction,
+      CompactionStatus eligibility
+  )
+  {
+    return createCandidate(proposedCompaction, eligibility, null);
+  }
+
+  public CompactionCandidate createCandidate(

Review Comment:
   We shouldn't really have this method here. The `CompactionMode` is an attribute that we associate with a `CompactionCandidate`. One should not be responsible for creating the other.



##########
server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java:
##########
@@ -48,74 +45,11 @@ public interface CompactionCandidateSearchPolicy
  int compareCandidates(CompactionCandidate candidateA, CompactionCandidate candidateB);
 
   /**
-   * Checks if the given {@link CompactionCandidate} is eligible for compaction
-   * in the current iteration. A policy may implement this method to skip
-   * compacting intervals or segments that do not fulfil some required criteria.
+   * Creates a {@link CompactionCandidate} after applying policy-specific checks to the proposed compaction candidate.
    *
-   * @return {@link Eligibility#OK} only if eligible.
-   */
-  Eligibility checkEligibilityForCompaction(
-      CompactionCandidate candidate,
-      CompactionTaskStatus latestTaskStatus
-  );
-
-  /**
-   * Describes the eligibility of an interval for compaction.
+   * @param candidate the proposed compaction
+   * @param eligibility initial eligibility from compaction config checks
+   * @return final compaction candidate
    */
-  class Eligibility
-  {
-    public static final Eligibility OK = new Eligibility(true, null);
-
-    private final boolean eligible;
-    private final String reason;
-
-    private Eligibility(boolean eligible, String reason)
-    {
-      this.eligible = eligible;
-      this.reason = reason;
-    }
-
-    public boolean isEligible()
-    {
-      return eligible;
-    }
-
-    public String getReason()
-    {
-      return reason;
-    }
-
-    public static Eligibility fail(String messageFormat, Object... args)
-    {
-      return new Eligibility(false, StringUtils.format(messageFormat, args));
-    }
-
-    @Override
-    public boolean equals(Object object)
-    {
-      if (this == object) {
-        return true;
-      }
-      if (object == null || getClass() != object.getClass()) {
-        return false;
-      }
-      Eligibility that = (Eligibility) object;
-      return eligible == that.eligible && Objects.equals(reason, that.reason);
-    }
-
-    @Override
-    public int hashCode()
-    {
-      return Objects.hash(eligible, reason);
-    }
-
-    @Override
-    public String toString()
-    {
-      return "Eligibility{" +
-             "eligible=" + eligible +
-             ", reason='" + reason + '\'' +
-             '}';
-    }
-  }
  CompactionCandidate createCandidate(CompactionCandidate.ProposedCompaction candidate, CompactionStatus eligibility);

Review Comment:
   It doesn't make sense for the policy to be creating candidates, since the candidates are identified much earlier by the `DatasourceCompactibleSegmentIterator` itself. The policy can only check the eligibility of a candidate for compaction. (I suppose the name `CompactionCandidateSearchPolicy` might be misleading here, since it is really a `CompactionCandidatePickAndPrioritizePolicy`, but that can be a mouthful, so we can stick with what we have.)
   
   We should retain the original method here. The caller (`CompactionJobQueue`) 
should just check the eligibility and then decide whether to launch a job or 
not. If yes, then whether full or incremental.
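   A minimal sketch of that flow, with all names invented for illustration (the real call would go through `checkEligibilityForCompaction` on the policy, inside `CompactionJobQueue`):

```java
// Hedged sketch of the suggested decision flow: the policy only reports
// eligibility, and the caller decides what, if anything, to launch.
// These enum and method names are stand-ins, not the real Druid API.
enum CompactionDecision
{
  SKIP, LAUNCH_FULL, LAUNCH_INCREMENTAL
}

class JobDecision
{
  // 'eligible' would come from the policy's eligibility check;
  // 'needsFullCompaction' from whatever signal the queue uses to pick the mode.
  static CompactionDecision decide(boolean eligible, boolean needsFullCompaction)
  {
    if (!eligible) {
      // Policy rejected the candidate: no job is launched.
      return CompactionDecision.SKIP;
    }
    // Eligible: the caller, not the policy, chooses full vs incremental.
    return needsFullCompaction
           ? CompactionDecision.LAUNCH_FULL
           : CompactionDecision.LAUNCH_INCREMENTAL;
  }
}
```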



##########
server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java:
##########
@@ -48,74 +45,11 @@ public interface CompactionCandidateSearchPolicy
  int compareCandidates(CompactionCandidate candidateA, CompactionCandidate candidateB);
 
   /**
-   * Checks if the given {@link CompactionCandidate} is eligible for compaction
-   * in the current iteration. A policy may implement this method to skip
-   * compacting intervals or segments that do not fulfil some required criteria.
+   * Creates a {@link CompactionCandidate} after applying policy-specific checks to the proposed compaction candidate.
    *
-   * @return {@link Eligibility#OK} only if eligible.
-   */
-  Eligibility checkEligibilityForCompaction(
-      CompactionCandidate candidate,
-      CompactionTaskStatus latestTaskStatus
-  );
-
-  /**
-   * Describes the eligibility of an interval for compaction.
+   * @param candidate the proposed compaction
+   * @param eligibility initial eligibility from compaction config checks
+   * @return final compaction candidate
    */
-  class Eligibility

Review Comment:
   Let's retain the `Eligibility` class since it answers the question:
   
   _"Does the search policy consider this `CompactionCandidate` eligible for 
compaction?"_
   
   The `Eligibility` class should now also contain an enum field for 
`CompactionMode`. The rest of the class can remain unchanged.
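   For illustration, a sketch of `Eligibility` with the added mode field (the enum values come from the diff above; `String.format` stands in for Druid's `StringUtils.format`, and how the mode is supplied is my assumption):

```java
// Sketch of the retained Eligibility class extended with a CompactionMode field.
// Constructor/factory shapes here are assumptions, not the final API.
enum CompactionMode
{
  FULL_COMPACTION, NOT_APPLICABLE
}

class Eligibility
{
  static final Eligibility OK = new Eligibility(true, null, CompactionMode.FULL_COMPACTION);

  private final boolean eligible;
  private final String reason;
  private final CompactionMode mode;

  private Eligibility(boolean eligible, String reason, CompactionMode mode)
  {
    this.eligible = eligible;
    this.reason = reason;
    this.mode = mode;
  }

  // Failed checks carry a reason and are never applicable for compaction.
  static Eligibility fail(String messageFormat, Object... args)
  {
    return new Eligibility(false, String.format(messageFormat, args), CompactionMode.NOT_APPLICABLE);
  }

  boolean isEligible()
  {
    return eligible;
  }

  String getReason()
  {
    return reason;
  }

  CompactionMode getMode()
  {
    return mode;
  }
}
```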



##########
server/src/main/java/org/apache/druid/server/compaction/CompactionCandidate.java:
##########
@@ -37,76 +39,200 @@
  */
 public class CompactionCandidate
 {
-  private final List<DataSegment> segments;
-  private final Interval umbrellaInterval;
-  private final Interval compactionInterval;
-  private final String dataSource;
-  private final long totalBytes;
-  private final int numIntervals;
-
-  private final CompactionStatus currentStatus;
-
-  public static CompactionCandidate from(
-      List<DataSegment> segments,
-      @Nullable Granularity targetSegmentGranularity
-  )
+  /**
+   * Non-empty list of segments of a datasource being proposed for compaction.
+   * A proposed compaction typically contains all the segments of a single time chunk.
+   */
+  public static class ProposedCompaction
   {
-    if (segments == null || segments.isEmpty()) {
-      throw InvalidInput.exception("Segments to compact must be non-empty");
+    private final List<DataSegment> segments;
+    private final Interval umbrellaInterval;
+    private final Interval compactionInterval;
+    private final String dataSource;
+    private final long totalBytes;
+    private final int numIntervals;
+
+    public static ProposedCompaction from(
+        List<DataSegment> segments,
+        @Nullable Granularity targetSegmentGranularity
+    )
+    {
+      if (segments == null || segments.isEmpty()) {
+        throw InvalidInput.exception("Segments to compact must be non-empty");
+      }
+
+      final Set<Interval> segmentIntervals =
+          segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet());
+      final Interval umbrellaInterval = JodaUtils.umbrellaInterval(segmentIntervals);
+      final Interval compactionInterval =
+          targetSegmentGranularity == null
+          ? umbrellaInterval
+          : JodaUtils.umbrellaInterval(targetSegmentGranularity.getIterable(umbrellaInterval));
+
+      return new ProposedCompaction(
+          segments,
+          umbrellaInterval,
+          compactionInterval,
+          segmentIntervals.size()
+      );
     }
 
-    final Set<Interval> segmentIntervals =
-        segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet());
-    final Interval umbrellaInterval = JodaUtils.umbrellaInterval(segmentIntervals);
-    final Interval compactionInterval =
-        targetSegmentGranularity == null
-        ? umbrellaInterval
-        : JodaUtils.umbrellaInterval(targetSegmentGranularity.getIterable(umbrellaInterval));
-
-    return new CompactionCandidate(
-        segments,
-        umbrellaInterval,
-        compactionInterval,
-        segmentIntervals.size(),
-        null
-    );
+    ProposedCompaction(
+        List<DataSegment> segments,
+        Interval umbrellaInterval,
+        Interval compactionInterval,
+        int numDistinctSegmentIntervals
+    )
+    {
+      this.segments = segments;
+      this.totalBytes = segments.stream().mapToLong(DataSegment::getSize).sum();
+
+      this.umbrellaInterval = umbrellaInterval;
+      this.compactionInterval = compactionInterval;
+
+      this.numIntervals = numDistinctSegmentIntervals;
+      this.dataSource = segments.get(0).getDataSource();
+    }
+
+    /**
+     * @return Non-empty list of segments that make up this proposed compaction.
+     */
+    public List<DataSegment> getSegments()
+    {
+      return segments;
+    }
+
+    public long getTotalBytes()
+    {
+      return totalBytes;
+    }
+
+    public int numSegments()
+    {
+      return segments.size();
+    }
+
+    /**
+     * Umbrella interval of all the segments in this proposed compaction. This typically
+     * corresponds to a single time chunk in the segment timeline.
+     */
+    public Interval getUmbrellaInterval()
+    {
+      return umbrellaInterval;
+    }
+
+    /**
+     * Interval aligned to the target segment granularity used for the compaction
+     * task. This interval completely contains the {@link #umbrellaInterval}.
+     */
+    public Interval getCompactionInterval()
+    {
+      return compactionInterval;
+    }
+
+    public String getDataSource()
+    {
+      return dataSource;
+    }
+
+    public CompactionStatistics getStats()
+    {
+      return CompactionStatistics.create(totalBytes, numSegments(), numIntervals);
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      ProposedCompaction that = (ProposedCompaction) o;
+      return totalBytes == that.totalBytes
+             && numIntervals == that.numIntervals
+             && segments.equals(that.segments)
+             && umbrellaInterval.equals(that.umbrellaInterval)
+             && compactionInterval.equals(that.compactionInterval)
+             && dataSource.equals(that.dataSource);
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects.hash(segments, umbrellaInterval, compactionInterval, dataSource, totalBytes, numIntervals);
+    }
+
+    @Override
+    public String toString()
+    {
+      return "ProposedCompaction{" +
+             "datasource=" + dataSource +
+             ", umbrellaInterval=" + umbrellaInterval +
+             ", compactionInterval=" + compactionInterval +
+             ", numIntervals=" + numIntervals +
+             ", segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) +
+             ", totalSize=" + totalBytes +
+             '}';
+    }
   }
 
-  private CompactionCandidate(
-      List<DataSegment> segments,
-      Interval umbrellaInterval,
-      Interval compactionInterval,
-      int numDistinctSegmentIntervals,
-      @Nullable CompactionStatus currentStatus
-  )
+  /**
+   * Used by {@link CompactionStatusTracker#computeCompactionTaskState(CompactionCandidate)}.
+   * The callsite then determines whether to launch compaction task or not.
+   */
+  public enum TaskState

Review Comment:
   Please do not add a new `TaskState`; use `CompactionTaskStatus` instead. If it doesn't contain the needed info, let's add it there.



##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJobQueue.java:
##########
@@ -282,18 +281,17 @@ private boolean startJobIfPendingAndReady(
     }
 
     // Check if the job is already running, completed or skipped
-    final CompactionStatus compactionStatus = getCurrentStatusForJob(job, policy);
-    switch (compactionStatus.getState()) {
-      case RUNNING:
+    final CompactionCandidate.TaskState candidateState = getCurrentTaskStateForJob(job);

Review Comment:
   Please do not add a new task state. Use the existing `CompactionTaskStatus` 
class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

