This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new fa9914ba058 Fix bug in coordinator-based compaction duty and simulator 
(#18812)
fa9914ba058 is described below

commit fa9914ba0585ba20c636f599355676b113142f3e
Author: Kashif Faraz <[email protected]>
AuthorDate: Fri Dec 5 12:29:02 2025 +0530

    Fix bug in coordinator-based compaction duty and simulator (#18812)
    
    Changes:
    - Fix bug in `CompactionRunSimulator` so that it honors the compaction
      policy being verified
    - Fix bug in `CompactSegments` so that the duty does not launch tasks for
      an interval that has already been skipped by the policy
      - This bug does not occur in Overlord-based compaction supervisors
      - It currently does not affect any real scenarios, since the only
        existing production policy `newestSegmentFirst` never skips
        intervals anyway.
---
 .../compact/CompactionConfigBasedJobTemplate.java  |  2 +
 .../server/compaction/CompactionRunSimulator.java  | 11 +++-
 .../server/coordinator/duty/CompactSegments.java   |  2 +
 .../compaction/CompactionRunSimulatorTest.java     | 77 +++++++++++++++++++++-
 4 files changed, 88 insertions(+), 4 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionConfigBasedJobTemplate.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionConfigBasedJobTemplate.java
index a5db148372d..6b984a4b6c0 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionConfigBasedJobTemplate.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionConfigBasedJobTemplate.java
@@ -118,6 +118,8 @@ public class CompactionConfigBasedJobTemplate implements 
CompactionJobTemplate
         config,
         timeline,
         Intervals.complementOf(searchInterval),
+        // This policy is used only while creating jobs
+        // The actual order of jobs is determined by the policy used in 
CompactionJobQueue
         new NewestSegmentFirstPolicy(null)
     );
 
diff --git 
a/server/src/main/java/org/apache/druid/server/compaction/CompactionRunSimulator.java
 
b/server/src/main/java/org/apache/druid/server/compaction/CompactionRunSimulator.java
index 781019e12f7..58fe28e6ea9 100644
--- 
a/server/src/main/java/org/apache/druid/server/compaction/CompactionRunSimulator.java
+++ 
b/server/src/main/java/org/apache/druid/server/compaction/CompactionRunSimulator.java
@@ -132,13 +132,18 @@ public class CompactionRunSimulator
     };
 
     // Unlimited task slots to ensure that simulator does not skip any interval
-    final DruidCompactionConfig configWithUnlimitedTaskSlots = 
compactionConfig.withClusterConfig(
-        new ClusterCompactionConfig(1.0, Integer.MAX_VALUE, null, null, null)
+    final ClusterCompactionConfig clusterConfig = 
compactionConfig.clusterConfig();
+    final ClusterCompactionConfig configWithUnlimitedTaskSlots = new 
ClusterCompactionConfig(
+        1.0,
+        Integer.MAX_VALUE,
+        clusterConfig.getCompactionPolicy(),
+        clusterConfig.isUseSupervisors(),
+        clusterConfig.getEngine()
     );
 
     final CoordinatorRunStats stats = new CoordinatorRunStats();
     new CompactSegments(simulationStatusTracker, readOnlyOverlordClient).run(
-        configWithUnlimitedTaskSlots,
+        compactionConfig.withClusterConfig(configWithUnlimitedTaskSlots),
         dataSourcesSnapshot,
         defaultEngine,
         stats
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index 1dca23090c2..9947e521f65 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -245,8 +245,10 @@ public class CompactSegments implements 
CoordinatorCustomDuty
 
       if (compactionStatus.isComplete()) {
         snapshotBuilder.addToComplete(candidatesWithStatus);
+        continue;
       } else if (compactionStatus.isSkipped()) {
         snapshotBuilder.addToSkipped(candidatesWithStatus);
+        continue;
       } else {
         // As these segments will be compacted, we will aggregate the 
statistic to the Compacted statistics
         snapshotBuilder.addToComplete(entry);
diff --git 
a/server/src/test/java/org/apache/druid/server/compaction/CompactionRunSimulatorTest.java
 
b/server/src/test/java/org/apache/druid/server/compaction/CompactionRunSimulatorTest.java
index bab2e351df1..34b61144e2b 100644
--- 
a/server/src/test/java/org/apache/druid/server/compaction/CompactionRunSimulatorTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/compaction/CompactionRunSimulatorTest.java
@@ -31,6 +31,8 @@ import 
org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.metadata.LockFilterPolicy;
 import org.apache.druid.rpc.indexing.NoopOverlordClient;
+import org.apache.druid.segment.TestDataSource;
+import org.apache.druid.server.coordinator.ClusterCompactionConfig;
 import org.apache.druid.server.coordinator.CreateDataSegments;
 import org.apache.druid.server.coordinator.DruidCompactionConfig;
 import 
org.apache.druid.server.coordinator.InlineSchemaDataSourceCompactionConfig;
@@ -111,7 +113,80 @@ public class CompactionRunSimulatorTest
     );
     Assert.assertEquals(
         Collections.singletonList(
-            Arrays.asList("wiki", Intervals.of("2013-01-10/P1D"), 10, 
1_000_000_000L, 1, "skip offset from latest[P1D]")
+            List.of("wiki", Intervals.of("2013-01-10/P1D"), 10, 
1_000_000_000L, 1, "skip offset from latest[P1D]")
+        ),
+        skippedTable.getRows()
+    );
+  }
+
+  @Test
+  public void testSimulate_withFixedIntervalOrderPolicy()
+  {
+    final TestSegmentsMetadataManager segmentsMetadataManager = new 
TestSegmentsMetadataManager();
+
+    // Add some segments to the timeline
+    final String dataSource = TestDataSource.WIKI;
+    final List<DataSegment> wikiSegments
+        = CreateDataSegments.ofDatasource(dataSource)
+                            .forIntervals(10, Granularities.DAY)
+                            .withNumPartitions(10)
+                            .startingAt("2013-01-01")
+                            .eachOfSizeInMb(100);
+    wikiSegments.forEach(segmentsMetadataManager::addSegment);
+
+    final FixedIntervalOrderPolicy policy = new FixedIntervalOrderPolicy(
+        List.of(
+            new FixedIntervalOrderPolicy.Candidate(dataSource, 
Intervals.of("2013-01-08/P1D")),
+            new FixedIntervalOrderPolicy.Candidate(dataSource, 
Intervals.of("2013-01-04/P1D"))
+        )
+    );
+    final CompactionSimulateResult simulateResult = 
simulator.simulateRunWithConfig(
+        DruidCompactionConfig
+            .empty()
+            .withClusterConfig(new ClusterCompactionConfig(null, null, policy, 
null, null))
+            .withDatasourceConfig(
+                
InlineSchemaDataSourceCompactionConfig.builder().forDataSource(dataSource).build()
+            ),
+        segmentsMetadataManager.getRecentDataSourcesSnapshot(),
+        CompactionEngine.NATIVE
+    );
+
+    Assert.assertNotNull(simulateResult);
+
+    final Map<CompactionStatus.State, Table> compactionStates = 
simulateResult.getCompactionStates();
+    Assert.assertNotNull(compactionStates);
+
+    Assert.assertNull(compactionStates.get(CompactionStatus.State.COMPLETE));
+    Assert.assertNull(compactionStates.get(CompactionStatus.State.RUNNING));
+
+    final Table pendingTable = 
compactionStates.get(CompactionStatus.State.PENDING);
+    Assert.assertEquals(
+        List.of("dataSource", "interval", "numSegments", "bytes", 
"maxTaskSlots", "reasonToCompact"),
+        pendingTable.getColumnNames()
+    );
+    Assert.assertEquals(
+        List.of(
+            List.of("wiki", Intervals.of("2013-01-08/P1D"), 10, 
1_000_000_000L, 1, "not compacted yet"),
+            List.of("wiki", Intervals.of("2013-01-04/P1D"), 10, 
1_000_000_000L, 1, "not compacted yet")
+        ),
+        pendingTable.getRows()
+    );
+
+    final Table skippedTable = 
compactionStates.get(CompactionStatus.State.SKIPPED);
+    Assert.assertEquals(
+        List.of("dataSource", "interval", "numSegments", "bytes", 
"reasonToSkip"),
+        skippedTable.getColumnNames()
+    );
+    Assert.assertEquals(
+        List.of(
+            List.of("wiki", Intervals.of("2013-01-02/P1D"), 10, 
1_000_000_000L, 1, "Rejected by search policy"),
+            List.of("wiki", Intervals.of("2013-01-03/P1D"), 10, 
1_000_000_000L, 1, "Rejected by search policy"),
+            List.of("wiki", Intervals.of("2013-01-07/P1D"), 10, 
1_000_000_000L, 1, "Rejected by search policy"),
+            List.of("wiki", Intervals.of("2013-01-05/P1D"), 10, 
1_000_000_000L, 1, "Rejected by search policy"),
+            List.of("wiki", Intervals.of("2013-01-06/P1D"), 10, 
1_000_000_000L, 1, "Rejected by search policy"),
+            List.of("wiki", Intervals.of("2013-01-01/P1D"), 10, 
1_000_000_000L, 1, "Rejected by search policy"),
+            List.of("wiki", Intervals.of("2013-01-09/P1D"), 10, 
1_000_000_000L, 1, "Rejected by search policy"),
+            List.of("wiki", Intervals.of("2013-01-10/P1D"), 10, 
1_000_000_000L, 1, "skip offset from latest[P1D]")
         ),
         skippedTable.getRows()
     );


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to