kfaraz commented on code in PR #18968:
URL: https://github.com/apache/druid/pull/18968#discussion_r2767199319
##########
server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java:
##########
@@ -80,10 +80,7 @@ public Set<String> getSubmittedTaskIds()
* This method assumes that the given candidate is eligible for compaction
* based on the current compaction config/supervisor of the datasource.
*/
- public CompactionStatus computeCompactionStatus(
- CompactionCandidate candidate,
- CompactionCandidateSearchPolicy searchPolicy
- )
+ public CompactionStatus computeCompactionStatus(CompactionCandidate candidate)
Review Comment:
I suppose we can now remove this method altogether, since we do not want to
use the `lastTaskStatus` anyway.
##########
server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java:
##########
@@ -357,29 +348,52 @@ private static class Evaluator
private final Map<CompactionState, List<DataSegment>> unknownStateToSegments = new HashMap<>();
@Nullable
- private final String targetFingerprint;
private final IndexingStateFingerprintMapper fingerprintMapper;
+ @Nullable
+ private final String targetFingerprint;
- private Evaluator(
+ Evaluator(
CompactionCandidate candidateSegments,
DataSourceCompactionConfig compactionConfig,
- @Nullable String targetFingerprint,
@Nullable IndexingStateFingerprintMapper fingerprintMapper
)
{
this.candidateSegments = candidateSegments;
this.compactionConfig = compactionConfig;
this.tuningConfig = ClientCompactionTaskQueryTuningConfig.from(compactionConfig);
this.configuredGranularitySpec = compactionConfig.getGranularitySpec();
- this.targetFingerprint = targetFingerprint;
this.fingerprintMapper = fingerprintMapper;
+ if (fingerprintMapper == null) {
+ targetFingerprint = null;
+ } else {
+ targetFingerprint = fingerprintMapper.generateFingerprint(
+ compactionConfig.getDataSource(),
+ compactionConfig.toCompactionState()
+ );
+ }
+ }
+
+ List<DataSegment> getUncompactedSegments()
+ {
+ return uncompactedSegments;
}
- private CompactionStatus evaluate()
+ /**
+ * Evaluates the compaction status of candidate segments through a multi-step process:
+ * <ol>
+ * <li>Validates input bytes are within limits</li>
+ * <li>Categorizes segments by compaction state (fingerprinted, uncompacted, or unknown)</li>
+ * <li>Performs fingerprint-based validation if available (fast path)</li>
+ * <li>Runs detailed checks against unknown states via {@link #CHECKS}</li>
+ * </ol>
+ *
+ * @return Pair of eligibility status and compaction status with reason for first failed check
+ */
+ Pair<CompactionCandidateSearchPolicy.Eligibility, CompactionStatus> evaluate()
{
- final CompactionStatus inputBytesCheck = inputBytesAreWithinLimit();
- if (inputBytesCheck.isSkipped()) {
- return inputBytesCheck;
+ final CompactionCandidateSearchPolicy.Eligibility inputBytesCheck = inputBytesAreWithinLimit();
+ if (inputBytesCheck != null) {
Review Comment:
@cecemei, I think this class should remain untouched so that `CompactionStatus` can remain completely agnostic of `CompactionEligibility`.
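In other words, the evaluator could keep its original shape and return only a `CompactionStatus`. A rough sketch based on the removed lines above (the remaining checks are elided):
```
// Sketch: keep the evaluator returning only a CompactionStatus, as before.
// Eligibility can then be derived from the status elsewhere, e.g. by the policy.
private CompactionStatus evaluate()
{
  final CompactionStatus inputBytesCheck = inputBytesAreWithinLimit();
  if (inputBytesCheck.isSkipped()) {
    return inputBytesCheck;
  }
  // ... run the remaining CHECKS as in the existing implementation
}
```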
##########
server/src/main/java/org/apache/druid/server/compaction/DataSourceCompactibleSegmentIterator.java:
##########
@@ -329,17 +331,17 @@ private void findAndEnqueueSegmentsToCompact(CompactibleSegmentIterator compacti
continue;
}
- final CompactionCandidate candidates = CompactionCandidate.from(segments, config.getSegmentGranularity());
- final CompactionStatus compactionStatus = CompactionStatus.compute(candidates, config, fingerprintMapper);
- final CompactionCandidate candidatesWithStatus = candidates.withCurrentStatus(compactionStatus);
+ final CompactionCandidate candidatesWithStatus =
Review Comment:
Coordinator-based compaction will be deprecated soon.
We want to support incremental compaction only with Overlord-based compaction,
so that it serves as an incentive for users to migrate to compaction supervisors.
We took the same approach with indexing fingerprints.
##########
server/src/main/java/org/apache/druid/server/compaction/CompactionEligibility.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.compaction;
+
+import org.apache.druid.java.util.common.StringUtils;
+
+import java.util.Objects;
+
+/**
+ * Describes the eligibility of an interval for compaction.
+ */
+public class CompactionEligibility
Review Comment:
We should add a method to this class:
```
public static CompactionEligibility compute(CompactionStatus status, CompactionCandidateSearchPolicy policy) {
// determine the eligibility here
}
```
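Something along these lines, for example. (Note: apart from `isSkipped()`, which is already used in the evaluator, the helpers below, i.e. `isComplete()`, `getReason()`, `allowsIncrementalCompaction()` and the `CompactionEligibility` factory methods, are just placeholders for whatever the final API looks like.)
```
public static CompactionEligibility compute(
    CompactionStatus status,
    CompactionCandidateSearchPolicy policy
)
{
  // Skipped or already compacted intervals need no further policy input.
  if (status.isSkipped() || status.isComplete()) {
    return CompactionEligibility.notEligible(status.getReason());
  }

  // Otherwise, let the search policy decide between full and incremental compaction.
  return policy.allowsIncrementalCompaction(status)
         ? CompactionEligibility.incremental()
         : CompactionEligibility.full();
}
```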
##########
server/src/main/java/org/apache/druid/server/compaction/CompactionCandidate.java:
##########
@@ -160,21 +169,122 @@ public CompactionStatus getCurrentStatus()
return currentStatus;
}
+ @Nullable
+ public CompactionCandidateSearchPolicy.Eligibility getPolicyEligibility()
+ {
+ return policyEligiblity;
+ }
+
/**
* Creates a copy of this CompactionCandidate object with the given status.
*/
public CompactionCandidate withCurrentStatus(CompactionStatus status)
{
- return new CompactionCandidate(segments, umbrellaInterval, compactionInterval, numIntervals, status);
+ return new CompactionCandidate(
+ segments,
+ umbrellaInterval,
+ compactionInterval,
+ numIntervals,
+ policyEligiblity,
+ status
+ );
+ }
+
+ public CompactionCandidate withPolicyEligibility(CompactionCandidateSearchPolicy.Eligibility eligibility)
+ {
+ return new CompactionCandidate(
+ segments,
+ umbrellaInterval,
+ compactionInterval,
+ numIntervals,
+ eligibility,
+ currentStatus
+ );
+ }
+
+ /**
+ * Evaluates this candidate for compaction eligibility based on the provided
+ * compaction configuration and search policy.
+ * <p>
+ * This method first evaluates the candidate against the compaction configuration
+ * using a {@link CompactionStatus.Evaluator} to determine if any segments need
+ * compaction. If segments are pending compaction, the search policy is consulted
+ * to determine the type of compaction:
+ * <ul>
+ * <li><b>NOT_ELIGIBLE</b>: Returns a candidate with status SKIPPED, indicating
+ * the policy decided compaction should not occur at this time</li>
+ * <li><b>FULL_COMPACTION</b>: Returns this candidate with status PENDING,
+ * indicating all segments should be compacted</li>
+ * <li><b>INCREMENTAL_COMPACTION</b>: Returns a new candidate containing only
+ * the uncompacted segments (as determined by the evaluator), with status
+ * PENDING for incremental compaction</li>
+ * </ul>
+ *
+ * @param config the compaction configuration for the datasource
+ * @param searchPolicy the policy used to determine compaction eligibility
+ * @return a CompactionCandidate with updated status and potentially filtered segments
+ */
+ public CompactionCandidate evaluate(
Review Comment:
+1
But this method should still not live here.
We should just have a `withEligibility()` method so that this class remains
a bean for the most part.
This method can be moved to `CompactionEligibility.compute()` similar to
`CompactionStatus.compute()`.
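i.e. something like the following, mirroring the copy constructor call already used in `withCurrentStatus()` above (just a sketch):
```
public CompactionCandidate withEligibility(CompactionEligibility eligibility)
{
  // Plain bean-style copy: no evaluation logic lives in this class.
  return new CompactionCandidate(
      segments,
      umbrellaInterval,
      compactionInterval,
      numIntervals,
      eligibility,
      currentStatus
  );
}
```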
##########
server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java:
##########
@@ -80,10 +80,7 @@ public Set<String> getSubmittedTaskIds()
* This method assumes that the given candidate is eligible for compaction
* based on the current compaction config/supervisor of the datasource.
*/
- public CompactionStatus computeCompactionStatus(
- CompactionCandidate candidate,
- CompactionCandidateSearchPolicy searchPolicy
- )
+ public CompactionStatus computeCompactionStatus(CompactionCandidate candidate)
Review Comment:
Not needed anymore.
##########
server/src/main/java/org/apache/druid/server/compaction/DataSourceCompactibleSegmentIterator.java:
##########
@@ -329,17 +331,17 @@ private void findAndEnqueueSegmentsToCompact(CompactibleSegmentIterator compacti
continue;
}
- final CompactionCandidate candidates = CompactionCandidate.from(segments, config.getSegmentGranularity());
- final CompactionStatus compactionStatus = CompactionStatus.compute(candidates, config, fingerprintMapper);
- final CompactionCandidate candidatesWithStatus = candidates.withCurrentStatus(compactionStatus);
+ final CompactionCandidate candidatesWithStatus =
+ CompactionCandidate.from(segments, config.getSegmentGranularity())
+ .evaluate(config, searchPolicy, fingerprintMapper);
Review Comment:
This part should probably look more like the following:
```suggestion
      final CompactionCandidate candidates = CompactionCandidate.from(segments, config.getSegmentGranularity());
      final CompactionStatus compactionStatus = CompactionStatus.compute(candidates, config, fingerprintMapper);
      final CompactionEligibility eligibility = CompactionEligibility.compute(compactionStatus, searchPolicy);
      final CompactionCandidate candidatesWithStatus = candidates.withCurrentStatus(compactionStatus).withEligibility(eligibility);
```
##########
server/src/main/java/org/apache/druid/server/compaction/CompactionCandidate.java:
##########
@@ -160,21 +169,123 @@ public CompactionStatus getCurrentStatus()
return currentStatus;
}
+ @Nullable
+ public CompactionEligibility getPolicyEligibility()
+ {
+ return policyEligiblity;
+ }
+
/**
* Creates a copy of this CompactionCandidate object with the given status.
*/
public CompactionCandidate withCurrentStatus(CompactionStatus status)
{
- return new CompactionCandidate(segments, umbrellaInterval, compactionInterval, numIntervals, status);
+ return new CompactionCandidate(
+ segments,
+ umbrellaInterval,
+ compactionInterval,
+ numIntervals,
+ policyEligiblity,
+ status
+ );
+ }
+
+ public CompactionCandidate withPolicyEligibility(CompactionEligibility eligibility)
Review Comment:
Nit: Thinking about this, I think it would be best to have a single method
(e.g. `withStatusAndEligibility()`) that creates a copy of the
`CompactionCandidate` with both the status and eligibility populated.
Otherwise, we would always create an intermediate copy unnecessarily.
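Rough sketch of what that could look like (same constructor call as in the diff above, method name is just a suggestion):
```
public CompactionCandidate withStatusAndEligibility(
    CompactionStatus status,
    CompactionEligibility eligibility
)
{
  // Single copy with both fields populated, avoiding the intermediate
  // object created by chaining withCurrentStatus().withEligibility().
  return new CompactionCandidate(
      segments,
      umbrellaInterval,
      compactionInterval,
      numIntervals,
      eligibility,
      status
  );
}
```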