This is an automated email from the ASF dual-hosted git repository. mittal pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 847968e530c KAFKA-19281: Add share enable flag to periodic jobs. (#19721) 847968e530c is described below commit 847968e530c1876fca89c2a6827d6b90e1755904 Author: Sushant Mahajan <smaha...@confluent.io> AuthorDate: Thu May 15 18:35:06 2025 +0530 KAFKA-19281: Add share enable flag to periodic jobs. (#19721) * We have a few periodic timer tasks in `ShareCoordinatorService` which run continuously. * With the recent introduction of share group enabled config at feature level, we would like these jobs to stop when the aforementioned feature is disabled. * In this PR, we have added the functionality to make that possible. * Additionally the service has been supplemented with addition of a static share group config supplier. * New test has been added. Reviewers: Andrew Schofield <aschofi...@confluent.io>, Apoorv Mittal <apoorvmitta...@gmail.com> --- .../src/main/scala/kafka/server/BrokerServer.scala | 1 + .../coordinator/share/ShareCoordinatorService.java | 57 +++++- .../share/ShareCoordinatorServiceTest.java | 228 +++++++++++++++++---- 3 files changed, 245 insertions(+), 41 deletions(-) diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index f8b52a728c6..3a7fd798801 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -660,6 +660,7 @@ class BrokerServer( .withWriter(writer) .withCoordinatorRuntimeMetrics(new ShareCoordinatorRuntimeMetrics(metrics)) .withCoordinatorMetrics(new ShareCoordinatorMetrics(metrics)) + .withShareGroupEnabledConfigSupplier(() => config.shareGroupConfig.isShareGroupEnabled) .build() } diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java index 3c47bf86fa0..e26aac124ff 100644 --- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java +++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java @@ -54,6 +54,7 @@ import org.apache.kafka.coordinator.common.runtime.PartitionWriter; import org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetrics; import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.server.common.ShareVersion; import org.apache.kafka.server.record.BrokerCompressionType; import org.apache.kafka.server.share.SharePartitionKey; import org.apache.kafka.server.util.FutureUtils; @@ -76,6 +77,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.IntSupplier; +import java.util.function.Supplier; import static org.apache.kafka.coordinator.common.runtime.CoordinatorOperationExceptionHelper.handleOperationException; @@ -91,6 +93,8 @@ public class ShareCoordinatorService implements ShareCoordinator { private final Timer timer; private final PartitionWriter writer; private final Map<TopicPartition, Long> lastPrunedOffsets; + private final Supplier<Boolean> shareGroupConfigEnabledSupplier; + private volatile boolean shouldRunPeriodicJob; public static class Builder { private final int nodeId; @@ -99,7 +103,7 @@ public class ShareCoordinatorService implements ShareCoordinator { private CoordinatorLoader<CoordinatorRecord> loader; private Time time; private Timer timer; - + private Supplier<Boolean> shareGroupConfigEnabledSupplier; private ShareCoordinatorMetrics coordinatorMetrics; private CoordinatorRuntimeMetrics coordinatorRuntimeMetrics; @@ -138,6 +142,11 @@ public class ShareCoordinatorService implements ShareCoordinator { return this; } + public Builder withShareGroupEnabledConfigSupplier(Supplier<Boolean> shareGroupConfigEnabledSupplier) { + this.shareGroupConfigEnabledSupplier = shareGroupConfigEnabledSupplier; + return this; + } + public ShareCoordinatorService build() { if (config == null) { throw new IllegalArgumentException("Config must be set."); @@ -160,6 +169,9 @@ public class ShareCoordinatorService implements ShareCoordinator { if (coordinatorRuntimeMetrics == null) { throw new IllegalArgumentException("Coordinator runtime metrics must be set."); } + if (shareGroupConfigEnabledSupplier == null) { + throw new IllegalArgumentException("Share group enabled config enabled supplier must be set."); + } String logPrefix = String.format("ShareCoordinator id=%d", nodeId); LogContext logContext = new LogContext(String.format("[%s] ", logPrefix)); @@ -202,7 +214,8 @@ public class ShareCoordinatorService implements ShareCoordinator { coordinatorMetrics, time, timer, - writer + writer, + shareGroupConfigEnabledSupplier ); } } @@ -214,7 +227,8 @@ public class ShareCoordinatorService implements ShareCoordinator { ShareCoordinatorMetrics shareCoordinatorMetrics, Time time, Timer timer, - PartitionWriter writer + PartitionWriter writer, + Supplier<Boolean> shareGroupConfigEnabledSupplier ) { this.log = logContext.logger(ShareCoordinatorService.class); this.config = config; @@ -224,6 +238,7 @@ public class ShareCoordinatorService implements ShareCoordinator { this.timer = timer; this.writer = writer; this.lastPrunedOffsets = new ConcurrentHashMap<>(); + this.shareGroupConfigEnabledSupplier = shareGroupConfigEnabledSupplier; } @Override @@ -265,7 +280,6 @@ public class ShareCoordinatorService implements ShareCoordinator { log.info("Starting up."); numPartitions = shareGroupTopicPartitionCount.getAsInt(); - setupPeriodicJobs(); log.info("Startup complete."); } @@ -274,11 +288,15 @@ public class ShareCoordinatorService implements ShareCoordinator { setupSnapshotColdPartitions(); } - private void setupRecordPruning() { + // Visibility for tests + void setupRecordPruning() { log.debug("Scheduling share-group state topic prune job."); timer.add(new TimerTask(config.shareCoordinatorTopicPruneIntervalMs()) { @Override public void run() { + if (!shouldRunPeriodicJob) { + return; + } List<CompletableFuture<Void>> futures = new ArrayList<>(); runtime.activeTopicPartitions().forEach(tp -> futures.add(performRecordPruning(tp))); @@ -349,11 +367,15 @@ public class ShareCoordinatorService implements ShareCoordinator { return fut; } - private void setupSnapshotColdPartitions() { + // Visibility for tests + void setupSnapshotColdPartitions() { log.debug("Scheduling cold share-partition snapshotting."); timer.add(new TimerTask(config.shareCoordinatorColdPartitionSnapshotIntervalMs()) { @Override public void run() { + if (!shouldRunPeriodicJob) { + return; + } List<CompletableFuture<Void>> futures = runtime.scheduleWriteAllOperation( "snapshot-cold-partitions", Duration.ofMillis(config.shareCoordinatorWriteTimeoutMs()), @@ -1075,6 +1097,18 @@ public class ShareCoordinatorService implements ShareCoordinator { public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) { throwIfNotActive(); this.runtime.onNewMetadataImage(newImage, delta); + boolean enabled = isShareGroupsEnabled(newImage); + // enabled shouldRunJob result (XOR) + // 0 0 no op on flag, do not call jobs + // 0 1 disable flag, do not call jobs => action + // 1 0 enable flag, call jobs as they are not recursing => action + // 1 1 no op on flag, do not call jobs + if (enabled ^ shouldRunPeriodicJob) { + shouldRunPeriodicJob = enabled; + if (enabled) { + setupPeriodicJobs(); + } + } } TopicPartition topicPartitionFor(SharePartitionKey key) { @@ -1090,4 +1124,15 @@ public class ShareCoordinatorService implements ShareCoordinator { throw Errors.COORDINATOR_NOT_AVAILABLE.exception(); } } + + private boolean isShareGroupsEnabled(MetadataImage image) { + return shareGroupConfigEnabledSupplier.get() || ShareVersion.fromFeatureLevel( + image.features().finalizedVersions().getOrDefault(ShareVersion.FEATURE_NAME, (short) 0) + ).supportsShareGroups(); + } + + // Visibility for tests + boolean shouldRunPeriodicJob() { + return shouldRunPeriodicJob; + } } diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java index 952464efd60..542d9cd8d0d 100644 --- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java +++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java @@ -45,6 +45,9 @@ import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime; import org.apache.kafka.coordinator.common.runtime.PartitionWriter; import org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetrics; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.server.common.ShareVersion; import org.apache.kafka.server.share.SharePartitionKey; import org.apache.kafka.server.util.FutureUtils; import org.apache.kafka.server.util.MockTime; @@ -52,6 +55,7 @@ import org.apache.kafka.server.util.timer.MockTimer; import org.apache.kafka.server.util.timer.Timer; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; import java.time.Duration; import java.util.HashSet; @@ -68,17 +72,21 @@ import java.util.concurrent.TimeoutException; import static org.apache.kafka.coordinator.common.runtime.TestUtil.requestContext; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyShort; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +@SuppressWarnings("ClassFanOutComplexity") class ShareCoordinatorServiceTest { @SuppressWarnings("unchecked") @@ -99,7 +107,8 @@ class ShareCoordinatorServiceTest { new ShareCoordinatorMetrics(), Time.SYSTEM, new MockTimer(), - mock(PartitionWriter.class) + mock(PartitionWriter.class), + () -> true ); service.startup(() -> 1); @@ -122,7 +131,8 @@ class ShareCoordinatorServiceTest { coordinatorMetrics, time, mock(Timer.class), - mock(PartitionWriter.class) + mock(PartitionWriter.class), + () -> true ); service.startup(() -> 1); @@ -232,7 +242,8 @@ class ShareCoordinatorServiceTest { new ShareCoordinatorMetrics(), Time.SYSTEM, mock(Timer.class), - mock(PartitionWriter.class) + mock(PartitionWriter.class), + () -> true ); service.startup(() -> 1); @@ -334,7 +345,8 @@ class ShareCoordinatorServiceTest { new ShareCoordinatorMetrics(), Time.SYSTEM, mock(Timer.class), - mock(PartitionWriter.class) + mock(PartitionWriter.class), + () -> true ); service.startup(() -> 1); @@ -418,7 +430,8 @@ class ShareCoordinatorServiceTest { coordinatorMetrics, time, mock(Timer.class), - mock(PartitionWriter.class) + mock(PartitionWriter.class), + () -> true ); service.startup(() -> 1); @@ -500,7 +513,8 @@ class ShareCoordinatorServiceTest { coordinatorMetrics, time, mock(Timer.class), - mock(PartitionWriter.class) + mock(PartitionWriter.class), + () -> true ); service.startup(() -> 1); @@ -582,7 +596,8 @@ class ShareCoordinatorServiceTest { new ShareCoordinatorMetrics(), Time.SYSTEM, mock(Timer.class), - mock(PartitionWriter.class) + mock(PartitionWriter.class), + () -> true ); service.startup(() -> 1); @@ -629,7 +644,8 @@ class ShareCoordinatorServiceTest { new ShareCoordinatorMetrics(), Time.SYSTEM, mock(Timer.class), - mock(PartitionWriter.class) + mock(PartitionWriter.class), + () -> true ); service.startup(() -> 1); @@ -676,7 +692,8 @@ class ShareCoordinatorServiceTest { new ShareCoordinatorMetrics(), Time.SYSTEM, mock(Timer.class), - mock(PartitionWriter.class) + mock(PartitionWriter.class), + () -> true ); service.startup(() -> 1); @@ -723,7 +740,8 @@ class ShareCoordinatorServiceTest { new ShareCoordinatorMetrics(), Time.SYSTEM, mock(Timer.class), - mock(PartitionWriter.class) + mock(PartitionWriter.class), + () -> true ); service.startup(() -> 1); @@ -770,7 +788,8 @@ class ShareCoordinatorServiceTest { new ShareCoordinatorMetrics(), Time.SYSTEM, mock(Timer.class), - mock(PartitionWriter.class) + mock(PartitionWriter.class), + () -> true ); service.startup(() -> 1); @@ -817,7 +836,8 @@ class ShareCoordinatorServiceTest { new ShareCoordinatorMetrics(), Time.SYSTEM, mock(Timer.class), - mock(PartitionWriter.class) + mock(PartitionWriter.class), + () -> true ); String groupId = "group1"; @@ -896,7 +916,8 @@ class ShareCoordinatorServiceTest { new ShareCoordinatorMetrics(), Time.SYSTEM, mock(Timer.class), - mock(PartitionWriter.class) + mock(PartitionWriter.class), + () -> true ); String groupId = "group1"; @@ -959,7 +980,8 @@ class ShareCoordinatorServiceTest { new ShareCoordinatorMetrics(), Time.SYSTEM, mock(Timer.class), - mock(PartitionWriter.class) + mock(PartitionWriter.class), + () -> true ); String groupId = "group1"; @@ -1022,7 +1044,8 @@ class ShareCoordinatorServiceTest { new ShareCoordinatorMetrics(), Time.SYSTEM, mock(Timer.class), - mock(PartitionWriter.class) + mock(PartitionWriter.class), + () -> true ); String groupId = "group1"; @@ -1083,7 +1106,8 @@ class ShareCoordinatorServiceTest { new ShareCoordinatorMetrics(), Time.SYSTEM, mock(Timer.class), - mock(PartitionWriter.class) + mock(PartitionWriter.class), + () -> true ); String groupId = "group1"; @@ -1143,7 +1167,8 @@ class ShareCoordinatorServiceTest { new ShareCoordinatorMetrics(), Time.SYSTEM, mock(Timer.class), - mock(PartitionWriter.class) + mock(PartitionWriter.class), + () -> true ); service.startup(() -> 1); @@ -1193,7 +1218,8 @@ class ShareCoordinatorServiceTest { new ShareCoordinatorMetrics(), Time.SYSTEM, mock(Timer.class), - mock(PartitionWriter.class) + mock(PartitionWriter.class), + () -> true ); service.startup(() -> 1); @@ -1236,7 +1262,8 @@ class ShareCoordinatorServiceTest { new ShareCoordinatorMetrics(), Time.SYSTEM, mock(Timer.class), - mock(PartitionWriter.class) + mock(PartitionWriter.class), + () -> true ); service.startup(() -> 1); @@ -1279,7 +1306,8 @@ class ShareCoordinatorServiceTest { new ShareCoordinatorMetrics(), Time.SYSTEM, mock(Timer.class), - mock(PartitionWriter.class) + mock(PartitionWriter.class), + () -> true ); service.startup(() -> 1); @@ -1321,7 +1349,8 @@ class ShareCoordinatorServiceTest { new ShareCoordinatorMetrics(), Time.SYSTEM, mock(Timer.class), - mock(PartitionWriter.class) + mock(PartitionWriter.class), + () -> true ); service.startup(() -> 1); @@ -1362,7 +1391,8 @@ class ShareCoordinatorServiceTest { new ShareCoordinatorMetrics(), Time.SYSTEM, mock(Timer.class), - mock(PartitionWriter.class) + mock(PartitionWriter.class), + () -> true ); service.startup(() -> 1); @@ -1391,7 +1421,8 @@ class ShareCoordinatorServiceTest { new ShareCoordinatorMetrics(), Time.SYSTEM, mock(Timer.class), - mock(PartitionWriter.class) + mock(PartitionWriter.class), + () -> true ); String groupId = "group1"; @@ -1449,10 +1480,12 @@ class ShareCoordinatorServiceTest { new ShareCoordinatorMetrics(metrics), time, timer, - writer + writer, + () -> true )); service.startup(() -> 1); + service.onNewMetadataImage(mock(MetadataImage.class), mock(MetadataDelta.class)); verify(runtime, times(0)) .scheduleWriteOperation( eq("write-state-record-prune"), @@ -1542,10 +1575,12 @@ class ShareCoordinatorServiceTest { new ShareCoordinatorMetrics(metrics), time, timer, - writer + writer, + () -> true )); service.startup(() -> 2); + service.onNewMetadataImage(mock(MetadataImage.class), mock(MetadataDelta.class)); verify(runtime, times(0)) .scheduleWriteOperation( eq("write-state-record-prune"), @@ -1602,10 +1637,12 @@ class ShareCoordinatorServiceTest { new ShareCoordinatorMetrics(metrics), time, timer, - writer + writer, + () -> true )); service.startup(() -> 1); + service.onNewMetadataImage(mock(MetadataImage.class), mock(MetadataDelta.class)); verify(runtime, times(0)) .scheduleWriteOperation( eq("write-state-record-prune"), @@ -1653,10 +1690,12 @@ class ShareCoordinatorServiceTest { new ShareCoordinatorMetrics(metrics), time, timer, - writer + writer, + () -> true )); service.startup(() -> 1); + service.onNewMetadataImage(mock(MetadataImage.class), mock(MetadataDelta.class)); verify(runtime, times(0)) .scheduleWriteOperation( eq("write-state-record-prune"), @@ -1702,10 +1741,13 @@ class ShareCoordinatorServiceTest { new ShareCoordinatorMetrics(metrics), time, timer, - writer + writer, + () -> true )); service.startup(() -> 1); + service.onNewMetadataImage(mock(MetadataImage.class), mock(MetadataDelta.class)); + verify(runtime, times(0)) .scheduleWriteOperation( eq("write-state-record-prune"), @@ -1763,10 +1805,12 @@ class ShareCoordinatorServiceTest { new ShareCoordinatorMetrics(metrics), time, timer, - writer + writer, + () -> true )); service.startup(() -> 1); + service.onNewMetadataImage(mock(MetadataImage.class), mock(MetadataDelta.class)); verify(runtime, times(0)) .scheduleWriteOperation( eq("write-state-record-prune"), @@ -1837,10 +1881,12 @@ class ShareCoordinatorServiceTest { new ShareCoordinatorMetrics(metrics), time, timer, - writer + writer, + () -> true )); service.startup(() -> 1); + service.onNewMetadataImage(mock(MetadataImage.class), mock(MetadataDelta.class)); verify(runtime, times(0)) .scheduleWriteOperation( eq("write-state-record-prune"), @@ -1889,7 +1935,8 @@ class ShareCoordinatorServiceTest { new ShareCoordinatorMetrics(metrics), time, timer, - writer + writer, + () -> true )); when(runtime.scheduleWriteAllOperation( @@ -1899,6 +1946,7 @@ class ShareCoordinatorServiceTest { )).thenReturn(List.of(CompletableFuture.completedFuture(null))); service.startup(() -> 1); + service.onNewMetadataImage(mock(MetadataImage.class), mock(MetadataDelta.class)); verify(runtime, times(0)) .scheduleWriteOperation( eq("snapshot-cold-partitions"), @@ -1951,10 +1999,12 @@ class ShareCoordinatorServiceTest { new ShareCoordinatorMetrics(metrics), time, timer, - writer + writer, + () -> true )); service.startup(() -> 2); + service.onNewMetadataImage(mock(MetadataImage.class), mock(MetadataDelta.class)); verify(runtime, times(0)) .scheduleWriteAllOperation( eq("snapshot-cold-partitions"), @@ -1980,6 +2030,111 @@ class ShareCoordinatorServiceTest { service.shutdown(); } + @Test + public void testPeriodicJobsDoNotRunWhenShareGroupsDisabled() throws InterruptedException { + CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime(); + PartitionWriter writer = mock(PartitionWriter.class); + MockTime time = new MockTime(); + MockTimer timer = spy(new MockTimer(time)); + + Metrics metrics = new Metrics(); + + ShareCoordinatorService service = spy(new ShareCoordinatorService( + new LogContext(), + ShareCoordinatorTestConfig.testConfig(), + runtime, + new ShareCoordinatorMetrics(metrics), + time, + timer, + writer, + () -> false // So that the feature config is used. + )); + + // Prune job. + when(runtime.scheduleWriteOperation( + eq("write-state-record-prune"), + any(), + any(), + any() + )).thenReturn(CompletableFuture.completedFuture(Optional.empty())); + + // Snapshot job. + when(runtime.scheduleWriteAllOperation( + eq("snapshot-cold-partitions"), + any(), + any() + )).thenReturn(List.of()); + + assertFalse(service.shouldRunPeriodicJob()); + + service.startup(() -> 1); + + MetadataImage mockedImage = mock(MetadataImage.class, RETURNS_DEEP_STUBS); + + // Feature disabled on start. + when(mockedImage.features().finalizedVersions().getOrDefault(eq(ShareVersion.FEATURE_NAME), anyShort())).thenReturn((short) 0); + service.onNewMetadataImage(mockedImage, mock(MetadataDelta.class)); // Jobs will not execute as feature is OFF in image. + + verify(timer, times(0)).add(any()); // Timer task not added. + verify(runtime, times(0)).scheduleWriteOperation( + eq("write-state-record-prune"), + any(), + any(), + any() + ); + verify(runtime, times(0)).scheduleWriteAllOperation( + eq("snapshot-cold-partitions"), + any(), + any() + ); + assertFalse(service.shouldRunPeriodicJob()); + + // Enable feature. + Mockito.reset(mockedImage); + when(mockedImage.features().finalizedVersions().getOrDefault(eq(ShareVersion.FEATURE_NAME), anyShort())).thenReturn((short) 1); + service.onNewMetadataImage(mockedImage, mock(MetadataDelta.class)); // Jobs will execute as feature is ON in image. + + verify(timer, times(2)).add(any()); // Timer task added twice (prune, snapshot). + timer.advanceClock(30001L); + verify(runtime, times(1)).scheduleWriteOperation( + eq("write-state-record-prune"), + any(), + any(), + any() + ); + verify(runtime, times(1)).scheduleWriteAllOperation( + eq("snapshot-cold-partitions"), + any(), + any() + ); + assertTrue(service.shouldRunPeriodicJob()); + + // Disable feature + Mockito.reset(mockedImage); + when(mockedImage.features().finalizedVersions().getOrDefault(eq(ShareVersion.FEATURE_NAME), anyShort())).thenReturn((short) 0); + service.onNewMetadataImage(mockedImage, mock(MetadataDelta.class)); // Jobs will not execute as feature is on in image. + timer.advanceClock(30001L); + + verify(timer, times(4)).add(any()); // Tasks added but will return immediately. + verify(runtime, times(1)).scheduleWriteOperation( + eq("write-state-record-prune"), + any(), + any(), + any() + ); + verify(runtime, times(1)).scheduleWriteAllOperation( + eq("snapshot-cold-partitions"), + any(), + any() + ); + assertFalse(service.shouldRunPeriodicJob()); + + timer.advanceClock(30001L); + verify(timer, times(4)).add(any()); // No new additions. + + service.shutdown(); + } + @Test public void testShareStateTopicConfigs() { CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime(); @@ -1995,7 +2150,8 @@ class ShareCoordinatorServiceTest { new ShareCoordinatorMetrics(metrics), time, timer, - writer + writer, + () -> true )); List<String> propNames = List.of( @@ -2025,7 +2181,8 @@ class ShareCoordinatorServiceTest { new ShareCoordinatorMetrics(metrics), time, timer, - writer + writer, + () -> true )); service.startup(() -> 3); @@ -2065,7 +2222,8 @@ class ShareCoordinatorServiceTest { new ShareCoordinatorMetrics(metrics), time, timer, - writer + writer, + () -> true )); service.startup(() -> 3);