This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new fa9914ba058 Fix bug in coordinator-based compaction duty and simulator
(#18812)
fa9914ba058 is described below
commit fa9914ba0585ba20c636f599355676b113142f3e
Author: Kashif Faraz <[email protected]>
AuthorDate: Fri Dec 5 12:29:02 2025 +0530
Fix bug in coordinator-based compaction duty and simulator (#18812)
Changes:
- Fix bug in `CompactionRunSimulator` so that it honors the compaction
policy being verified
- Fix bug in `CompactSegments` so that the duty does not launch tasks for
an interval that has already been skipped by the policy
- This bug does not occur in Overlord-based compaction supervisors
- It does not currently affect any real scenarios, since the only existing
production policy, `newestSegmentFirst`, never skips intervals anyway.
---
.../compact/CompactionConfigBasedJobTemplate.java | 2 +
.../server/compaction/CompactionRunSimulator.java | 11 +++-
.../server/coordinator/duty/CompactSegments.java | 2 +
.../compaction/CompactionRunSimulatorTest.java | 77 +++++++++++++++++++++-
4 files changed, 88 insertions(+), 4 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionConfigBasedJobTemplate.java
b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionConfigBasedJobTemplate.java
index a5db148372d..6b984a4b6c0 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionConfigBasedJobTemplate.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionConfigBasedJobTemplate.java
@@ -118,6 +118,8 @@ public class CompactionConfigBasedJobTemplate implements
CompactionJobTemplate
config,
timeline,
Intervals.complementOf(searchInterval),
+ // This policy is used only while creating jobs
+ // The actual order of jobs is determined by the policy used in
CompactionJobQueue
new NewestSegmentFirstPolicy(null)
);
diff --git
a/server/src/main/java/org/apache/druid/server/compaction/CompactionRunSimulator.java
b/server/src/main/java/org/apache/druid/server/compaction/CompactionRunSimulator.java
index 781019e12f7..58fe28e6ea9 100644
---
a/server/src/main/java/org/apache/druid/server/compaction/CompactionRunSimulator.java
+++
b/server/src/main/java/org/apache/druid/server/compaction/CompactionRunSimulator.java
@@ -132,13 +132,18 @@ public class CompactionRunSimulator
};
// Unlimited task slots to ensure that simulator does not skip any interval
- final DruidCompactionConfig configWithUnlimitedTaskSlots =
compactionConfig.withClusterConfig(
- new ClusterCompactionConfig(1.0, Integer.MAX_VALUE, null, null, null)
+ final ClusterCompactionConfig clusterConfig =
compactionConfig.clusterConfig();
+ final ClusterCompactionConfig configWithUnlimitedTaskSlots = new
ClusterCompactionConfig(
+ 1.0,
+ Integer.MAX_VALUE,
+ clusterConfig.getCompactionPolicy(),
+ clusterConfig.isUseSupervisors(),
+ clusterConfig.getEngine()
);
final CoordinatorRunStats stats = new CoordinatorRunStats();
new CompactSegments(simulationStatusTracker, readOnlyOverlordClient).run(
- configWithUnlimitedTaskSlots,
+ compactionConfig.withClusterConfig(configWithUnlimitedTaskSlots),
dataSourcesSnapshot,
defaultEngine,
stats
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index 1dca23090c2..9947e521f65 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -245,8 +245,10 @@ public class CompactSegments implements
CoordinatorCustomDuty
if (compactionStatus.isComplete()) {
snapshotBuilder.addToComplete(candidatesWithStatus);
+ continue;
} else if (compactionStatus.isSkipped()) {
snapshotBuilder.addToSkipped(candidatesWithStatus);
+ continue;
} else {
// As these segments will be compacted, we will aggregate the
statistic to the Compacted statistics
snapshotBuilder.addToComplete(entry);
diff --git
a/server/src/test/java/org/apache/druid/server/compaction/CompactionRunSimulatorTest.java
b/server/src/test/java/org/apache/druid/server/compaction/CompactionRunSimulatorTest.java
index bab2e351df1..34b61144e2b 100644
---
a/server/src/test/java/org/apache/druid/server/compaction/CompactionRunSimulatorTest.java
+++
b/server/src/test/java/org/apache/druid/server/compaction/CompactionRunSimulatorTest.java
@@ -31,6 +31,8 @@ import
org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.rpc.indexing.NoopOverlordClient;
+import org.apache.druid.segment.TestDataSource;
+import org.apache.druid.server.coordinator.ClusterCompactionConfig;
import org.apache.druid.server.coordinator.CreateDataSegments;
import org.apache.druid.server.coordinator.DruidCompactionConfig;
import
org.apache.druid.server.coordinator.InlineSchemaDataSourceCompactionConfig;
@@ -111,7 +113,80 @@ public class CompactionRunSimulatorTest
);
Assert.assertEquals(
Collections.singletonList(
- Arrays.asList("wiki", Intervals.of("2013-01-10/P1D"), 10,
1_000_000_000L, 1, "skip offset from latest[P1D]")
+ List.of("wiki", Intervals.of("2013-01-10/P1D"), 10,
1_000_000_000L, 1, "skip offset from latest[P1D]")
+ ),
+ skippedTable.getRows()
+ );
+ }
+
+ @Test
+ public void testSimulate_withFixedIntervalOrderPolicy()
+ {
+ final TestSegmentsMetadataManager segmentsMetadataManager = new
TestSegmentsMetadataManager();
+
+ // Add some segments to the timeline
+ final String dataSource = TestDataSource.WIKI;
+ final List<DataSegment> wikiSegments
+ = CreateDataSegments.ofDatasource(dataSource)
+ .forIntervals(10, Granularities.DAY)
+ .withNumPartitions(10)
+ .startingAt("2013-01-01")
+ .eachOfSizeInMb(100);
+ wikiSegments.forEach(segmentsMetadataManager::addSegment);
+
+ final FixedIntervalOrderPolicy policy = new FixedIntervalOrderPolicy(
+ List.of(
+ new FixedIntervalOrderPolicy.Candidate(dataSource,
Intervals.of("2013-01-08/P1D")),
+ new FixedIntervalOrderPolicy.Candidate(dataSource,
Intervals.of("2013-01-04/P1D"))
+ )
+ );
+ final CompactionSimulateResult simulateResult =
simulator.simulateRunWithConfig(
+ DruidCompactionConfig
+ .empty()
+ .withClusterConfig(new ClusterCompactionConfig(null, null, policy,
null, null))
+ .withDatasourceConfig(
+
InlineSchemaDataSourceCompactionConfig.builder().forDataSource(dataSource).build()
+ ),
+ segmentsMetadataManager.getRecentDataSourcesSnapshot(),
+ CompactionEngine.NATIVE
+ );
+
+ Assert.assertNotNull(simulateResult);
+
+ final Map<CompactionStatus.State, Table> compactionStates =
simulateResult.getCompactionStates();
+ Assert.assertNotNull(compactionStates);
+
+ Assert.assertNull(compactionStates.get(CompactionStatus.State.COMPLETE));
+ Assert.assertNull(compactionStates.get(CompactionStatus.State.RUNNING));
+
+ final Table pendingTable =
compactionStates.get(CompactionStatus.State.PENDING);
+ Assert.assertEquals(
+ List.of("dataSource", "interval", "numSegments", "bytes",
"maxTaskSlots", "reasonToCompact"),
+ pendingTable.getColumnNames()
+ );
+ Assert.assertEquals(
+ List.of(
+ List.of("wiki", Intervals.of("2013-01-08/P1D"), 10,
1_000_000_000L, 1, "not compacted yet"),
+ List.of("wiki", Intervals.of("2013-01-04/P1D"), 10,
1_000_000_000L, 1, "not compacted yet")
+ ),
+ pendingTable.getRows()
+ );
+
+ final Table skippedTable =
compactionStates.get(CompactionStatus.State.SKIPPED);
+ Assert.assertEquals(
+ List.of("dataSource", "interval", "numSegments", "bytes",
"reasonToSkip"),
+ skippedTable.getColumnNames()
+ );
+ Assert.assertEquals(
+ List.of(
+ List.of("wiki", Intervals.of("2013-01-02/P1D"), 10,
1_000_000_000L, 1, "Rejected by search policy"),
+ List.of("wiki", Intervals.of("2013-01-03/P1D"), 10,
1_000_000_000L, 1, "Rejected by search policy"),
+ List.of("wiki", Intervals.of("2013-01-07/P1D"), 10,
1_000_000_000L, 1, "Rejected by search policy"),
+ List.of("wiki", Intervals.of("2013-01-05/P1D"), 10,
1_000_000_000L, 1, "Rejected by search policy"),
+ List.of("wiki", Intervals.of("2013-01-06/P1D"), 10,
1_000_000_000L, 1, "Rejected by search policy"),
+ List.of("wiki", Intervals.of("2013-01-01/P1D"), 10,
1_000_000_000L, 1, "Rejected by search policy"),
+ List.of("wiki", Intervals.of("2013-01-09/P1D"), 10,
1_000_000_000L, 1, "Rejected by search policy"),
+ List.of("wiki", Intervals.of("2013-01-10/P1D"), 10,
1_000_000_000L, 1, "skip offset from latest[P1D]")
),
skippedTable.getRows()
);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]