kfaraz commented on code in PR #18968:
URL: https://github.com/apache/druid/pull/18968#discussion_r2762688447
##########
server/src/main/java/org/apache/druid/server/compaction/CompactionCandidate.java:
##########
@@ -160,21 +169,122 @@ public CompactionStatus getCurrentStatus()
return currentStatus;
}
+ @Nullable
+ public CompactionCandidateSearchPolicy.Eligibility getPolicyEligibility()
+ {
+ return policyEligiblity;
+ }
+
/**
* Creates a copy of this CompactionCandidate object with the given status.
*/
public CompactionCandidate withCurrentStatus(CompactionStatus status)
{
-    return new CompactionCandidate(segments, umbrellaInterval, compactionInterval, numIntervals, status);
+ return new CompactionCandidate(
+ segments,
+ umbrellaInterval,
+ compactionInterval,
+ numIntervals,
+ policyEligiblity,
+ status
+ );
+ }
+
+  public CompactionCandidate withPolicyEligibility(CompactionCandidateSearchPolicy.Eligibility eligibility)
+ {
+ return new CompactionCandidate(
+ segments,
+ umbrellaInterval,
+ compactionInterval,
+ numIntervals,
+ eligibility,
+ currentStatus
+ );
+ }
+
+ /**
+ * Evaluates this candidate for compaction eligibility based on the provided
+ * compaction configuration and search policy.
+ * <p>
+ * This method first evaluates the candidate against the compaction configuration
+ * using a {@link CompactionStatus.Evaluator} to determine if any segments need
+ * compaction. If segments are pending compaction, the search policy is consulted
+ * to determine the type of compaction:
+ * to determine the type of compaction:
+ * <ul>
+ * <li><b>NOT_ELIGIBLE</b>: Returns a candidate with status SKIPPED, indicating
+ * the policy decided compaction should not occur at this time</li>
+ * <li><b>FULL_COMPACTION</b>: Returns this candidate with status PENDING,
+ * indicating all segments should be compacted</li>
+ * <li><b>INCREMENTAL_COMPACTION</b>: Returns a new candidate containing only
+ * the uncompacted segments (as determined by the evaluator), with status
+ * PENDING for incremental compaction</li>
+ * </ul>
+ *
+ * @param config the compaction configuration for the datasource
+ * @param searchPolicy the policy used to determine compaction eligibility
+ * @return a CompactionCandidate with updated status and potentially filtered segments
+ */
+ public CompactionCandidate evaluate(
Review Comment:
This method should not live here. The computation of the eligibility should
be done in `CompactionStatusTracker`.
The overall workflow would be like this:
- identify a `CompactionCandidate`
- determine its `CompactionStatus`
- determine the eligibility via `CompactionStatusTracker.computeEligibility()`
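
A rough sketch of that workflow (`computeEligibility()` is the proposed method and does not exist yet; the other calls mirror code already present in this PR's diff):
```java
// Sketch only: statusTracker.computeEligibility() is a proposed method, not existing API.
final CompactionCandidate candidate = CompactionCandidate.from(segments, config.getSegmentGranularity());
final CompactionStatus status = CompactionStatus.compute(candidate, config, fingerprintMapper);
final CompactionCandidateSearchPolicy.Eligibility eligibility =
    statusTracker.computeEligibility(candidate.withCurrentStatus(status), searchPolicy);
```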
##########
server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpec.java:
##########
@@ -38,18 +42,37 @@ public class ClientCompactionIntervalSpec
private final Interval interval;
@Nullable
+ private final List<SegmentDescriptor> uncompactedSegments;
+ @Nullable
private final String sha256OfSortedSegmentIds;
@JsonCreator
public ClientCompactionIntervalSpec(
@JsonProperty("interval") Interval interval,
+ @JsonProperty("uncompactedSegments") @Nullable List<SegmentDescriptor>
uncompactedSegments,
Review Comment:
It is a little confusing to add segments here since there is already a
`SpecificSegmentsSpec` which specifies the set of segments that should be
compacted.
I feel we should add a new type of `CompactionInputSpec` which will be used
for incremental compaction only. It would have both a non-null interval as well
as the set of segment IDs to compact incrementally.
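
A hypothetical shape for such a spec (the class name, field names, and JSON property names below are suggestions only):
```java
// Sketch of a dedicated input spec for incremental compaction, assumed to plug
// into the existing CompactionInputSpec hierarchy next to CompactionIntervalSpec
// and SpecificSegmentsSpec.
public class IncrementalCompactionInputSpec implements CompactionInputSpec
{
  private final Interval interval;       // non-null, unlike the nullable field added in this patch
  private final List<String> segmentIds; // segment IDs to compact incrementally

  @JsonCreator
  public IncrementalCompactionInputSpec(
      @JsonProperty("interval") Interval interval,
      @JsonProperty("segmentIds") List<String> segmentIds
  )
  {
    this.interval = Preconditions.checkNotNull(interval, "interval");
    this.segmentIds = Preconditions.checkNotNull(segmentIds, "segmentIds");
  }

  // getters and the CompactionInputSpec overrides are omitted for brevity
}
```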
##########
server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java:
##########
@@ -64,18 +62,27 @@ Eligibility checkEligibilityForCompaction(
*/
class Eligibility
{
- public static final Eligibility OK = new Eligibility(true, null);
+ public enum PolicyEligibility
Review Comment:
Maybe we should call this enum `Status` (since this represents the
eligibility status) to avoid confusion with `Eligibility` class itself.
##########
server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java:
##########
@@ -64,18 +62,27 @@ Eligibility checkEligibilityForCompaction(
*/
class Eligibility
Review Comment:
Since this class is going to have a wider usage now, let's move it out into
a separate file of its own.
##########
server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java:
##########
@@ -357,29 +348,52 @@ private static class Evaluator
     private final Map<CompactionState, List<DataSegment>> unknownStateToSegments = new HashMap<>();
@Nullable
- private final String targetFingerprint;
private final IndexingStateFingerprintMapper fingerprintMapper;
+ @Nullable
+ private final String targetFingerprint;
- private Evaluator(
+ Evaluator(
CompactionCandidate candidateSegments,
DataSourceCompactionConfig compactionConfig,
- @Nullable String targetFingerprint,
@Nullable IndexingStateFingerprintMapper fingerprintMapper
)
{
this.candidateSegments = candidateSegments;
this.compactionConfig = compactionConfig;
       this.tuningConfig = ClientCompactionTaskQueryTuningConfig.from(compactionConfig);
this.configuredGranularitySpec = compactionConfig.getGranularitySpec();
- this.targetFingerprint = targetFingerprint;
this.fingerprintMapper = fingerprintMapper;
+ if (fingerprintMapper == null) {
+ targetFingerprint = null;
+ } else {
+ targetFingerprint = fingerprintMapper.generateFingerprint(
+ compactionConfig.getDataSource(),
+ compactionConfig.toCompactionState()
+ );
+ }
+ }
+
+ List<DataSegment> getUncompactedSegments()
+ {
+ return uncompactedSegments;
}
- private CompactionStatus evaluate()
+ /**
+ * Evaluates the compaction status of candidate segments through a multi-step process:
+ * <ol>
+ * <li>Validates input bytes are within limits</li>
+ * <li>Categorizes segments by compaction state (fingerprinted, uncompacted, or unknown)</li>
+ * <li>Performs fingerprint-based validation if available (fast path)</li>
+ * <li>Runs detailed checks against unknown states via {@link #CHECKS}</li>
+ * </ol>
+ *
+ * @return Pair of eligibility status and compaction status with reason for first failed check
+ */
+ Pair<CompactionCandidateSearchPolicy.Eligibility, CompactionStatus> evaluate()
{
- final CompactionStatus inputBytesCheck = inputBytesAreWithinLimit();
- if (inputBytesCheck.isSkipped()) {
- return inputBytesCheck;
+      final CompactionCandidateSearchPolicy.Eligibility inputBytesCheck = inputBytesAreWithinLimit();
+ if (inputBytesCheck != null) {
Review Comment:
I think the input bytes check should now be moved out of this class and used
in `CompactionStatusTracker.computeEligibility()` instead.
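
Roughly, assuming the `computeEligibility()` method proposed earlier (the `Eligibility.notEligible()` factory is an assumption, modeled on factories this PR adds such as `Eligibility.incrementalCompaction()`):
```java
// Inside the proposed CompactionStatusTracker.computeEligibility():
// skip the interval up front if its input bytes exceed the configured limit.
if (candidate.getTotalBytes() > config.getInputSegmentSizeBytes()) {
  return CompactionCandidateSearchPolicy.Eligibility.notEligible("input bytes exceed configured limit");
}
```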
##########
server/src/main/java/org/apache/druid/server/compaction/DataSourceCompactibleSegmentIterator.java:
##########
@@ -329,17 +331,17 @@ private void findAndEnqueueSegmentsToCompact(CompactibleSegmentIterator compacti
continue;
}
-      final CompactionCandidate candidates = CompactionCandidate.from(segments, config.getSegmentGranularity());
-      final CompactionStatus compactionStatus = CompactionStatus.compute(candidates, config, fingerprintMapper);
-      final CompactionCandidate candidatesWithStatus = candidates.withCurrentStatus(compactionStatus);
+      final CompactionCandidate candidatesWithStatus =
Review Comment:
I feel this class should remain untouched since we are only computing the
`CompactionStatus` here. The eligibility computation should happen later.
##########
server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java:
##########
@@ -64,18 +62,27 @@ Eligibility checkEligibilityForCompaction(
*/
class Eligibility
{
- public static final Eligibility OK = new Eligibility(true, null);
+ public enum PolicyEligibility
+ {
+ FULL_COMPACTION,
+ INCREMENTAL_COMPACTION,
+ NOT_ELIGIBLE,
+ NOT_APPLICABLE
Review Comment:
How is "not applicable" different from "not eligible"?
I think eligibility should have only 3 possible states - not eligible,
incremental and full.
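
Combined with the `Status` rename suggested above, a sketch:
```java
class Eligibility
{
  // Only three states: the policy either rejects the interval
  // or chooses full vs incremental compaction for it.
  public enum Status
  {
    NOT_ELIGIBLE,
    INCREMENTAL_COMPACTION,
    FULL_COMPACTION
  }
}
```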
##########
server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java:
##########
@@ -185,8 +215,18 @@ public Eligibility checkEligibilityForCompaction(
"Average size[%,d] of uncompacted segments in interval must be at
most [%,d]",
avgSegmentSize, maxAverageUncompactedBytesPerSegment.getBytes()
);
+ }
+
+    final double uncompactedBytesRatio = (double) uncompacted.getTotalBytes() /
+        (uncompacted.getTotalBytes() + candidate.getCompactedStats().getTotalBytes());
+    if (uncompactedBytesRatio < incrementalCompactionUncompactedBytesRatioThreshold) {
+ return Eligibility.incrementalCompaction(
Review Comment:
Let's not add this config or return incremental compaction here since
incremental compaction is not implemented yet. If a user configures this field
by mistake, compaction might misbehave.
##########
server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java:
##########
@@ -357,29 +348,52 @@ private static class Evaluator
     private final Map<CompactionState, List<DataSegment>> unknownStateToSegments = new HashMap<>();
@Nullable
- private final String targetFingerprint;
private final IndexingStateFingerprintMapper fingerprintMapper;
+ @Nullable
+ private final String targetFingerprint;
- private Evaluator(
+ Evaluator(
CompactionCandidate candidateSegments,
DataSourceCompactionConfig compactionConfig,
- @Nullable String targetFingerprint,
@Nullable IndexingStateFingerprintMapper fingerprintMapper
)
{
this.candidateSegments = candidateSegments;
this.compactionConfig = compactionConfig;
       this.tuningConfig = ClientCompactionTaskQueryTuningConfig.from(compactionConfig);
this.configuredGranularitySpec = compactionConfig.getGranularitySpec();
- this.targetFingerprint = targetFingerprint;
this.fingerprintMapper = fingerprintMapper;
+ if (fingerprintMapper == null) {
+ targetFingerprint = null;
+ } else {
+ targetFingerprint = fingerprintMapper.generateFingerprint(
+ compactionConfig.getDataSource(),
+ compactionConfig.toCompactionState()
+ );
+ }
+ }
+
+ List<DataSegment> getUncompactedSegments()
+ {
+ return uncompactedSegments;
}
- private CompactionStatus evaluate()
+ /**
+ * Evaluates the compaction status of candidate segments through a multi-step process:
+ * <ol>
+ * <li>Validates input bytes are within limits</li>
+ * <li>Categorizes segments by compaction state (fingerprinted, uncompacted, or unknown)</li>
+ * <li>Performs fingerprint-based validation if available (fast path)</li>
+ * <li>Runs detailed checks against unknown states via {@link #CHECKS}</li>
+ * </ol>
+ *
+ * @return Pair of eligibility status and compaction status with reason for first failed check
+ */
+ Pair<CompactionCandidateSearchPolicy.Eligibility, CompactionStatus> evaluate()
Review Comment:
Since the `CompactionStatus` and eligibility are going to be two separate
things now, this class should not deal with eligibility at all. We can retain
the old code in this class.
Once the `CompactionStatus` has been computed, that should be passed to
`CompactionStatusTracker.computeEligibility()` to determine the eligibility.
##########
server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java:
##########
@@ -47,13 +47,16 @@ public class MostFragmentedIntervalFirstPolicy extends BaseCandidateSearchPolicy
private final int minUncompactedCount;
private final HumanReadableBytes minUncompactedBytes;
private final HumanReadableBytes maxAverageUncompactedBytesPerSegment;
+ private final double incrementalCompactionUncompactedBytesRatioThreshold;
@JsonCreator
public MostFragmentedIntervalFirstPolicy(
@JsonProperty("minUncompactedCount") @Nullable Integer
minUncompactedCount,
@JsonProperty("minUncompactedBytes") @Nullable HumanReadableBytes
minUncompactedBytes,
@JsonProperty("maxAverageUncompactedBytesPerSegment") @Nullable
HumanReadableBytes maxAverageUncompactedBytesPerSegment,
+ @JsonProperty("incrementalCompactionUncompactedBytesRatioThreshold")
@Nullable
Review Comment:
Let's not add this user-facing config until incremental compaction has been
implemented.
##########
server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java:
##########
@@ -344,7 +335,7 @@ static DimensionRangePartitionsSpec getEffectiveRangePartitionsSpec(DimensionRan
* Evaluates {@link #CHECKS} to determine the compaction status of a
* {@link CompactionCandidate}.
*/
- private static class Evaluator
+ static class Evaluator
Review Comment:
Let's keep the evaluator private as before.
##########
server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java:
##########
@@ -80,10 +80,7 @@ public Set<String> getSubmittedTaskIds()
* This method assumes that the given candidate is eligible for compaction
* based on the current compaction config/supervisor of the datasource.
*/
- public CompactionStatus computeCompactionStatus(
- CompactionCandidate candidate,
- CompactionCandidateSearchPolicy searchPolicy
- )
+  public CompactionStatus computeCompactionStatus(CompactionCandidate candidate)
Review Comment:
Since we are now decoupling eligibility (i.e. whether or not to run
compaction) from the current compaction status (i.e. current degree of
compaction in the interval), it would make sense to do the following:
- Repurpose this method to compute the eligibility since the eligibility can
be affected by the `lastTaskStatus` but the `CompactionStatus` should not
depend on it.
- `CompactionStatus` need not have the `RUNNING` state anymore since it is
really associated with the running state of a compaction task.
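
A possible shape for the repurposed method (a sketch only; `hasRunningTaskFor()` and `Eligibility.notEligible()` are hypothetical helpers, and the policy callback's parameter list is abbreviated):
```java
public CompactionCandidateSearchPolicy.Eligibility computeEligibility(
    CompactionCandidate candidate,
    CompactionCandidateSearchPolicy searchPolicy
)
{
  // Eligibility may depend on the last task seen for this interval,
  // e.g. do not resubmit while a compaction task is still running on it.
  if (hasRunningTaskFor(candidate)) {
    return CompactionCandidateSearchPolicy.Eligibility.notEligible(
        "A compaction task is already running on this interval"
    );
  }
  // Otherwise let the policy choose between full and incremental compaction.
  return searchPolicy.checkEligibilityForCompaction(candidate);
}
```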
##########
server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java:
##########
@@ -460,14 +462,27 @@ private static ClientCompactionTaskQuery compactSegments(
context.put("priority", compactionTaskPriority);
     final String taskId = IdUtils.newTaskId(TASK_ID_PREFIX, ClientCompactionTaskQuery.TYPE, dataSource, null);
+ final ClientCompactionIntervalSpec clientCompactionIntervalSpec;
+    Preconditions.checkArgument(entry.getPolicyEligibility() != null, "Must have a policy eligibility");
+ switch (entry.getPolicyEligibility().getPolicyEligibility()) {
+ case FULL_COMPACTION:
+        clientCompactionIntervalSpec = new ClientCompactionIntervalSpec(entry.getCompactionInterval(), null, null);
+ break;
+ case INCREMENTAL_COMPACTION:
Review Comment:
Let's update this part only when incremental compaction has been implemented
on the task side. Also, we should support incremental compaction with
`CompactionJobQueue` (i.e. Overlord-based compaction) only.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]