This is an automated email from the ASF dual-hosted git repository.

yqm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f77596d42b feat: adds row-based compaction eligibility filtering  
(#19205)
5f77596d42b is described below

commit 5f77596d42bc38173dbff3a491d9a035094d9d6c
Author: Cece Mei <[email protected]>
AuthorDate: Wed Mar 25 11:53:28 2026 -0700

    feat: adds row-based compaction eligibility filtering  (#19205)
    
    * rows
    
    * document
    
    * zero
    
    * nullable
    
    * spell
---
 docs/api-reference/automatic-compaction-api.md     | 25 +++++-
 .../embedded/compact/CompactionSupervisorTest.java | 23 +++---
 .../embedded/indexing/KafkaClusterMetricsTest.java |  2 +-
 .../compact/OverlordCompactionSchedulerTest.java   |  2 +-
 .../server/compaction/CompactionCandidate.java     | 35 ++++++---
 .../server/compaction/CompactionRunSimulator.java  |  2 +-
 .../server/compaction/CompactionStatistics.java    | 25 +++++-
 .../druid/server/compaction/CompactionStatus.java  | 30 ++++++--
 .../MostFragmentedIntervalFirstPolicy.java         | 54 ++++++++++++-
 .../MostFragmentedIntervalFirstPolicyTest.java     | 88 +++++++++++++++++++---
 .../coordinator/AutoCompactionSnapshotTest.java    |  6 +-
 website/.spelling                                  |  2 +
 12 files changed, 243 insertions(+), 51 deletions(-)

diff --git a/docs/api-reference/automatic-compaction-api.md b/docs/api-reference/automatic-compaction-api.md
index 6864aae4735..4f2ce3d62af 100644
--- a/docs/api-reference/automatic-compaction-api.md
+++ b/docs/api-reference/automatic-compaction-api.md
@@ -886,7 +886,7 @@ This includes the following fields:
 |------|-----------|-------------|
 |`compactionTaskSlotRatio`|Ratio of number of slots taken up by compaction tasks to the number of total task slots across all workers.|0.1|
 |`maxCompactionTaskSlots`|Maximum number of task slots that can be taken up by compaction tasks and sub-tasks. Minimum number of task slots available for compaction is 1. When using MSQ engine or Native engine with range partitioning, a single compaction job occupies more than one task slot. In this case, the minimum is 2 so that at least one compaction job can always run in the cluster.|2147483647 (i.e. total task slots)|
-|`compactionPolicy`|Policy to choose intervals for compaction. Currently, the only supported policy is [Newest segment first](#compaction-policy-newestsegmentfirst).|Newest segment first|
+|`compactionPolicy`|Policy to choose intervals for compaction. Supported policies are [Newest segment first](#compaction-policy-newestsegmentfirst), [Most fragmented first](#compaction-policy-mostfragmentedfirst), and [Fixed interval order](#compaction-policy-fixedintervalorder).|Newest segment first|
 |`useSupervisors`|Whether compaction should be run on Overlord using supervisors instead of Coordinator duties.|false|
 |`engine`|Engine used for running compaction tasks, unless overridden in the datasource-level compaction config. Possible values are `native` and `msq`. `msq` engine can be used for compaction only if `useSupervisors` is `true`.|`native`|
 |`storeCompactionStatePerSegment`|**This configuration only takes effect if `useSupervisors` is `true`.** Whether to persist the full compaction state in segment metadata. When `true` (default), compaction state is stored in both the segment metadata and the indexing states table. This is historically how Druid has worked. When `false`, only a fingerprint reference is stored in the segment metadata, reducing storage overhead in the segments table. The actual compaction state is stored in [...]
@@ -898,6 +898,29 @@ This includes the following fields:
 |`type`|This must always be `newestSegmentFirst`||
 |`priorityDatasource`|Datasource to prioritize for compaction. The intervals of this datasource are chosen for compaction before the intervals of any other datasource. Within this datasource, the intervals are prioritized based on the chosen compaction policy.|None|
 
+#### Compaction policy `mostFragmentedFirst`
+
+This experimental policy prioritizes compaction of intervals that contain the largest number of small uncompacted segments. It favors cluster stability (achieved by reducing overall segment count) over the performance of queries on newer intervals.
+
+|Field|Description|Default value|
+|-----|-----------|-------------|
+|`type`|This must always be `mostFragmentedFirst`||
+|`priorityDatasource`|Datasource to prioritize for compaction. The intervals of this datasource are chosen for compaction before the intervals of any other datasource. Within this datasource, the intervals are prioritized based on the chosen compaction policy.|None|
+|`minUncompactedCount`|Minimum number of uncompacted segments that must be present in an interval to make it eligible for compaction. Must be greater than 0.|100|
+|`minUncompactedBytes`|Minimum total bytes of uncompacted segments that must be present in an interval to make it eligible for compaction. Human-readable byte format (e.g., "10MiB").|10 MiB|
+|`maxAverageUncompactedBytesPerSegment`|Maximum average size of uncompacted segments in an interval eligible for compaction. Human-readable byte format (e.g., "2GiB").|2 GiB|
+|`minUncompactedBytesPercentForFullCompaction`|Threshold percentage (0-100). When the ratio of uncompacted bytes to total bytes in an interval falls below this threshold, the interval becomes eligible for minor compaction instead of full compaction.|0|
+|`minUncompactedRowsPercentForFullCompaction`|Threshold percentage (0-100). When the ratio of uncompacted rows to total rows in an interval falls below this threshold, the interval becomes eligible for minor compaction instead of full compaction.|0|
+
+#### Compaction policy `fixedIntervalOrder`
+
+This policy specifies the datasources and intervals eligible for compaction 
and their order. It is primarily used for integration tests.
+
+|Field|Description|Default value|
+|-----|-----------|-------------|
+|`type`|This must always be `fixedIntervalOrder`||
+|`eligibleCandidates`|List of datasource-interval pairs eligible for 
compaction. Each entry contains `datasource` (string) and `interval` (ISO-8601 
interval) fields. Compaction processes candidates in the order specified.|None|
+
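[Editor's illustration, not part of the patch.] As a rough sketch of how the `mostFragmentedFirst` thresholds documented above interact, the following standalone Java snippet screens an interval for eligibility. Class and method names are hypothetical, and the defaults are taken from the table; this is not the actual Druid implementation.

```java
// Hypothetical sketch of the mostFragmentedFirst screening thresholds;
// names and structure are illustrative, not the actual Druid classes.
public class FragmentationScreenSketch {
    // Defaults mirror the documented table: 100 segments, 10 MiB, 2 GiB.
    static final long MIN_UNCOMPACTED_COUNT = 100;
    static final long MIN_UNCOMPACTED_BYTES = 10L * 1024 * 1024;
    static final long MAX_AVG_UNCOMPACTED_BYTES_PER_SEGMENT = 2L * 1024 * 1024 * 1024;

    /** An interval is eligible when it holds enough small uncompacted segments. */
    static boolean isEligible(long uncompactedSegments, long uncompactedBytes) {
        if (uncompactedSegments < MIN_UNCOMPACTED_COUNT) {
            return false; // not fragmented enough by count
        }
        if (uncompactedBytes < MIN_UNCOMPACTED_BYTES) {
            return false; // too little data to bother compacting
        }
        // Large average segment size means the interval is not fragmented.
        long avgBytesPerSegment = uncompactedBytes / uncompactedSegments;
        return avgBytesPerSegment <= MAX_AVG_UNCOMPACTED_BYTES_PER_SEGMENT;
    }

    public static void main(String[] args) {
        // 500 uncompacted segments of ~64 KiB each: fragmented, eligible.
        System.out.println(isEligible(500, 500L * 64 * 1024));
        // 5 large segments: below the count threshold, not eligible.
        System.out.println(isEligible(5, 5L * 512 * 1024 * 1024));
    }
}
```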
 
 #### URL
 
diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
index ddcf41f23db..0192b4853ed 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
@@ -220,14 +220,11 @@ public class CompactionSupervisorTest extends EmbeddedClusterTestBase
     verifyCompactedSegmentsHaveFingerprints(yearGranConfig);
   }
 
-  @MethodSource("getPartitionsSpec")
+  @MethodSource("getPolicyAndPartition")
   @ParameterizedTest(name = "partitionsSpec={0}")
-  public void test_minorCompactionWithMSQ(PartitionsSpec partitionsSpec) throws Exception
+  public void test_minorCompactionWithMSQ(MostFragmentedIntervalFirstPolicy policy, PartitionsSpec partitionsSpec)
   {
-    configureCompaction(
-        CompactionEngine.MSQ,
-        new MostFragmentedIntervalFirstPolicy(2, new HumanReadableBytes("1KiB"), null, 80, null)
-    );
+    configureCompaction(CompactionEngine.MSQ, policy);
 
     ingest1kRecords();
     ingest1kRecords();
@@ -1026,11 +1023,19 @@ public class CompactionSupervisorTest extends EmbeddedClusterTestBase
     cluster.callApi().runTask(task, overlord);
   }
 
-  public static List<PartitionsSpec> getPartitionsSpec()
+  public static List<Object[]> getPolicyAndPartition()
   {
     return List.of(
-        new DimensionRangePartitionsSpec(null, 10_000, List.of("page"), false),
-        new DynamicPartitionsSpec(null, null)
+        new Object[]{
+            // decides minor compaction based on bytes percent
+            new MostFragmentedIntervalFirstPolicy(2, new HumanReadableBytes("1KiB"), null, 80, null, null),
+            new DimensionRangePartitionsSpec(null, 10_000, List.of("page"), false)
+        },
+        new Object[]{
+            // decides minor compaction based on rows percent
+            new MostFragmentedIntervalFirstPolicy(2, new HumanReadableBytes("1KiB"), null, null, 51, null),
+            new DynamicPartitionsSpec(null, null)
+        }
     );
   }
 
diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java
index c2f07d12495..39742a9d578 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java
@@ -147,7 +147,7 @@ public class KafkaClusterMetricsTest extends EmbeddedClusterTestBase
         ),
         Arguments.of(
             CompactionEngine.MSQ,
-            new MostFragmentedIntervalFirstPolicy(1, HumanReadableBytes.valueOf(1), null, 80, null)
+            new MostFragmentedIntervalFirstPolicy(1, HumanReadableBytes.valueOf(1), null, null, 80, null)
         )
     );
   }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java
index 5c598024c15..5c1200110f8 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java
@@ -394,7 +394,7 @@ public class OverlordCompactionSchedulerTest
     runCompactionTasks(1);
 
     final AutoCompactionSnapshot.Builder expectedSnapshot = AutoCompactionSnapshot.builder(dataSource);
-    expectedSnapshot.incrementWaitingStats(CompactionStatistics.create(100_000_000, 1, 1));
+    expectedSnapshot.incrementWaitingStats(CompactionStatistics.create(100_000_000, null, 1, 1));
 
     Assert.assertEquals(
         expectedSnapshot.build(),
diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidate.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidate.java
index d3653ccd992..653bc353f24 100644
--- a/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidate.java
+++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidate.java
@@ -42,9 +42,8 @@ public class CompactionCandidate
   private final Interval umbrellaInterval;
   private final Interval compactionInterval;
   private final String dataSource;
-  private final long totalBytes;
-  private final int numIntervals;
 
+  private final CompactionStatistics compactionStatistics;
   private final CompactionStatus currentStatus;
 
   public static Interval getCompactionInterval(
@@ -99,12 +98,23 @@ public class CompactionCandidate
   )
   {
     this.segments = segments;
-    this.totalBytes = segments.stream().mapToLong(DataSegment::getSize).sum();
+
+    final Long totalRows;
+    if (segments.stream().allMatch(s -> s.getTotalRows() != null)) {
+      totalRows = segments.stream().mapToLong(DataSegment::getTotalRows).sum();
+    } else {
+      totalRows = null;
+    }
+    this.compactionStatistics = CompactionStatistics.create(
+        segments.stream().mapToLong(DataSegment::getSize).sum(),
+        totalRows,
+        segments.size(),
+        numDistinctSegmentIntervals
+    );
 
     this.umbrellaInterval = umbrellaInterval;
     this.compactionInterval = compactionInterval;
 
-    this.numIntervals = numDistinctSegmentIntervals;
     this.dataSource = segments.get(0).getDataSource();
     this.currentStatus = currentStatus;
   }
@@ -117,11 +127,6 @@ public class CompactionCandidate
     return segments;
   }
 
-  public long getTotalBytes()
-  {
-    return totalBytes;
-  }
-
   public int numSegments()
   {
     return segments.size();
@@ -152,7 +157,7 @@ public class CompactionCandidate
 
   public CompactionStatistics getStats()
   {
-    return CompactionStatistics.create(totalBytes, numSegments(), numIntervals);
+    return compactionStatistics;
   }
 
   @Nullable
@@ -187,7 +192,13 @@ public class CompactionCandidate
    */
   public CompactionCandidate withCurrentStatus(CompactionStatus status)
   {
-    return new CompactionCandidate(segments, umbrellaInterval, compactionInterval, numIntervals, status);
+    return new CompactionCandidate(
+        segments,
+        umbrellaInterval,
+        compactionInterval,
+        Math.toIntExact(compactionStatistics.getNumIntervals()),
+        status
+    );
   }
 
   @Override
@@ -196,7 +207,7 @@ public class CompactionCandidate
     return "SegmentsToCompact{" +
            "datasource=" + dataSource +
            ", segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) +
-           ", totalSize=" + totalBytes +
+           ", compactionStatistics=" + compactionStatistics +
            ", currentStatus=" + currentStatus +
            '}';
   }
diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionRunSimulator.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionRunSimulator.java
index 586f16c3b23..b01d892c3c0 100644
--- a/server/src/main/java/org/apache/druid/server/compaction/CompactionRunSimulator.java
+++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionRunSimulator.java
@@ -182,7 +182,7 @@ public class CompactionRunSimulator
     row.add(candidate.getDataSource());
     row.add(candidate.getCompactionInterval());
     row.add(candidate.numSegments());
-    row.add(candidate.getTotalBytes());
+    row.add(candidate.getStats().getTotalBytes());
     row.add(CompactionSlotManager.getMaxTaskSlotsForNativeCompactionTask(tuningConfig));
     if (reason != null) {
       row.add(reason);
diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatistics.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatistics.java
index 7d43a09aed8..d03eee00eb7 100644
--- a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatistics.java
+++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatistics.java
@@ -19,19 +19,25 @@
 
 package org.apache.druid.server.compaction;
 
+import javax.annotation.Nullable;
+
 /**
  * Used to track statistics for segments in different states of compaction.
+ * totalRows can be null for old segments where row count was not stored.
  */
 public class CompactionStatistics
 {
   private long totalBytes;
+  @Nullable
+  private Long totalRows;
   private long numSegments;
   private long numIntervals;
 
-  public static CompactionStatistics create(long bytes, long numSegments, long numIntervals)
+  public static CompactionStatistics create(long bytes, @Nullable Long totalRows, long numSegments, long numIntervals)
   {
     final CompactionStatistics stats = new CompactionStatistics();
     stats.totalBytes = bytes;
+    stats.totalRows = totalRows;
     stats.numIntervals = numIntervals;
     stats.numSegments = numSegments;
     return stats;
@@ -42,6 +48,12 @@ public class CompactionStatistics
     return totalBytes;
   }
 
+  @Nullable
+  public Long getTotalRows()
+  {
+    return totalRows;
+  }
+
   public long getNumSegments()
   {
     return numSegments;
@@ -55,6 +67,11 @@ public class CompactionStatistics
   public void increment(CompactionStatistics other)
   {
     totalBytes += other.getTotalBytes();
+    if (totalRows == null || other.totalRows == null) {
+      totalRows = null;
+    } else {
+      totalRows += other.totalRows;
+    }
     numIntervals += other.getNumIntervals();
     numSegments += other.getNumSegments();
   }
@@ -62,6 +79,11 @@ public class CompactionStatistics
   public void decrement(CompactionStatistics other)
   {
     totalBytes -= other.getTotalBytes();
+    if (totalRows == null || other.totalRows == null) {
+      totalRows = null;
+    } else {
+      totalRows -= other.totalRows;
+    }
     numIntervals -= other.getNumIntervals();
     numSegments -= other.getNumSegments();
   }
@@ -71,6 +93,7 @@ public class CompactionStatistics
   {
     return "CompactionStatistics{" +
            "totalBytes=" + totalBytes +
+           ", totalRows=" + totalRows +
            ", numSegments=" + numSegments +
            ", numIntervals=" + numIntervals +
            '}';
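[Editor's illustration, not part of the patch.] Because `totalRows` may be unknown for older segments, the `increment`/`decrement` logic above propagates `null` through aggregation: if either operand lacks a row count, the sum is unknown. A minimal standalone sketch of that rule (the helper name `RowSumSketch` is hypothetical):

```java
// Sketch of null-propagating row aggregation, mirroring the increment()
// logic in CompactionStatistics above; RowSumSketch is a hypothetical name.
public class RowSumSketch {
    /** Returns a + b, or null when either operand is unknown. */
    static Long addRows(Long a, Long b) {
        if (a == null || b == null) {
            return null; // unknown + anything = unknown
        }
        return Long.valueOf(a + b);
    }

    public static void main(String[] args) {
        System.out.println(addRows(400L, 600L));  // both known: summed
        System.out.println(addRows(400L, null));  // one side unknown: null
    }
}
```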
diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java
index cdf1f1832b3..06182e3a396 100644
--- a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java
+++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java
@@ -185,6 +185,14 @@ public class CompactionStatus
     );
   }
 
+  public static CompactionStatus complete(
+      CompactionStatistics compactionStatistics,
+      CompactionStatistics uncompactedStats
+  )
+  {
+    return new CompactionStatus(State.COMPLETE, null, compactionStatistics, uncompactedStats, null);
+  }
+
   /**
   * Computes compaction status for the given field. The status is assumed to be
    * COMPLETE (i.e. no further compaction is required) if the configured value
@@ -439,7 +447,7 @@ public class CompactionStatus
       }
 
       if (reasonsForCompaction.isEmpty()) {
-        return COMPLETE;
+        return CompactionStatus.complete(createStats(this.compactedSegments), createStats(this.uncompactedSegments));
       } else {
         return CompactionStatus.pending(
             createStats(this.compactedSegments),
@@ -483,9 +491,9 @@ public class CompactionStatus
         // Cannot evaluate further without a fingerprint mapper
         uncompactedSegments.addAll(
             mismatchedFingerprintToSegmentMap.values()
-                                            .stream()
-                                            .flatMap(List::stream)
-                                            .toList()
+                                             .stream()
+                                             .flatMap(List::stream)
+                                             .toList()
         );
+        return CompactionStatus.pending("Segments have a mismatched fingerprint and no fingerprint mapper is available");
       }
@@ -509,7 +517,8 @@ public class CompactionStatus
                 }
                 segments.addAll(e.getValue());
                 return segments;
-              });
+              }
+          );
         }
       }
 
@@ -598,7 +607,8 @@ public class CompactionStatus
       } else if (existingPartionsSpec instanceof DynamicPartitionsSpec) {
         existingPartionsSpec = new DynamicPartitionsSpec(
             existingPartionsSpec.getMaxRowsPerSegment(),
-            ((DynamicPartitionsSpec) existingPartionsSpec).getMaxTotalRowsOr(Long.MAX_VALUE));
+            ((DynamicPartitionsSpec) existingPartionsSpec).getMaxTotalRowsOr(Long.MAX_VALUE)
+        );
       }
       return CompactionStatus.completeIfNullOrEqual(
           "partitionsSpec",
@@ -826,7 +836,13 @@ public class CompactionStatus
       final Set<Interval> segmentIntervals =
           segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet());
       final long totalBytes = segments.stream().mapToLong(DataSegment::getSize).sum();
-      return CompactionStatistics.create(totalBytes, segments.size(), segmentIntervals.size());
+      final Long totalRows;
+      if (segments.stream().allMatch(s -> s.getTotalRows() != null)) {
+        totalRows = segments.stream().mapToLong(DataSegment::getTotalRows).sum();
+      } else {
+        totalRows = null;
+      }
+      return CompactionStatistics.create(totalBytes, totalRows, segments.size(), segmentIntervals.size());
     }
   }
 }
diff --git a/server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java b/server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java
index 001525a2a89..b8b754e718c 100644
--- a/server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java
+++ b/server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java
@@ -25,6 +25,7 @@ import org.apache.druid.common.config.Configs;
 import org.apache.druid.error.InvalidInput;
 import org.apache.druid.guice.annotations.UnstableApi;
 import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.java.util.common.logger.Logger;
 
 import javax.annotation.Nullable;
 import java.util.Comparator;
@@ -41,6 +42,7 @@ import java.util.Objects;
 @UnstableApi
 public class MostFragmentedIntervalFirstPolicy extends BaseCandidateSearchPolicy
 {
+  private static final Logger logger = new Logger(MostFragmentedIntervalFirstPolicy.class);
   private static final HumanReadableBytes SIZE_2_GB = new HumanReadableBytes("2GiB");
   private static final HumanReadableBytes SIZE_10_MB = new HumanReadableBytes("10MiB");
 
@@ -48,6 +50,7 @@ public class MostFragmentedIntervalFirstPolicy extends BaseCandidateSearchPolicy
   private final HumanReadableBytes minUncompactedBytes;
   private final HumanReadableBytes maxAverageUncompactedBytesPerSegment;
   private final int minUncompactedBytesPercentForFullCompaction;
+  private final int minUncompactedRowsPercentForFullCompaction;
 
   @JsonCreator
   public MostFragmentedIntervalFirstPolicy(
@@ -57,6 +60,8 @@ public class MostFragmentedIntervalFirstPolicy extends BaseCandidateSearchPolicy
       HumanReadableBytes maxAverageUncompactedBytesPerSegment,
       @JsonProperty("minUncompactedBytesPercentForFullCompaction") @Nullable
       Integer minUncompactedBytesPercentForFullCompaction,
+      @JsonProperty("minUncompactedRowsPercentForFullCompaction") @Nullable
+      Integer minUncompactedRowsPercentForFullCompaction,
       @JsonProperty("priorityDatasource") @Nullable String priorityDatasource
   )
   {
@@ -79,6 +84,13 @@ public class MostFragmentedIntervalFirstPolicy extends BaseCandidateSearchPolicy
         "'minUncompactedBytesPercentForFullCompaction'[%s] must be between 0 and 100",
         minUncompactedBytesPercentForFullCompaction
     );
+    InvalidInput.conditionalException(
+        minUncompactedRowsPercentForFullCompaction == null
+        || (minUncompactedRowsPercentForFullCompaction >= 0
+            && minUncompactedRowsPercentForFullCompaction < 100),
+        "'minUncompactedRowsPercentForFullCompaction'[%s] must be in the range [0, 100)",
+        minUncompactedRowsPercentForFullCompaction
+    );
 
     this.minUncompactedCount = Configs.valueOrDefault(minUncompactedCount, 100);
     this.minUncompactedBytes = Configs.valueOrDefault(minUncompactedBytes, SIZE_10_MB);
@@ -86,6 +98,8 @@ public class MostFragmentedIntervalFirstPolicy extends BaseCandidateSearchPolicy
         = Configs.valueOrDefault(maxAverageUncompactedBytesPerSegment, SIZE_2_GB);
     this.minUncompactedBytesPercentForFullCompaction =
         Configs.valueOrDefault(minUncompactedBytesPercentForFullCompaction, 0);
+    this.minUncompactedRowsPercentForFullCompaction =
+        Configs.valueOrDefault(minUncompactedRowsPercentForFullCompaction, 0);
   }
 
   /**
@@ -129,6 +143,17 @@ public class MostFragmentedIntervalFirstPolicy extends BaseCandidateSearchPolicy
     return minUncompactedBytesPercentForFullCompaction;
   }
 
+  /**
+   * Threshold percentage of uncompacted rows to total rows below which
+   * minor compaction is eligible instead of full compaction.
+   * Default value is 0.
+   */
+  @JsonProperty
+  public int minUncompactedRowsPercentForFullCompaction()
+  {
+    return minUncompactedRowsPercentForFullCompaction;
+  }
+
   @Override
   protected Comparator<CompactionCandidate> getSegmentComparator()
   {
@@ -148,7 +173,8 @@ public class MostFragmentedIntervalFirstPolicy extends BaseCandidateSearchPolicy
     return minUncompactedCount == policy.minUncompactedCount
            && Objects.equals(minUncompactedBytes, policy.minUncompactedBytes)
            && Objects.equals(maxAverageUncompactedBytesPerSegment, policy.maxAverageUncompactedBytesPerSegment)
-           && minUncompactedBytesPercentForFullCompaction == policy.minUncompactedBytesPercentForFullCompaction;
+           && minUncompactedBytesPercentForFullCompaction == policy.minUncompactedBytesPercentForFullCompaction
+           && minUncompactedRowsPercentForFullCompaction == policy.minUncompactedRowsPercentForFullCompaction;
   }
 
   @Override
@@ -159,7 +185,8 @@ public class MostFragmentedIntervalFirstPolicy extends BaseCandidateSearchPolicy
         minUncompactedCount,
         minUncompactedBytes,
         maxAverageUncompactedBytesPerSegment,
-        minUncompactedBytesPercentForFullCompaction
+        minUncompactedBytesPercentForFullCompaction,
+        minUncompactedRowsPercentForFullCompaction
     );
   }
 
@@ -172,6 +199,7 @@ public class MostFragmentedIntervalFirstPolicy extends BaseCandidateSearchPolicy
         ", minUncompactedBytes=" + minUncompactedBytes +
         ", maxAverageUncompactedBytesPerSegment=" + maxAverageUncompactedBytesPerSegment +
         ", minUncompactedBytesPercentForFullCompaction=" + minUncompactedBytesPercentForFullCompaction +
+        ", minUncompactedRowsPercentForFullCompaction=" + minUncompactedRowsPercentForFullCompaction +
         ", priorityDataSource='" + getPriorityDatasource() + '\'' +
         '}';
   }
@@ -223,9 +251,27 @@ public class MostFragmentedIntervalFirstPolicy extends BaseCandidateSearchPolicy
           uncompactedBytesRatio,
           minUncompactedBytesPercentForFullCompaction
       );
-    } else {
-      return Eligibility.FULL;
     }
+
+    // Check uncompacted rows ratio if total rows are available
+    final Long uncompactedRows = uncompacted.getTotalRows();
+    final Long compactedRows = candidate.getCompactedStats().getTotalRows();
+    if (uncompactedRows != null && compactedRows != null) {
+      if (uncompactedRows + compactedRows > 0) {
+        final double uncompactedRowsRatio = (double) uncompactedRows / (uncompactedRows + compactedRows) * 100;
+        if (uncompactedRowsRatio < minUncompactedRowsPercentForFullCompaction) {
+          return Eligibility.minor(
+              "Uncompacted rows ratio[%.2f] is below threshold[%d]",
+              uncompactedRowsRatio,
+              minUncompactedRowsPercentForFullCompaction
+          );
+        }
+      } else {
+        logger.error("Total row count of compaction candidate is zero; skipping row-based eligibility check.");
+      }
+    }
+
+    return Eligibility.FULL;
   }
 
   /**
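[Editor's illustration, not part of the patch.] The eligibility logic added above first checks the uncompacted-bytes ratio and then, when row counts are known, the uncompacted-rows ratio; falling below either threshold downgrades the interval to minor compaction. A standalone sketch of that decision flow (hypothetical names, 50% thresholds assumed; not the actual Druid classes):

```java
// Sketch of the minor-vs-full compaction decision; EligibilityModeSketch is
// a hypothetical stand-in for the policy's Eligibility logic.
public class EligibilityModeSketch {
    enum Mode { UNCOMPACTED_SEGMENTS_ONLY, ALL_SEGMENTS }

    /**
     * If the uncompacted share of bytes (or of rows, when row counts are
     * known) is below its threshold, only uncompacted segments are compacted.
     */
    static Mode decide(long uncompactedBytes, long compactedBytes,
                       Long uncompactedRows, Long compactedRows,
                       int bytesPercentThreshold, int rowsPercentThreshold) {
        double bytesRatio = 100.0 * uncompactedBytes / (uncompactedBytes + compactedBytes);
        if (bytesRatio < bytesPercentThreshold) {
            return Mode.UNCOMPACTED_SEGMENTS_ONLY;
        }
        // Row counts may be unknown (null) for older segments; skip if so.
        if (uncompactedRows != null && compactedRows != null
                && uncompactedRows + compactedRows > 0) {
            double rowsRatio = 100.0 * uncompactedRows / (uncompactedRows + compactedRows);
            if (rowsRatio < rowsPercentThreshold) {
                return Mode.UNCOMPACTED_SEGMENTS_ONLY;
            }
        }
        return Mode.ALL_SEGMENTS;
    }

    public static void main(String[] args) {
        // 400 of 1600 uncompacted bytes = 25%, below 50% -> minor compaction.
        System.out.println(decide(400, 1200, null, null, 50, 50));
        // ~55% uncompacted bytes, row counts unknown -> full compaction.
        System.out.println(decide(600, 500, null, null, 50, 50));
    }
}
```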
diff --git a/server/src/test/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicyTest.java
index 53e027a93d9..3f958756ed6 100644
--- a/server/src/test/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicyTest.java
+++ b/server/src/test/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicyTest.java
@@ -40,7 +40,7 @@ public class MostFragmentedIntervalFirstPolicyTest
   public void test_thresholdValues_ofDefaultPolicy()
   {
     final MostFragmentedIntervalFirstPolicy policy =
-        new MostFragmentedIntervalFirstPolicy(null, null, null, null, null);
+        new MostFragmentedIntervalFirstPolicy(null, null, null, null, null, null);
     Assertions.assertEquals(100, policy.getMinUncompactedCount());
     Assertions.assertEquals(new HumanReadableBytes("10MiB"), policy.getMinUncompactedBytes());
     Assertions.assertEquals(new HumanReadableBytes("2GiB"), policy.getMaxAverageUncompactedBytesPerSegment());
@@ -56,6 +56,7 @@ public class MostFragmentedIntervalFirstPolicyTest
         HumanReadableBytes.valueOf(1),
         HumanReadableBytes.valueOf(10_000),
         null,
+        null,
         null
     );
 
@@ -80,6 +81,7 @@ public class MostFragmentedIntervalFirstPolicyTest
         minUncompactedBytes,
         HumanReadableBytes.valueOf(10_000),
         null,
+        null,
         null
     );
 
@@ -104,6 +106,7 @@ public class MostFragmentedIntervalFirstPolicyTest
         HumanReadableBytes.valueOf(100),
         maxAvgSegmentSize,
         null,
+        null,
         null
     );
 
@@ -127,6 +130,7 @@ public class MostFragmentedIntervalFirstPolicyTest
         HumanReadableBytes.valueOf(1),
         HumanReadableBytes.valueOf(10_000),
         null,
+        null,
         null
     );
 
@@ -148,6 +152,7 @@ public class MostFragmentedIntervalFirstPolicyTest
         HumanReadableBytes.valueOf(1),
         HumanReadableBytes.valueOf(10_000),
         null,
+        null,
         null
     );
 
@@ -169,6 +174,7 @@ public class MostFragmentedIntervalFirstPolicyTest
         HumanReadableBytes.valueOf(1),
         HumanReadableBytes.valueOf(10_000),
         null,
+        null,
         null
     );
 
@@ -190,6 +196,7 @@ public class MostFragmentedIntervalFirstPolicyTest
         HumanReadableBytes.valueOf(1),
         HumanReadableBytes.valueOf(100),
         null,
+        null,
         null
     );
 
@@ -220,6 +227,7 @@ public class MostFragmentedIntervalFirstPolicyTest
         HumanReadableBytes.valueOf(2),
         HumanReadableBytes.valueOf(3),
         50,
+        51,
         "foo"
     );
     final DefaultObjectMapper mapper = new DefaultObjectMapper();
@@ -232,7 +240,7 @@ public class MostFragmentedIntervalFirstPolicyTest
   public void test_serde_noFieldsSet() throws IOException
   {
     final MostFragmentedIntervalFirstPolicy policy =
-        new MostFragmentedIntervalFirstPolicy(null, null, null, null, null);
+        new MostFragmentedIntervalFirstPolicy(null, null, null, null, null, null);
     final DefaultObjectMapper mapper = new DefaultObjectMapper();
     final CompactionCandidateSearchPolicy policy2 =
         mapper.readValue(mapper.writeValueAsString(policy), CompactionCandidateSearchPolicy.class);
@@ -240,7 +248,7 @@ public class MostFragmentedIntervalFirstPolicyTest
   }
 
   @Test
-  public void test_compactionMode_returnsMinorCompactionMode_whenPercentageBelowThreshold()
+  public void test_compactionMode_returnsMinorCompactionMode_whenBytePercentageBelowThreshold()
   {
     // Set threshold to 0.5 (50%)
     final MostFragmentedIntervalFirstPolicy policy = new MostFragmentedIntervalFirstPolicy(
@@ -248,11 +256,12 @@ public class MostFragmentedIntervalFirstPolicyTest
         HumanReadableBytes.valueOf(1),
         HumanReadableBytes.valueOf(10_000),
         50,
+        50,
         null
     );
 
-    final CompactionStatistics compacted = CompactionStatistics.create(1200L, 10, 1L);
-    final CompactionStatistics uncompacted = CompactionStatistics.create(400L, 100, 1L);
+    final CompactionStatistics compacted = CompactionStatistics.create(1200L, 400L, 10, 1L);
+    final CompactionStatistics uncompacted = CompactionStatistics.create(400L, 600L, 100, 1L);
     final CompactionStatus status = CompactionStatus.pending(compacted, uncompacted, List.of(SEGMENT), "");
 
     final CompactionCandidate candidate = CompactionCandidate.from(List.of(SEGMENT), null, status);
@@ -264,7 +273,7 @@ public class MostFragmentedIntervalFirstPolicyTest
   }
 
   @Test
-  public void test_compactionMode_returnsFullCompaction_whenPercentageAboveThreshold()
+  public void test_compactionMode_returnsFullCompaction_whenBytePercentageAboveThreshold()
   {
     // Set threshold to 0.5 (50%)
     final MostFragmentedIntervalFirstPolicy policy = new MostFragmentedIntervalFirstPolicy(
@@ -272,13 +281,68 @@ public class MostFragmentedIntervalFirstPolicyTest
         HumanReadableBytes.valueOf(1),
         HumanReadableBytes.valueOf(10_000),
         50,
+        50,
         null
     );
 
     final CompactionStatus status =
         CompactionStatus.pending(
-            CompactionStatistics.create(500L, 5, 1),
-            CompactionStatistics.create(600L, 100, 1),
+            CompactionStatistics.create(500L, null, 5, 1),
+            CompactionStatistics.create(600L, null, 100, 1),
+            List.of(),
+            ""
+        );
+    final CompactionCandidate candidate = CompactionCandidate.from(List.of(SEGMENT), null, status);
+    final Eligibility eligibility =
+        policy.checkEligibilityForCompaction(candidate, null);
+
+    Assertions.assertEquals(CompactionMode.ALL_SEGMENTS, eligibility.getMode());
+    Assertions.assertTrue(eligibility.isEligible());
+  }
+
+  @Test
+  public void test_compactionMode_returnsMinorCompaction_whenRowPercentageBelowThreshold()
+  {
+    final MostFragmentedIntervalFirstPolicy policy = new MostFragmentedIntervalFirstPolicy(
+        1,
+        HumanReadableBytes.valueOf(1),
+        HumanReadableBytes.valueOf(10_000),
+        50,
+        50,
+        null
+    );
+
+    final CompactionStatus status =
+        CompactionStatus.pending(
+            CompactionStatistics.create(500L, 1200L, 5, 1),
+            CompactionStatistics.create(600L, 400L, 100, 1),
+            List.of(),
+            ""
+        );
+    final CompactionCandidate candidate = CompactionCandidate.from(List.of(SEGMENT), null, status);
+    final Eligibility eligibility =
+        policy.checkEligibilityForCompaction(candidate, null);
+
+    Assertions.assertEquals(CompactionMode.UNCOMPACTED_SEGMENTS_ONLY, eligibility.getMode());
+    Assertions.assertTrue(eligibility.isEligible());
+  }
+
+  @Test
+  public void test_compactionMode_returnsFullCompaction_whenByteAndRowPercentageBothAboveThreshold()
+  {
+    final MostFragmentedIntervalFirstPolicy policy = new MostFragmentedIntervalFirstPolicy(
+        1,
+        HumanReadableBytes.valueOf(1),
+        HumanReadableBytes.valueOf(10_000),
+        50,
+        50,
+        null
+    );
+
+    final CompactionStatus status =
+        CompactionStatus.pending(
+            CompactionStatistics.create(500L, 500L, 5, 1),
+            CompactionStatistics.create(600L, 600L, 100, 1),
             List.of(),
             ""
         );
@@ -299,14 +363,15 @@ public class MostFragmentedIntervalFirstPolicyTest
         HumanReadableBytes.valueOf(1),
         HumanReadableBytes.valueOf(10_000),
         null,
+        null,
         null
     );
 
     // With default threshold 0, any positive percentage >= 0, so always ALL_SEGMENTS_ELIGIBLE
     final CompactionStatus status =
         CompactionStatus.pending(
-            CompactionStatistics.create(1_000L, 10, 1),
-            CompactionStatistics.create(100L, 100, 1),
+            CompactionStatistics.create(1_000L, 1_000L, 10, 1),
+            CompactionStatistics.create(100L, 100L, 100, 1),
             List.of(),
             ""
         );
@@ -320,9 +385,10 @@ public class MostFragmentedIntervalFirstPolicyTest
 
   private CompactionCandidate createCandidate(int numSegments, long avgSizeBytes)
   {
-    final CompactionStatistics dummyCompactedStats = CompactionStatistics.create(1L, 1L, 1L);
+    final CompactionStatistics dummyCompactedStats = CompactionStatistics.create(1L, null, 1L, 1L);
     final CompactionStatistics uncompactedStats = CompactionStatistics.create(
         avgSizeBytes * numSegments,
+        null,
         numSegments,
         1L
     );
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/AutoCompactionSnapshotTest.java b/server/src/test/java/org/apache/druid/server/coordinator/AutoCompactionSnapshotTest.java
index a6eb127f854..83fbdaaef51 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/AutoCompactionSnapshotTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/AutoCompactionSnapshotTest.java
@@ -34,9 +34,9 @@ public class AutoCompactionSnapshotTest
 
     // Increment every stat twice
     for (int i = 0; i < 2; i++) {
-      builder.incrementSkippedStats(CompactionStatistics.create(13, 13, 13));
-      builder.incrementWaitingStats(CompactionStatistics.create(13, 13, 13));
-      builder.incrementCompactedStats(CompactionStatistics.create(13, 13, 13));
+      builder.incrementSkippedStats(CompactionStatistics.create(13, null, 13, 13));
+      builder.incrementWaitingStats(CompactionStatistics.create(13, null, 13, 13));
+      builder.incrementCompactedStats(CompactionStatistics.create(13, null, 13, 13));
     }
 
     final AutoCompactionSnapshot actual = builder.withMessage(expectedMessage).build();
diff --git a/website/.spelling b/website/.spelling
index 544e5953b3f..703046dc335 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -21,6 +21,7 @@
 BrowserOnly
 docusaurus
 1M
+10MiB
 100MiB
 32-bit
 4MiB
@@ -601,6 +602,7 @@ unannouncements
 unary
 unassign
 uncomment
+uncompacted
 underutilization
 unintuitive
 unioned


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
