This is an automated email from the ASF dual-hosted git repository. Fly-Style pushed a commit to branch feat/stop-gracefully in repository https://gitbox.apache.org/repos/asf/druid.git
commit 2f26286523e3c37382c84fa237039ab7d8c09a17 Author: Sasha Syrotenko <[email protected]> AuthorDate: Mon Jun 8 16:23:22 2026 +0300 Introduce stopGracefullyOnNewSpec --- .../overlord/supervisor/SupervisorManager.java | 26 ++++++---- .../supervisor/SeekableStreamSupervisor.java | 6 +++ .../overlord/supervisor/SupervisorManagerTest.java | 58 ++++++++++++++++++++++ .../SeekableStreamSupervisorStateTest.java | 14 ++++++ .../indexing/overlord/supervisor/Supervisor.java | 9 ++++ 5 files changed, 104 insertions(+), 9 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index fa7d96634ae..d64d3f6ca4c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -377,7 +377,7 @@ public class SupervisorManager implements SupervisorStatsProvider * Resets a supervisor to the latest stream offsets and starts a bounded backfill supervisor to * process the skipped range from the previously checkpointed offsets up to the latest offsets. 
* - * @param id supervisor ID + * @param id supervisor ID * @param backfillTaskCount number of tasks for the backfill supervisor, or null to inherit from the source spec * @return map with {@code "id"} (the original supervisor ID) and {@code "backfillSupervisorId"} * @throws IllegalArgumentException if the supervisor is not a {@link SeekableStreamSupervisor}, @@ -424,10 +424,20 @@ public class SupervisorManager implements SupervisorStatsProvider String backfillSupervisorId = IdUtils.getRandomIdWithPrefix(id + "_backfill"); try { - Map<String, Object> normalizedStartOffsets = jsonMapper.readValue(jsonMapper.writeValueAsString(startOffsets), Map.class); - Map<String, Object> normalizedEndOffsets = jsonMapper.readValue(jsonMapper.writeValueAsString(endOffsets), Map.class); + Map<String, Object> normalizedStartOffsets = jsonMapper.readValue( + jsonMapper.writeValueAsString(startOffsets), + Map.class + ); + Map<String, Object> normalizedEndOffsets = jsonMapper.readValue( + jsonMapper.writeValueAsString(endOffsets), + Map.class + ); BoundedStreamConfig boundedStreamConfig = new BoundedStreamConfig(normalizedStartOffsets, normalizedEndOffsets); - SupervisorSpec backfillSpec = streamSpec.createBackfillSpec(backfillSupervisorId, boundedStreamConfig, backfillTaskCount); + SupervisorSpec backfillSpec = streamSpec.createBackfillSpec( + backfillSupervisorId, + boundedStreamConfig, + backfillTaskCount + ); createOrUpdateAndStartSupervisor(backfillSpec); } catch (JsonProcessingException e) { @@ -615,12 +625,10 @@ public class SupervisorManager implements SupervisorStatsProvider } if (writeTombstone) { - metadataSupervisorManager.insert( - id, - new NoopSupervisorSpec(null, pair.rhs.getDataSources()) - ); // where NoopSupervisorSpec is a tombstone + // NoopSupervisorSpec is a tombstone + metadataSupervisorManager.insert(id, new NoopSupervisorSpec(null, pair.rhs.getDataSources())); } - pair.lhs.stop(true); + pair.lhs.stop(pair.lhs.stopGracefullyOnNewSpec()); supervisors.remove(id); 
SupervisorTaskAutoScaler autoscaler = autoscalers.get(id); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 74329c68e1d..ae38f788e10 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -1285,6 +1285,12 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy } } + @Override + public boolean stopGracefullyOnNewSpec() + { + return true; + } + @Override public void stop(boolean stopGracefully) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java index 199e004b424..01d88868bee 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java @@ -135,6 +135,7 @@ public class SupervisorManagerTest extends EasyMockSupport resetAll(); supervisor2.start(); EasyMock.expect(supervisor2.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes(); + EasyMock.expect(supervisor1.stopGracefullyOnNewSpec()).andReturn(true); supervisor1.stop(true); replayAll(); @@ -145,6 +146,7 @@ public class SupervisorManagerTest extends EasyMockSupport resetAll(); metadataSupervisorManager.insert(EasyMock.eq("id1"), EasyMock.anyObject(NoopSupervisorSpec.class)); + EasyMock.expect(supervisor2.stopGracefullyOnNewSpec()).andReturn(true); supervisor2.stop(true); replayAll(); @@ -580,6 +582,7 @@ public class SupervisorManagerTest extends EasyMockSupport 
metadataSupervisorManager.insert(EasyMock.eq("id1"), EasyMock.capture(capturedInsert)); supervisor1.start(); EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes(); + EasyMock.expect(supervisor1.stopGracefullyOnNewSpec()).andReturn(true); supervisor1.stop(true); EasyMock.expect(supervisor2.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes(); supervisor2.start(); @@ -729,6 +732,7 @@ public class SupervisorManagerTest extends EasyMockSupport metadataSupervisorManager.insert(EasyMock.eq("id1"), EasyMock.capture(capturedInsert)); supervisor2.start(); EasyMock.expect(supervisor2.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes(); + EasyMock.expect(supervisor1.stopGracefullyOnNewSpec()).andReturn(true); supervisor1.stop(true); replayAll(); @@ -742,6 +746,7 @@ public class SupervisorManagerTest extends EasyMockSupport // in TestSupervisorSpec implementation of createRunningSpec resetAll(); metadataSupervisorManager.insert(EasyMock.eq("id1"), EasyMock.capture(capturedInsert)); + EasyMock.expect(supervisor2.stopGracefullyOnNewSpec()).andReturn(true); supervisor2.stop(true); supervisor1.start(); EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes(); @@ -756,6 +761,7 @@ public class SupervisorManagerTest extends EasyMockSupport // mock stop of suspended then resumed supervisor resetAll(); metadataSupervisorManager.insert(EasyMock.eq("id1"), EasyMock.anyObject(NoopSupervisorSpec.class)); + EasyMock.expect(supervisor1.stopGracefullyOnNewSpec()).andReturn(true); supervisor1.stop(true); replayAll(); @@ -778,6 +784,58 @@ public class SupervisorManagerTest extends EasyMockSupport Assert.assertTrue(manager.getSupervisorIds().isEmpty()); } + @Test + public void testStopGracefullyOnNewSpecFalseUsesNonGracefulStop() + { + SupervisorSpec spec = new TestSupervisorSpec("id1", supervisor1); + SupervisorSpec spec2 = new TestSupervisorSpec("id1", supervisor2); + Map<String, 
SupervisorSpec> existingSpecs = ImmutableMap.of( + "id3", new TestSupervisorSpec("id3", supervisor3) + ); + + EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs); + metadataSupervisorManager.insert("id1", spec); + supervisor3.start(); + EasyMock.expect(supervisor3.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes(); + supervisor1.start(); + EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes(); + replayAll(); + + manager.start(); + manager.createOrUpdateAndStartSupervisor(spec); + verifyAll(); + + // spec update: supervisor1 opts out of graceful stop-on-new-spec, so it is stopped with stop(false), leaving its + // managed tasks running for the replacement supervisor to reconcile. + resetAll(); + supervisor2.start(); + EasyMock.expect(supervisor2.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes(); + EasyMock.expect(supervisor1.stopGracefullyOnNewSpec()).andReturn(false); + supervisor1.stop(false); + replayAll(); + + manager.createOrUpdateAndStartSupervisor(spec2); + verifyAll(); + + // terminate also honors the supervisor's preference; a supervisor that opts out of graceful stop is stopped with + // stop(false). 
+ resetAll(); + metadataSupervisorManager.insert(EasyMock.eq("id1"), EasyMock.anyObject(NoopSupervisorSpec.class)); + EasyMock.expect(supervisor2.stopGracefullyOnNewSpec()).andReturn(false); + supervisor2.stop(false); + replayAll(); + + Assert.assertTrue(manager.stopAndRemoveSupervisor("id1")); + verifyAll(); + } + + @Test + public void testStopGracefullyOnNewSpecDefaultsToFalse() + { + final Supervisor supervisor = new NoopSupervisorSpec(null, ImmutableList.of("ds")).createSupervisor(); + Assert.assertFalse(supervisor.stopGracefullyOnNewSpec()); + } + @Test public void testGetActiveSupervisorIdForDatasourceWithAppendLock() { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index 9e45920ad71..a1835f17706 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -247,6 +247,20 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport verifyAll(); } + @Test + public void testStopGracefullyOnNewSpecReturnsTrue() + { + EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); + replayAll(); + + SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(); + + // SeekableStreamSupervisor retains the historical graceful stop-and-roll behavior on a spec update. 
+ Assert.assertTrue(supervisor.stopGracefullyOnNewSpec()); + + verifyAll(); + } + @Test public void testRunningStreamGetSequenceNumberReturnsNull() { diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java index ce6c87e5e08..084eca20804 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java @@ -47,6 +47,14 @@ public interface Supervisor */ void stop(boolean stopGracefully); + /** + * Indicates whether this supervisor should be stopped gracefully when it is replaced or removed, i.e. when its spec is updated, suspended or resumed, or when the supervisor is terminated. + */ + default boolean stopGracefullyOnNewSpec() + { + return false; + } + /** * Starts non-graceful shutdown of the supervisor and returns a future that completes when shutdown is complete. */ @@ -94,6 +102,7 @@ public interface Supervisor /** * Resets any stored metadata by the supervisor. + * * @param dataSourceMetadata optional dataSource metadata. */ void reset(@Nullable DataSourceMetadata dataSourceMetadata); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
