capistrant commented on code in PR #18968:
URL: https://github.com/apache/druid/pull/18968#discussion_r2774705747


##########
server/src/main/java/org/apache/druid/server/compaction/DataSourceCompactibleSegmentIterator.java:
##########
@@ -329,17 +331,17 @@ private void 
findAndEnqueueSegmentsToCompact(CompactibleSegmentIterator compacti
         continue;
       }
 
-      final CompactionCandidate candidates = 
CompactionCandidate.from(segments, config.getSegmentGranularity());
-      final CompactionStatus compactionStatus = 
CompactionStatus.compute(candidates, config, fingerprintMapper);
-      final CompactionCandidate candidatesWithStatus = 
candidates.withCurrentStatus(compactionStatus);
+      final CompactionCandidate candidatesWithStatus =

Review Comment:
   For reference, Cece: the changes I have been working on have also only been 
adding functionality to the supervisor. The IndexingState fingerprinting that 
compaction uses is only available for supervisors, and rule-based 
compaction/reindexing will also be supervisor-only. Maybe it is time we open a 
separate PR to officially mark this duty as deprecated and update the docs 
accordingly, since so much is happening in this space these days.



##########
server/src/main/java/org/apache/druid/server/compaction/CompactionEligibility.java:
##########
@@ -0,0 +1,932 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.compaction;
+
+import com.google.common.base.Strings;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
+import org.apache.druid.common.config.Configs;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
+import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.GranularityType;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.metadata.IndexingStateFingerprintMapper;
+import org.apache.druid.segment.transform.CompactionTransformSpec;
+import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
+import org.apache.druid.timeline.CompactionState;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.utils.CollectionUtils;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Describes the eligibility of an interval for compaction.
+ */
+public class CompactionEligibility
+{
+  public enum State
+  {
+    FULL_COMPACTION,
+    INCREMENTAL_COMPACTION,
+    NOT_ELIGIBLE,
+    NOT_APPLICABLE
+  }
+
+  static class CompactionEligibilityBuilder
+  {
+    private State state;
+    private CompactionStatistics compacted;
+    private CompactionStatistics uncompacted;
+    private List<DataSegment> uncompactedSegments;
+    private String reason;
+
+    CompactionEligibilityBuilder(State state, String reason)
+    {
+      this.state = state;
+      this.reason = reason;
+    }
+
+    CompactionEligibilityBuilder compacted(CompactionStatistics compacted)
+    {
+      this.compacted = compacted;
+      return this;
+    }
+
+    CompactionEligibilityBuilder uncompacted(CompactionStatistics uncompacted)
+    {
+      this.uncompacted = uncompacted;
+      return this;
+    }
+
+    CompactionEligibilityBuilder uncompactedSegments(List<DataSegment> 
uncompactedSegments)
+    {
+      this.uncompactedSegments = uncompactedSegments;
+      return this;
+    }
+
+    CompactionEligibility build()
+    {
+      return new CompactionEligibility(state, reason, compacted, uncompacted, 
uncompactedSegments);
+    }
+  }
+
+  public static final CompactionEligibility NOT_APPLICABLE = 
builder(State.NOT_APPLICABLE, "").build();
+
+  public static CompactionEligibility fail(String messageFormat, Object... 
args)
+  {
+    return builder(State.NOT_ELIGIBLE, StringUtils.format(messageFormat, 
args)).build();
+  }
+
+  private final State state;
+  private final String reason;
+
+  @Nullable
+  private final CompactionStatistics compacted;
+  @Nullable
+  private final CompactionStatistics uncompacted;
+  @Nullable
+  private final List<DataSegment> uncompactedSegments;
+
+  private CompactionEligibility(
+      State state,
+      String reason,
+      @Nullable CompactionStatistics compacted,
+      @Nullable CompactionStatistics uncompacted,
+      @Nullable List<DataSegment> uncompactedSegments
+  )
+  {
+    this.state = state;
+    this.reason = reason;
+    switch (state) {
+      case NOT_APPLICABLE:
+        break;
+      case NOT_ELIGIBLE:
+        InvalidInput.conditionalException(!Strings.isNullOrEmpty(reason), 
"must provide a reason");
+        break;
+      case FULL_COMPACTION:
+      case INCREMENTAL_COMPACTION:
+        InvalidInput.conditionalException(compacted != null, "must provide 
compacted stats");
+        InvalidInput.conditionalException(uncompacted != null, "must provide 
uncompacted stats");
+        InvalidInput.conditionalException(uncompactedSegments != null, "must 
provide uncompactedSegments");
+        break;
+      default:
+        throw DruidException.defensive("unexpected eligibility state[%s]", 
state);
+    }
+    this.compacted = compacted;
+    this.uncompacted = uncompacted;
+    this.uncompactedSegments = uncompactedSegments;
+  }
+
+  static CompactionEligibilityBuilder builder(State state, String reason)
+  {
+    return new CompactionEligibilityBuilder(state, reason);
+  }
+
+  public State getState()
+  {
+    return state;
+  }
+
+  public String getReason()
+  {
+    return reason;
+  }
+
+  @Nullable
+  public CompactionStatistics getUncompactedStats()
+  {
+    return uncompacted;
+  }
+
+  @Nullable
+  public CompactionStatistics getCompactedStats()
+  {
+    return compacted;
+  }
+
+  @Nullable
+  public List<DataSegment> getUncompactedSegments()
+  {
+    return uncompactedSegments;
+  }
+
+  public CompactionCandidate 
createCandidate(CompactionCandidate.ProposedCompaction proposedCompaction)
+  {
+    switch (state) {
+      case NOT_APPLICABLE:
+        return new CompactionCandidate(proposedCompaction, this, 
CompactionStatus.COMPLETE);
+      case NOT_ELIGIBLE:
+        return new CompactionCandidate(proposedCompaction, this, 
CompactionStatus.skipped(reason));
+      case FULL_COMPACTION:
+        return new CompactionCandidate(
+            proposedCompaction,
+            this,
+            CompactionStatus.pending(reason)
+        );
+      case INCREMENTAL_COMPACTION:
+        CompactionCandidate.ProposedCompaction newProposed = new 
CompactionCandidate.ProposedCompaction(
+            uncompactedSegments,
+            proposedCompaction.getUmbrellaInterval(),
+            proposedCompaction.getCompactionInterval(),
+            
Math.toIntExact(uncompactedSegments.stream().map(DataSegment::getInterval).distinct().count())
+        );
+        return new CompactionCandidate(newProposed, this, 
CompactionStatus.pending(reason));
+      default:
+        throw DruidException.defensive("Unexpected eligibility state[%s]", 
state);
+    }
+  }
+
+  /**
+   * Evaluates a compaction candidate to determine its eligibility and 
compaction status.
+   * <p>
+   * This method performs a two-stage evaluation:
+   * <ol>
+   * <li>First, uses {@link Evaluator} to check if the candidate needs 
compaction

Review Comment:
   Were any changes made to the Evaluator logic when moving it into this file?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to