This is an automated email from the ASF dual-hosted git repository.
yqm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 5f77596d42b feat: adds row-based compaction eligibility filtering (#19205)
5f77596d42b is described below
commit 5f77596d42bc38173dbff3a491d9a035094d9d6c
Author: Cece Mei <[email protected]>
AuthorDate: Wed Mar 25 11:53:28 2026 -0700
feat: adds row-based compaction eligibility filtering (#19205)
* rows
* document
* zero
* nullable
* spell
---
docs/api-reference/automatic-compaction-api.md | 25 +++++-
.../embedded/compact/CompactionSupervisorTest.java | 23 +++---
.../embedded/indexing/KafkaClusterMetricsTest.java | 2 +-
.../compact/OverlordCompactionSchedulerTest.java | 2 +-
.../server/compaction/CompactionCandidate.java | 35 ++++++---
.../server/compaction/CompactionRunSimulator.java | 2 +-
.../server/compaction/CompactionStatistics.java | 25 +++++-
.../druid/server/compaction/CompactionStatus.java | 30 ++++++--
.../MostFragmentedIntervalFirstPolicy.java | 54 ++++++++++++-
.../MostFragmentedIntervalFirstPolicyTest.java | 88 +++++++++++++++++++---
.../coordinator/AutoCompactionSnapshotTest.java | 6 +-
website/.spelling | 2 +
12 files changed, 243 insertions(+), 51 deletions(-)
diff --git a/docs/api-reference/automatic-compaction-api.md
b/docs/api-reference/automatic-compaction-api.md
index 6864aae4735..4f2ce3d62af 100644
--- a/docs/api-reference/automatic-compaction-api.md
+++ b/docs/api-reference/automatic-compaction-api.md
@@ -886,7 +886,7 @@ This includes the following fields:
|------|-----------|-------------|
|`compactionTaskSlotRatio`|Ratio of number of slots taken up by compaction
tasks to the number of total task slots across all workers.|0.1|
|`maxCompactionTaskSlots`|Maximum number of task slots that can be taken up by
compaction tasks and sub-tasks. Minimum number of task slots available for
compaction is 1. When using MSQ engine or Native engine with range
partitioning, a single compaction job occupies more than one task slot. In this
case, the minimum is 2 so that at least one compaction job can always run in
the cluster.|2147483647 (i.e. total task slots)|
-|`compactionPolicy`|Policy to choose intervals for compaction. Currently, the
only supported policy is [Newest segment
first](#compaction-policy-newestsegmentfirst).|Newest segment first|
+|`compactionPolicy`|Policy to choose intervals for compaction. Supported
policies are [Newest segment first](#compaction-policy-newestsegmentfirst),
[Most fragmented first](#compaction-policy-mostfragmentedfirst), and [Fixed
interval order](#compaction-policy-fixedintervalorder).|Newest segment first|
|`useSupervisors`|Whether compaction should be run on Overlord using
supervisors instead of Coordinator duties.|false|
|`engine`|Engine used for running compaction tasks, unless overridden in the
datasource-level compaction config. Possible values are `native` and `msq`.
`msq` engine can be used for compaction only if `useSupervisors` is
`true`.|`native`|
|`storeCompactionStatePerSegment`|**This configuration only takes effect if
`useSupervisors` is `true`.** Whether to persist the full compaction state in
segment metadata. When `true` (default), compaction state is stored in both the
segment metadata and the indexing states table. This is historically how Druid
has worked. When `false`, only a fingerprint reference is stored in the segment
metadata, reducing storage overhead in the segments table. The actual
compaction state is stored in [...]
@@ -898,6 +898,29 @@ This includes the following fields:
|`type`|This must always be `newestSegmentFirst`||
|`priorityDatasource`|Datasource to prioritize for compaction. The intervals
of this datasource are chosen for compaction before the intervals of any other
datasource. Within this datasource, the intervals are prioritized based on the
chosen compaction policy.|None|
+#### Compaction policy `mostFragmentedFirst`
+
+This experimental policy prioritizes compaction of intervals that contain the largest number of small uncompacted segments. It favors cluster stability (reducing overall segment count) over query performance on newer intervals.
+
+|Field|Description|Default value|
+|-----|-----------|-------------|
+|`type`|This must always be `mostFragmentedFirst`||
+|`priorityDatasource`|Datasource to prioritize for compaction. The intervals
of this datasource are chosen for compaction before the intervals of any other
datasource. Within this datasource, the intervals are prioritized based on the
chosen compaction policy.|None|
+|`minUncompactedCount`|Minimum number of uncompacted segments that must be
present in an interval to make it eligible for compaction. Must be greater than
0.|100|
+|`minUncompactedBytes`|Minimum total bytes of uncompacted segments that must
be present in an interval to make it eligible for compaction. Human-readable
byte format (e.g., "10MiB").|10 MiB|
+|`maxAverageUncompactedBytesPerSegment`|Maximum average size of uncompacted
segments in an interval eligible for compaction. Human-readable byte format
(e.g., "2GiB").|2 GiB|
+|`minUncompactedBytesPercentForFullCompaction`|Threshold for the ratio of uncompacted bytes to total bytes in an interval, expressed as a percentage (0-100). When the ratio is below this threshold, the interval is eligible for minor compaction instead of full compaction.|0|
+|`minUncompactedRowsPercentForFullCompaction`|Threshold for the ratio of uncompacted rows to total rows in an interval, expressed as a percentage (0-100). When the ratio is below this threshold, the interval is eligible for minor compaction instead of full compaction.|0|
+
+#### Compaction policy `fixedIntervalOrder`
+
+This policy specifies the datasources and intervals eligible for compaction
and their order. It is primarily used for integration tests.
+
+|Field|Description|Default value|
+|-----|-----------|-------------|
+|`type`|This must always be `fixedIntervalOrder`||
+|`eligibleCandidates`|List of datasource-interval pairs eligible for
compaction. Each entry contains `datasource` (string) and `interval` (ISO-8601
interval) fields. Compaction processes candidates in the order specified.|None|
+
#### URL
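[Editor's note: as a hypothetical illustration only, not part of the patch. The field names and defaults come from the tables above; the datasource name is made up. A `mostFragmentedFirst` policy with both percentage thresholds set might look like:]

```json
{
  "compactionPolicy": {
    "type": "mostFragmentedFirst",
    "priorityDatasource": "wikipedia",
    "minUncompactedCount": 100,
    "minUncompactedBytes": "10MiB",
    "maxAverageUncompactedBytesPerSegment": "2GiB",
    "minUncompactedBytesPercentForFullCompaction": 50,
    "minUncompactedRowsPercentForFullCompaction": 50
  }
}
```

[A `fixedIntervalOrder` policy would instead carry `eligibleCandidates` entries of the form `{"datasource": "wikipedia", "interval": "2026-01-01/2026-02-01"}`, processed in the order listed.]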
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
index ddcf41f23db..0192b4853ed 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
@@ -220,14 +220,11 @@ public class CompactionSupervisorTest extends
EmbeddedClusterTestBase
verifyCompactedSegmentsHaveFingerprints(yearGranConfig);
}
- @MethodSource("getPartitionsSpec")
+ @MethodSource("getPolicyAndPartition")
@ParameterizedTest(name = "partitionsSpec={0}")
- public void test_minorCompactionWithMSQ(PartitionsSpec partitionsSpec)
throws Exception
+ public void test_minorCompactionWithMSQ(MostFragmentedIntervalFirstPolicy
policy, PartitionsSpec partitionsSpec)
{
- configureCompaction(
- CompactionEngine.MSQ,
- new MostFragmentedIntervalFirstPolicy(2, new
HumanReadableBytes("1KiB"), null, 80, null)
- );
+ configureCompaction(CompactionEngine.MSQ, policy);
ingest1kRecords();
ingest1kRecords();
@@ -1026,11 +1023,19 @@ public class CompactionSupervisorTest extends
EmbeddedClusterTestBase
cluster.callApi().runTask(task, overlord);
}
- public static List<PartitionsSpec> getPartitionsSpec()
+ public static List<Object[]> getPolicyAndPartition()
{
return List.of(
- new DimensionRangePartitionsSpec(null, 10_000, List.of("page"), false),
- new DynamicPartitionsSpec(null, null)
+ new Object[]{
+ // decides minor compaction based on bytes percent
+ new MostFragmentedIntervalFirstPolicy(2, new
HumanReadableBytes("1KiB"), null, 80, null, null),
+ new DimensionRangePartitionsSpec(null, 10_000, List.of("page"),
false)
+ },
+ new Object[]{
+ // decides minor compaction based on rows percent
+ new MostFragmentedIntervalFirstPolicy(2, new
HumanReadableBytes("1KiB"), null, null, 51, null),
+ new DynamicPartitionsSpec(null, null)
+ }
);
}
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java
index c2f07d12495..39742a9d578 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java
@@ -147,7 +147,7 @@ public class KafkaClusterMetricsTest extends
EmbeddedClusterTestBase
),
Arguments.of(
CompactionEngine.MSQ,
- new MostFragmentedIntervalFirstPolicy(1,
HumanReadableBytes.valueOf(1), null, 80, null)
+ new MostFragmentedIntervalFirstPolicy(1,
HumanReadableBytes.valueOf(1), null, null, 80, null)
)
);
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java
index 5c598024c15..5c1200110f8 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java
@@ -394,7 +394,7 @@ public class OverlordCompactionSchedulerTest
runCompactionTasks(1);
final AutoCompactionSnapshot.Builder expectedSnapshot =
AutoCompactionSnapshot.builder(dataSource);
-
expectedSnapshot.incrementWaitingStats(CompactionStatistics.create(100_000_000,
1, 1));
+
expectedSnapshot.incrementWaitingStats(CompactionStatistics.create(100_000_000,
null, 1, 1));
Assert.assertEquals(
expectedSnapshot.build(),
diff --git
a/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidate.java
b/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidate.java
index d3653ccd992..653bc353f24 100644
---
a/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidate.java
+++
b/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidate.java
@@ -42,9 +42,8 @@ public class CompactionCandidate
private final Interval umbrellaInterval;
private final Interval compactionInterval;
private final String dataSource;
- private final long totalBytes;
- private final int numIntervals;
+ private final CompactionStatistics compactionStatistics;
private final CompactionStatus currentStatus;
public static Interval getCompactionInterval(
@@ -99,12 +98,23 @@ public class CompactionCandidate
)
{
this.segments = segments;
- this.totalBytes = segments.stream().mapToLong(DataSegment::getSize).sum();
+
+ final Long totalRows;
+ if (segments.stream().allMatch(s -> s.getTotalRows() != null)) {
+ totalRows = segments.stream().mapToLong(DataSegment::getTotalRows).sum();
+ } else {
+ totalRows = null;
+ }
+ this.compactionStatistics = CompactionStatistics.create(
+ segments.stream().mapToLong(DataSegment::getSize).sum(),
+ totalRows,
+ segments.size(),
+ numDistinctSegmentIntervals
+ );
this.umbrellaInterval = umbrellaInterval;
this.compactionInterval = compactionInterval;
- this.numIntervals = numDistinctSegmentIntervals;
this.dataSource = segments.get(0).getDataSource();
this.currentStatus = currentStatus;
}
@@ -117,11 +127,6 @@ public class CompactionCandidate
return segments;
}
- public long getTotalBytes()
- {
- return totalBytes;
- }
-
public int numSegments()
{
return segments.size();
@@ -152,7 +157,7 @@ public class CompactionCandidate
public CompactionStatistics getStats()
{
- return CompactionStatistics.create(totalBytes, numSegments(),
numIntervals);
+ return compactionStatistics;
}
@Nullable
@@ -187,7 +192,13 @@ public class CompactionCandidate
*/
public CompactionCandidate withCurrentStatus(CompactionStatus status)
{
- return new CompactionCandidate(segments, umbrellaInterval,
compactionInterval, numIntervals, status);
+ return new CompactionCandidate(
+ segments,
+ umbrellaInterval,
+ compactionInterval,
+ Math.toIntExact(compactionStatistics.getNumIntervals()),
+ status
+ );
}
@Override
@@ -196,7 +207,7 @@ public class CompactionCandidate
return "SegmentsToCompact{" +
"datasource=" + dataSource +
", segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) +
- ", totalSize=" + totalBytes +
+ ", compactionStatistics=" + compactionStatistics +
", currentStatus=" + currentStatus +
'}';
}
diff --git
a/server/src/main/java/org/apache/druid/server/compaction/CompactionRunSimulator.java
b/server/src/main/java/org/apache/druid/server/compaction/CompactionRunSimulator.java
index 586f16c3b23..b01d892c3c0 100644
---
a/server/src/main/java/org/apache/druid/server/compaction/CompactionRunSimulator.java
+++
b/server/src/main/java/org/apache/druid/server/compaction/CompactionRunSimulator.java
@@ -182,7 +182,7 @@ public class CompactionRunSimulator
row.add(candidate.getDataSource());
row.add(candidate.getCompactionInterval());
row.add(candidate.numSegments());
- row.add(candidate.getTotalBytes());
+ row.add(candidate.getStats().getTotalBytes());
row.add(CompactionSlotManager.getMaxTaskSlotsForNativeCompactionTask(tuningConfig));
if (reason != null) {
row.add(reason);
diff --git
a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatistics.java
b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatistics.java
index 7d43a09aed8..d03eee00eb7 100644
---
a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatistics.java
+++
b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatistics.java
@@ -19,19 +19,25 @@
package org.apache.druid.server.compaction;
+import javax.annotation.Nullable;
+
/**
* Used to track statistics for segments in different states of compaction.
+ * totalRows can be null for old segments where row count was not stored.
*/
public class CompactionStatistics
{
private long totalBytes;
+ @Nullable
+ private Long totalRows;
private long numSegments;
private long numIntervals;
- public static CompactionStatistics create(long bytes, long numSegments, long
numIntervals)
+ public static CompactionStatistics create(long bytes, Long totalRows, long
numSegments, long numIntervals)
{
final CompactionStatistics stats = new CompactionStatistics();
stats.totalBytes = bytes;
+ stats.totalRows = totalRows;
stats.numIntervals = numIntervals;
stats.numSegments = numSegments;
return stats;
@@ -42,6 +48,12 @@ public class CompactionStatistics
return totalBytes;
}
+ @Nullable
+ public Long getTotalRows()
+ {
+ return totalRows;
+ }
+
public long getNumSegments()
{
return numSegments;
@@ -55,6 +67,11 @@ public class CompactionStatistics
public void increment(CompactionStatistics other)
{
totalBytes += other.getTotalBytes();
+ if (totalRows == null || other.totalRows == null) {
+ totalRows = null;
+ } else {
+ totalRows += other.totalRows;
+ }
numIntervals += other.getNumIntervals();
numSegments += other.getNumSegments();
}
@@ -62,6 +79,11 @@ public class CompactionStatistics
public void decrement(CompactionStatistics other)
{
totalBytes -= other.getTotalBytes();
+ if (totalRows == null || other.totalRows == null) {
+ totalRows = null;
+ } else {
+ totalRows -= other.totalRows;
+ }
numIntervals -= other.getNumIntervals();
numSegments -= other.getNumSegments();
}
@@ -71,6 +93,7 @@ public class CompactionStatistics
{
return "CompactionStatistics{" +
"totalBytes=" + totalBytes +
+ ", totalRows=" + totalRows +
", numSegments=" + numSegments +
", numIntervals=" + numIntervals +
'}';
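[Editor's note: a standalone sketch, not part of the patch. Class and method names are simplified stand-ins for the real `CompactionStatistics`. It illustrates the null-propagating `totalRows` accounting introduced above: a row total is only meaningful if every contributing statistic reported one, so merging with an unknown count collapses the sum back to null.]

```java
public class RowStatsSketch
{
    // null means "row count unknown" (e.g. old segments that never stored it)
    private Long totalRows;

    public RowStatsSketch(Long totalRows)
    {
        this.totalRows = totalRows;
    }

    // Mirrors increment() in the diff: any null operand poisons the total.
    public void increment(Long otherRows)
    {
        if (totalRows == null || otherRows == null) {
            totalRows = null;
        } else {
            totalRows += otherRows;
        }
    }

    public Long getTotalRows()
    {
        return totalRows;
    }

    public static void main(String[] args)
    {
        RowStatsSketch stats = new RowStatsSketch(100L);
        stats.increment(50L);
        System.out.println(stats.getTotalRows()); // 150
        stats.increment(null);                    // merge in an unknown row count
        System.out.println(stats.getTotalRows()); // null
    }
}
```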
diff --git
a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java
b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java
index cdf1f1832b3..06182e3a396 100644
---
a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java
+++
b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java
@@ -185,6 +185,14 @@ public class CompactionStatus
);
}
+ public static CompactionStatus complete(
+ CompactionStatistics compactionStatistics,
+ CompactionStatistics uncompactedStats
+ )
+ {
+ return new CompactionStatus(State.COMPLETE, null, compactionStatistics,
uncompactedStats, null);
+ }
+
/**
* Computes compaction status for the given field. The status is assumed to
be
* COMPLETE (i.e. no further compaction is required) if the configured value
@@ -439,7 +447,7 @@ public class CompactionStatus
}
if (reasonsForCompaction.isEmpty()) {
- return COMPLETE;
+ return CompactionStatus.complete(createStats(this.compactedSegments),
createStats(this.uncompactedSegments));
} else {
return CompactionStatus.pending(
createStats(this.compactedSegments),
@@ -483,9 +491,9 @@ public class CompactionStatus
// Cannot evaluate further without a fingerprint mapper
uncompactedSegments.addAll(
mismatchedFingerprintToSegmentMap.values()
- .stream()
- .flatMap(List::stream)
- .toList()
+ .stream()
+ .flatMap(List::stream)
+ .toList()
);
return CompactionStatus.pending("Segments have a mismatched
fingerprint and no fingerprint mapper is available");
}
@@ -509,7 +517,8 @@ public class CompactionStatus
}
segments.addAll(e.getValue());
return segments;
- });
+ }
+ );
}
}
@@ -598,7 +607,8 @@ public class CompactionStatus
} else if (existingPartionsSpec instanceof DynamicPartitionsSpec) {
existingPartionsSpec = new DynamicPartitionsSpec(
existingPartionsSpec.getMaxRowsPerSegment(),
- ((DynamicPartitionsSpec)
existingPartionsSpec).getMaxTotalRowsOr(Long.MAX_VALUE));
+ ((DynamicPartitionsSpec)
existingPartionsSpec).getMaxTotalRowsOr(Long.MAX_VALUE)
+ );
}
return CompactionStatus.completeIfNullOrEqual(
"partitionsSpec",
@@ -826,7 +836,13 @@ public class CompactionStatus
final Set<Interval> segmentIntervals =
segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet());
final long totalBytes =
segments.stream().mapToLong(DataSegment::getSize).sum();
- return CompactionStatistics.create(totalBytes, segments.size(),
segmentIntervals.size());
+ final Long totalRows;
+ if (segments.stream().allMatch(s -> s.getTotalRows() != null)) {
+ totalRows =
segments.stream().mapToLong(DataSegment::getTotalRows).sum();
+ } else {
+ totalRows = null;
+ }
+ return CompactionStatistics.create(totalBytes, totalRows,
segments.size(), segmentIntervals.size());
}
}
}
diff --git
a/server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java
b/server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java
index 001525a2a89..b8b754e718c 100644
---
a/server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java
+++
b/server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java
@@ -25,6 +25,7 @@ import org.apache.druid.common.config.Configs;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.guice.annotations.UnstableApi;
import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.java.util.common.logger.Logger;
import javax.annotation.Nullable;
import java.util.Comparator;
@@ -41,6 +42,7 @@ import java.util.Objects;
@UnstableApi
public class MostFragmentedIntervalFirstPolicy extends
BaseCandidateSearchPolicy
{
+ private static final Logger logger = new
Logger(MostFragmentedIntervalFirstPolicy.class);
private static final HumanReadableBytes SIZE_2_GB = new
HumanReadableBytes("2GiB");
private static final HumanReadableBytes SIZE_10_MB = new
HumanReadableBytes("10MiB");
@@ -48,6 +50,7 @@ public class MostFragmentedIntervalFirstPolicy extends
BaseCandidateSearchPolicy
private final HumanReadableBytes minUncompactedBytes;
private final HumanReadableBytes maxAverageUncompactedBytesPerSegment;
private final int minUncompactedBytesPercentForFullCompaction;
+ private final int minUncompactedRowsPercentForFullCompaction;
@JsonCreator
public MostFragmentedIntervalFirstPolicy(
@@ -57,6 +60,8 @@ public class MostFragmentedIntervalFirstPolicy extends
BaseCandidateSearchPolicy
HumanReadableBytes maxAverageUncompactedBytesPerSegment,
@JsonProperty("minUncompactedBytesPercentForFullCompaction") @Nullable
Integer minUncompactedBytesPercentForFullCompaction,
+ @JsonProperty("minUncompactedRowsPercentForFullCompaction") @Nullable
+ Integer minUncompactedRowsPercentForFullCompaction,
@JsonProperty("priorityDatasource") @Nullable String priorityDatasource
)
{
@@ -79,6 +84,13 @@ public class MostFragmentedIntervalFirstPolicy extends
BaseCandidateSearchPolicy
"'minUncompactedBytesPercentForFullCompaction'[%s] must be between 0
and 100",
minUncompactedBytesPercentForFullCompaction
);
+ InvalidInput.conditionalException(
+ minUncompactedRowsPercentForFullCompaction == null
+ || (minUncompactedRowsPercentForFullCompaction >= 0
+ && minUncompactedRowsPercentForFullCompaction < 100),
+ "'minUncompactedRowsPercentForFullCompaction'[%s] must be between 0
and 100",
+ minUncompactedRowsPercentForFullCompaction
+ );
this.minUncompactedCount = Configs.valueOrDefault(minUncompactedCount,
100);
this.minUncompactedBytes = Configs.valueOrDefault(minUncompactedBytes,
SIZE_10_MB);
@@ -86,6 +98,8 @@ public class MostFragmentedIntervalFirstPolicy extends
BaseCandidateSearchPolicy
= Configs.valueOrDefault(maxAverageUncompactedBytesPerSegment,
SIZE_2_GB);
this.minUncompactedBytesPercentForFullCompaction =
Configs.valueOrDefault(minUncompactedBytesPercentForFullCompaction, 0);
+ this.minUncompactedRowsPercentForFullCompaction =
+ Configs.valueOrDefault(minUncompactedRowsPercentForFullCompaction, 0);
}
/**
@@ -129,6 +143,17 @@ public class MostFragmentedIntervalFirstPolicy extends
BaseCandidateSearchPolicy
return minUncompactedBytesPercentForFullCompaction;
}
+ /**
+ * Threshold percentage of uncompacted rows to total rows below which
+ * minor compaction is eligible instead of full compaction.
+ * Default value is 0.
+ */
+ @JsonProperty
+ public int minUncompactedRowsPercentForFullCompaction()
+ {
+ return minUncompactedRowsPercentForFullCompaction;
+ }
+
@Override
protected Comparator<CompactionCandidate> getSegmentComparator()
{
@@ -148,7 +173,8 @@ public class MostFragmentedIntervalFirstPolicy extends
BaseCandidateSearchPolicy
return minUncompactedCount == policy.minUncompactedCount
&& Objects.equals(minUncompactedBytes, policy.minUncompactedBytes)
&& Objects.equals(maxAverageUncompactedBytesPerSegment,
policy.maxAverageUncompactedBytesPerSegment)
- && minUncompactedBytesPercentForFullCompaction ==
policy.minUncompactedBytesPercentForFullCompaction;
+ && minUncompactedBytesPercentForFullCompaction ==
policy.minUncompactedBytesPercentForFullCompaction
+ && minUncompactedRowsPercentForFullCompaction ==
policy.minUncompactedRowsPercentForFullCompaction;
}
@Override
@@ -159,7 +185,8 @@ public class MostFragmentedIntervalFirstPolicy extends
BaseCandidateSearchPolicy
minUncompactedCount,
minUncompactedBytes,
maxAverageUncompactedBytesPerSegment,
- minUncompactedBytesPercentForFullCompaction
+ minUncompactedBytesPercentForFullCompaction,
+ minUncompactedRowsPercentForFullCompaction
);
}
@@ -172,6 +199,7 @@ public class MostFragmentedIntervalFirstPolicy extends
BaseCandidateSearchPolicy
", minUncompactedBytes=" + minUncompactedBytes +
", maxAverageUncompactedBytesPerSegment=" +
maxAverageUncompactedBytesPerSegment +
", minUncompactedBytesPercentForFullCompaction=" +
minUncompactedBytesPercentForFullCompaction +
+ ", minUncompactedRowsPercentForFullCompaction=" +
minUncompactedRowsPercentForFullCompaction +
", priorityDataSource='" + getPriorityDatasource() + '\'' +
'}';
}
@@ -223,9 +251,27 @@ public class MostFragmentedIntervalFirstPolicy extends
BaseCandidateSearchPolicy
uncompactedBytesRatio,
minUncompactedBytesPercentForFullCompaction
);
- } else {
- return Eligibility.FULL;
}
+
+ // Check uncompacted rows ratio if total rows are available
+ final Long uncompactedRows = uncompacted.getTotalRows();
+ final Long compactedRows = candidate.getCompactedStats().getTotalRows();
+ if (uncompactedRows != null && compactedRows != null) {
+ if (uncompactedRows + compactedRows > 0) {
+ final double uncompactedRowsRatio = (double) uncompactedRows /
(uncompactedRows + compactedRows) * 100;
+ if (uncompactedRowsRatio < minUncompactedRowsPercentForFullCompaction)
{
+ return Eligibility.minor(
+ "Uncompacted rows ratio[%.2f] is below threshold[%d]",
+ uncompactedRowsRatio,
+ minUncompactedRowsPercentForFullCompaction
+ );
+ }
+ } else {
+        logger.error("Compaction candidate has zero total rows; skipping rows-ratio check");
+ }
+ }
+
+ return Eligibility.FULL;
}
/**
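[Editor's note: an illustrative sketch, not the real `Eligibility` API. It isolates the rows-ratio check added to `MostFragmentedIntervalFirstPolicy` above: with both row totals known, compute the percentage of rows still uncompacted and fall back to minor compaction below the configured threshold; unknown or zero totals leave the candidate eligible for full compaction.]

```java
public class RowsRatioCheck
{
    // Returns "MINOR" or "FULL". Null row counts mean the ratio cannot be
    // evaluated, so the candidate stays eligible for full compaction.
    public static String eligibility(Long uncompactedRows, Long compactedRows, int minPercentForFull)
    {
        if (uncompactedRows == null || compactedRows == null) {
            return "FULL";
        }
        final long totalRows = uncompactedRows + compactedRows;
        if (totalRows <= 0) {
            return "FULL"; // degenerate candidate; the real code logs an error here
        }
        final double uncompactedRowsRatio = 100.0 * uncompactedRows / totalRows;
        return uncompactedRowsRatio < minPercentForFull ? "MINOR" : "FULL";
    }

    public static void main(String[] args)
    {
        // 400 uncompacted of 1600 total rows = 25%, below a 50% threshold
        System.out.println(eligibility(400L, 1200L, 50)); // MINOR
        // Unknown compacted row count: ratio cannot be computed
        System.out.println(eligibility(400L, null, 50)); // FULL
    }
}
```

[These numbers match the unit test in the diff, where 400 uncompacted rows against 1200 compacted rows yields `UNCOMPACTED_SEGMENTS_ONLY` mode under a threshold of 50.]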
diff --git
a/server/src/test/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicyTest.java
b/server/src/test/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicyTest.java
index 53e027a93d9..3f958756ed6 100644
---
a/server/src/test/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicyTest.java
+++
b/server/src/test/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicyTest.java
@@ -40,7 +40,7 @@ public class MostFragmentedIntervalFirstPolicyTest
public void test_thresholdValues_ofDefaultPolicy()
{
final MostFragmentedIntervalFirstPolicy policy =
- new MostFragmentedIntervalFirstPolicy(null, null, null, null, null);
+ new MostFragmentedIntervalFirstPolicy(null, null, null, null, null,
null);
Assertions.assertEquals(100, policy.getMinUncompactedCount());
Assertions.assertEquals(new HumanReadableBytes("10MiB"),
policy.getMinUncompactedBytes());
Assertions.assertEquals(new HumanReadableBytes("2GiB"),
policy.getMaxAverageUncompactedBytesPerSegment());
@@ -56,6 +56,7 @@ public class MostFragmentedIntervalFirstPolicyTest
HumanReadableBytes.valueOf(1),
HumanReadableBytes.valueOf(10_000),
null,
+ null,
null
);
@@ -80,6 +81,7 @@ public class MostFragmentedIntervalFirstPolicyTest
minUncompactedBytes,
HumanReadableBytes.valueOf(10_000),
null,
+ null,
null
);
@@ -104,6 +106,7 @@ public class MostFragmentedIntervalFirstPolicyTest
HumanReadableBytes.valueOf(100),
maxAvgSegmentSize,
null,
+ null,
null
);
@@ -127,6 +130,7 @@ public class MostFragmentedIntervalFirstPolicyTest
HumanReadableBytes.valueOf(1),
HumanReadableBytes.valueOf(10_000),
null,
+ null,
null
);
@@ -148,6 +152,7 @@ public class MostFragmentedIntervalFirstPolicyTest
HumanReadableBytes.valueOf(1),
HumanReadableBytes.valueOf(10_000),
null,
+ null,
null
);
@@ -169,6 +174,7 @@ public class MostFragmentedIntervalFirstPolicyTest
HumanReadableBytes.valueOf(1),
HumanReadableBytes.valueOf(10_000),
null,
+ null,
null
);
@@ -190,6 +196,7 @@ public class MostFragmentedIntervalFirstPolicyTest
HumanReadableBytes.valueOf(1),
HumanReadableBytes.valueOf(100),
null,
+ null,
null
);
@@ -220,6 +227,7 @@ public class MostFragmentedIntervalFirstPolicyTest
HumanReadableBytes.valueOf(2),
HumanReadableBytes.valueOf(3),
50,
+ 51,
"foo"
);
final DefaultObjectMapper mapper = new DefaultObjectMapper();
@@ -232,7 +240,7 @@ public class MostFragmentedIntervalFirstPolicyTest
public void test_serde_noFieldsSet() throws IOException
{
final MostFragmentedIntervalFirstPolicy policy =
- new MostFragmentedIntervalFirstPolicy(null, null, null, null, null);
+ new MostFragmentedIntervalFirstPolicy(null, null, null, null, null,
null);
final DefaultObjectMapper mapper = new DefaultObjectMapper();
final CompactionCandidateSearchPolicy policy2 =
mapper.readValue(mapper.writeValueAsString(policy),
CompactionCandidateSearchPolicy.class);
@@ -240,7 +248,7 @@ public class MostFragmentedIntervalFirstPolicyTest
}
@Test
- public void
test_compactionMode_returnsMinorCompactionMode_whenPercentageBelowThreshold()
+ public void
test_compactionMode_returnsMinorCompactionMode_whenBytePercentageBelowThreshold()
{
// Set threshold to 0.5 (50%)
final MostFragmentedIntervalFirstPolicy policy = new
MostFragmentedIntervalFirstPolicy(
@@ -248,11 +256,12 @@ public class MostFragmentedIntervalFirstPolicyTest
HumanReadableBytes.valueOf(1),
HumanReadableBytes.valueOf(10_000),
50,
+ 50,
null
);
- final CompactionStatistics compacted = CompactionStatistics.create(1200L,
10, 1L);
- final CompactionStatistics uncompacted = CompactionStatistics.create(400L,
100, 1L);
+ final CompactionStatistics compacted = CompactionStatistics.create(1200L,
400L, 10, 1L);
+ final CompactionStatistics uncompacted = CompactionStatistics.create(400L,
600L, 100, 1L);
final CompactionStatus status = CompactionStatus.pending(compacted,
uncompacted, List.of(SEGMENT), "");
final CompactionCandidate candidate =
CompactionCandidate.from(List.of(SEGMENT), null, status);
@@ -264,7 +273,7 @@ public class MostFragmentedIntervalFirstPolicyTest
}
@Test
- public void
test_compactionMode_returnsFullCompaction_whenPercentageAboveThreshold()
+ public void
test_compactionMode_returnsFullCompaction_whenBytePercentageAboveThreshold()
{
// Set threshold to 0.5 (50%)
final MostFragmentedIntervalFirstPolicy policy = new
MostFragmentedIntervalFirstPolicy(
@@ -272,13 +281,68 @@ public class MostFragmentedIntervalFirstPolicyTest
HumanReadableBytes.valueOf(1),
HumanReadableBytes.valueOf(10_000),
50,
+ 50,
null
);
final CompactionStatus status =
CompactionStatus.pending(
- CompactionStatistics.create(500L, 5, 1),
- CompactionStatistics.create(600L, 100, 1),
+ CompactionStatistics.create(500L, null, 5, 1),
+ CompactionStatistics.create(600L, null, 100, 1),
+ List.of(),
+ ""
+ );
+ final CompactionCandidate candidate =
CompactionCandidate.from(List.of(SEGMENT), null, status);
+ final Eligibility eligibility =
+ policy.checkEligibilityForCompaction(candidate, null);
+
+ Assertions.assertEquals(CompactionMode.ALL_SEGMENTS,
eligibility.getMode());
+ Assertions.assertTrue(eligibility.isEligible());
+ }
+
+ @Test
+ public void
test_compactionMode_returnsMinorCompaction_whenRowPercentageBelowThreshold()
+ {
+ final MostFragmentedIntervalFirstPolicy policy = new
MostFragmentedIntervalFirstPolicy(
+ 1,
+ HumanReadableBytes.valueOf(1),
+ HumanReadableBytes.valueOf(10_000),
+ 50,
+ 50,
+ null
+ );
+
+ final CompactionStatus status =
+ CompactionStatus.pending(
+ CompactionStatistics.create(500L, 1200L, 5, 1),
+ CompactionStatistics.create(600L, 400L, 100, 1),
+ List.of(),
+ ""
+ );
+ final CompactionCandidate candidate =
CompactionCandidate.from(List.of(SEGMENT), null, status);
+ final Eligibility eligibility =
+ policy.checkEligibilityForCompaction(candidate, null);
+
+ Assertions.assertEquals(CompactionMode.UNCOMPACTED_SEGMENTS_ONLY,
eligibility.getMode());
+ Assertions.assertTrue(eligibility.isEligible());
+ }
+
+ @Test
+ public void
test_compactionMode_returnsFullCompaction_whenByteAndRowPercentageBothAboveThreshold()
+ {
+ final MostFragmentedIntervalFirstPolicy policy = new
MostFragmentedIntervalFirstPolicy(
+ 1,
+ HumanReadableBytes.valueOf(1),
+ HumanReadableBytes.valueOf(10_000),
+ 50,
+ 50,
+ null
+ );
+
+ final CompactionStatus status =
+ CompactionStatus.pending(
+ CompactionStatistics.create(500L, 500L, 5, 1),
+ CompactionStatistics.create(600L, 600L, 100, 1),
List.of(),
""
);
@@ -299,14 +363,15 @@ public class MostFragmentedIntervalFirstPolicyTest
HumanReadableBytes.valueOf(1),
HumanReadableBytes.valueOf(10_000),
null,
+ null,
null
);
// With default threshold 0, any positive percentage >= 0, so always
ALL_SEGMENTS_ELIGIBLE
final CompactionStatus status =
CompactionStatus.pending(
- CompactionStatistics.create(1_000L, 10, 1),
- CompactionStatistics.create(100L, 100, 1),
+ CompactionStatistics.create(1_000L, 1_000L, 10, 1),
+ CompactionStatistics.create(100L, 100L, 100, 1),
List.of(),
""
);
@@ -320,9 +385,10 @@ public class MostFragmentedIntervalFirstPolicyTest
private CompactionCandidate createCandidate(int numSegments, long
avgSizeBytes)
{
- final CompactionStatistics dummyCompactedStats =
CompactionStatistics.create(1L, 1L, 1L);
+ final CompactionStatistics dummyCompactedStats =
CompactionStatistics.create(1L, null, 1L, 1L);
final CompactionStatistics uncompactedStats = CompactionStatistics.create(
avgSizeBytes * numSegments,
+ null,
numSegments,
1L
);
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/AutoCompactionSnapshotTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/AutoCompactionSnapshotTest.java
index a6eb127f854..83fbdaaef51 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/AutoCompactionSnapshotTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/AutoCompactionSnapshotTest.java
@@ -34,9 +34,9 @@ public class AutoCompactionSnapshotTest
// Increment every stat twice
for (int i = 0; i < 2; i++) {
- builder.incrementSkippedStats(CompactionStatistics.create(13, 13, 13));
- builder.incrementWaitingStats(CompactionStatistics.create(13, 13, 13));
- builder.incrementCompactedStats(CompactionStatistics.create(13, 13, 13));
+ builder.incrementSkippedStats(CompactionStatistics.create(13, null, 13,
13));
+ builder.incrementWaitingStats(CompactionStatistics.create(13, null, 13,
13));
+ builder.incrementCompactedStats(CompactionStatistics.create(13, null,
13, 13));
}
final AutoCompactionSnapshot actual =
builder.withMessage(expectedMessage).build();
diff --git a/website/.spelling b/website/.spelling
index 544e5953b3f..703046dc335 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -21,6 +21,7 @@
BrowserOnly
docusaurus
1M
+10MiB
100MiB
32-bit
4MiB
@@ -601,6 +602,7 @@ unannouncements
unary
unassign
uncomment
+uncompacted
underutilization
unintuitive
unioned
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]