This is an automated email from the ASF dual-hosted git repository.

mittal pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 847968e530c KAFKA-19281: Add share enable flag to periodic jobs. 
(#19721)
847968e530c is described below

commit 847968e530c1876fca89c2a6827d6b90e1755904
Author: Sushant Mahajan <smaha...@confluent.io>
AuthorDate: Thu May 15 18:35:06 2025 +0530

    KAFKA-19281: Add share enable flag to periodic jobs. (#19721)
    
    * We have a few periodic timer tasks in `ShareCoordinatorService` which
    run continuously.
    * With the recent introduction of share group enabled config at feature
    level, we would like these jobs to stop when the aforementioned feature
    is disabled.
    * In this PR, we have added the functionality to make that possible.
    * Additionally, the service has been supplemented with the addition of a
    static share group config supplier.
    * New test has been added.
    
    Reviewers: Andrew Schofield <aschofi...@confluent.io>, Apoorv Mittal
     <apoorvmitta...@gmail.com>
---
 .../src/main/scala/kafka/server/BrokerServer.scala |   1 +
 .../coordinator/share/ShareCoordinatorService.java |  57 +++++-
 .../share/ShareCoordinatorServiceTest.java         | 228 +++++++++++++++++----
 3 files changed, 245 insertions(+), 41 deletions(-)

diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index f8b52a728c6..3a7fd798801 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -660,6 +660,7 @@ class BrokerServer(
       .withWriter(writer)
       .withCoordinatorRuntimeMetrics(new 
ShareCoordinatorRuntimeMetrics(metrics))
       .withCoordinatorMetrics(new ShareCoordinatorMetrics(metrics))
+      .withShareGroupEnabledConfigSupplier(() => 
config.shareGroupConfig.isShareGroupEnabled)
       .build()
   }
 
diff --git 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
index 3c47bf86fa0..e26aac124ff 100644
--- 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
+++ 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
@@ -54,6 +54,7 @@ import 
org.apache.kafka.coordinator.common.runtime.PartitionWriter;
 import org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetrics;
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.common.ShareVersion;
 import org.apache.kafka.server.record.BrokerCompressionType;
 import org.apache.kafka.server.share.SharePartitionKey;
 import org.apache.kafka.server.util.FutureUtils;
@@ -76,6 +77,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.IntSupplier;
+import java.util.function.Supplier;
 
 import static 
org.apache.kafka.coordinator.common.runtime.CoordinatorOperationExceptionHelper.handleOperationException;
 
@@ -91,6 +93,8 @@ public class ShareCoordinatorService implements 
ShareCoordinator {
     private final Timer timer;
     private final PartitionWriter writer;
     private final Map<TopicPartition, Long> lastPrunedOffsets;
+    private final Supplier<Boolean> shareGroupConfigEnabledSupplier;
+    private volatile boolean shouldRunPeriodicJob;
 
     public static class Builder {
         private final int nodeId;
@@ -99,7 +103,7 @@ public class ShareCoordinatorService implements 
ShareCoordinator {
         private CoordinatorLoader<CoordinatorRecord> loader;
         private Time time;
         private Timer timer;
-
+        private Supplier<Boolean> shareGroupConfigEnabledSupplier;
         private ShareCoordinatorMetrics coordinatorMetrics;
         private CoordinatorRuntimeMetrics coordinatorRuntimeMetrics;
 
@@ -138,6 +142,11 @@ public class ShareCoordinatorService implements 
ShareCoordinator {
             return this;
         }
 
+        public Builder withShareGroupEnabledConfigSupplier(Supplier<Boolean> 
shareGroupConfigEnabledSupplier) {
+            this.shareGroupConfigEnabledSupplier = 
shareGroupConfigEnabledSupplier;
+            return this;
+        }
+
         public ShareCoordinatorService build() {
             if (config == null) {
                 throw new IllegalArgumentException("Config must be set.");
@@ -160,6 +169,9 @@ public class ShareCoordinatorService implements 
ShareCoordinator {
             if (coordinatorRuntimeMetrics == null) {
                 throw new IllegalArgumentException("Coordinator runtime 
metrics must be set.");
             }
+            if (shareGroupConfigEnabledSupplier == null) {
+                throw new IllegalArgumentException("Share group enabled config 
enabled supplier must be set.");
+            }
 
             String logPrefix = String.format("ShareCoordinator id=%d", nodeId);
             LogContext logContext = new LogContext(String.format("[%s] ", 
logPrefix));
@@ -202,7 +214,8 @@ public class ShareCoordinatorService implements 
ShareCoordinator {
                 coordinatorMetrics,
                 time,
                 timer,
-                writer
+                writer,
+                shareGroupConfigEnabledSupplier
             );
         }
     }
@@ -214,7 +227,8 @@ public class ShareCoordinatorService implements 
ShareCoordinator {
         ShareCoordinatorMetrics shareCoordinatorMetrics,
         Time time,
         Timer timer,
-        PartitionWriter writer
+        PartitionWriter writer,
+        Supplier<Boolean> shareGroupConfigEnabledSupplier
     ) {
         this.log = logContext.logger(ShareCoordinatorService.class);
         this.config = config;
@@ -224,6 +238,7 @@ public class ShareCoordinatorService implements 
ShareCoordinator {
         this.timer = timer;
         this.writer = writer;
         this.lastPrunedOffsets = new ConcurrentHashMap<>();
+        this.shareGroupConfigEnabledSupplier = shareGroupConfigEnabledSupplier;
     }
 
     @Override
@@ -265,7 +280,6 @@ public class ShareCoordinatorService implements 
ShareCoordinator {
 
         log.info("Starting up.");
         numPartitions = shareGroupTopicPartitionCount.getAsInt();
-        setupPeriodicJobs();
         log.info("Startup complete.");
     }
 
@@ -274,11 +288,15 @@ public class ShareCoordinatorService implements 
ShareCoordinator {
         setupSnapshotColdPartitions();
     }
 
-    private void setupRecordPruning() {
+    // Visibility for tests
+    void setupRecordPruning() {
         log.debug("Scheduling share-group state topic prune job.");
         timer.add(new TimerTask(config.shareCoordinatorTopicPruneIntervalMs()) 
{
             @Override
             public void run() {
+                if (!shouldRunPeriodicJob) {
+                    return;
+                }
                 List<CompletableFuture<Void>> futures = new ArrayList<>();
                 runtime.activeTopicPartitions().forEach(tp -> 
futures.add(performRecordPruning(tp)));
 
@@ -349,11 +367,15 @@ public class ShareCoordinatorService implements 
ShareCoordinator {
         return fut;
     }
 
-    private void setupSnapshotColdPartitions() {
+    // Visibility for tests
+    void setupSnapshotColdPartitions() {
         log.debug("Scheduling cold share-partition snapshotting.");
         timer.add(new 
TimerTask(config.shareCoordinatorColdPartitionSnapshotIntervalMs()) {
             @Override
             public void run() {
+                if (!shouldRunPeriodicJob) {
+                    return;
+                }
                 List<CompletableFuture<Void>> futures = 
runtime.scheduleWriteAllOperation(
                     "snapshot-cold-partitions",
                     Duration.ofMillis(config.shareCoordinatorWriteTimeoutMs()),
@@ -1075,6 +1097,18 @@ public class ShareCoordinatorService implements 
ShareCoordinator {
     public void onNewMetadataImage(MetadataImage newImage, MetadataDelta 
delta) {
         throwIfNotActive();
         this.runtime.onNewMetadataImage(newImage, delta);
+        boolean enabled = isShareGroupsEnabled(newImage);
+        // enabled    shouldRunJob         result (XOR)
+        // 0            0               no op on flag, do not call jobs
+        // 0            1               disable flag, do not call jobs         
             => action
+        // 1            0               enable flag, call jobs as they are not 
recursing    => action
+        // 1            1               no op on flag, do not call jobs
+        if (enabled ^ shouldRunPeriodicJob) {
+            shouldRunPeriodicJob = enabled;
+            if (enabled) {
+                setupPeriodicJobs();
+            }
+        }
     }
 
     TopicPartition topicPartitionFor(SharePartitionKey key) {
@@ -1090,4 +1124,15 @@ public class ShareCoordinatorService implements 
ShareCoordinator {
             throw Errors.COORDINATOR_NOT_AVAILABLE.exception();
         }
     }
+
+    private boolean isShareGroupsEnabled(MetadataImage image) {
+        return shareGroupConfigEnabledSupplier.get() || 
ShareVersion.fromFeatureLevel(
+            
image.features().finalizedVersions().getOrDefault(ShareVersion.FEATURE_NAME, 
(short) 0)
+        ).supportsShareGroups();
+    }
+
+    // Visibility for tests
+    boolean shouldRunPeriodicJob() {
+        return shouldRunPeriodicJob;
+    }
 }
diff --git 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
index 952464efd60..542d9cd8d0d 100644
--- 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
+++ 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
@@ -45,6 +45,9 @@ import 
org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
 import org.apache.kafka.coordinator.common.runtime.PartitionWriter;
 import org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetrics;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.common.ShareVersion;
 import org.apache.kafka.server.share.SharePartitionKey;
 import org.apache.kafka.server.util.FutureUtils;
 import org.apache.kafka.server.util.MockTime;
@@ -52,6 +55,7 @@ import org.apache.kafka.server.util.timer.MockTimer;
 import org.apache.kafka.server.util.timer.Timer;
 
 import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
 
 import java.time.Duration;
 import java.util.HashSet;
@@ -68,17 +72,21 @@ import java.util.concurrent.TimeoutException;
 import static 
org.apache.kafka.coordinator.common.runtime.TestUtil.requestContext;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyShort;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+@SuppressWarnings("ClassFanOutComplexity")
 class ShareCoordinatorServiceTest {
 
     @SuppressWarnings("unchecked")
@@ -99,7 +107,8 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(),
             Time.SYSTEM,
             new MockTimer(),
-            mock(PartitionWriter.class)
+            mock(PartitionWriter.class),
+            () -> true
         );
 
         service.startup(() -> 1);
@@ -122,7 +131,8 @@ class ShareCoordinatorServiceTest {
             coordinatorMetrics,
             time,
             mock(Timer.class),
-            mock(PartitionWriter.class)
+            mock(PartitionWriter.class),
+            () -> true
         );
 
         service.startup(() -> 1);
@@ -232,7 +242,8 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(),
             Time.SYSTEM,
             mock(Timer.class),
-            mock(PartitionWriter.class)
+            mock(PartitionWriter.class),
+            () -> true
         );
 
         service.startup(() -> 1);
@@ -334,7 +345,8 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(),
             Time.SYSTEM,
             mock(Timer.class),
-            mock(PartitionWriter.class)
+            mock(PartitionWriter.class),
+            () -> true
         );
 
         service.startup(() -> 1);
@@ -418,7 +430,8 @@ class ShareCoordinatorServiceTest {
             coordinatorMetrics,
             time,
             mock(Timer.class),
-            mock(PartitionWriter.class)
+            mock(PartitionWriter.class),
+            () -> true
         );
 
         service.startup(() -> 1);
@@ -500,7 +513,8 @@ class ShareCoordinatorServiceTest {
             coordinatorMetrics,
             time,
             mock(Timer.class),
-            mock(PartitionWriter.class)
+            mock(PartitionWriter.class),
+            () -> true
         );
 
         service.startup(() -> 1);
@@ -582,7 +596,8 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(),
             Time.SYSTEM,
             mock(Timer.class),
-            mock(PartitionWriter.class)
+            mock(PartitionWriter.class),
+            () -> true
         );
 
         service.startup(() -> 1);
@@ -629,7 +644,8 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(),
             Time.SYSTEM,
             mock(Timer.class),
-            mock(PartitionWriter.class)
+            mock(PartitionWriter.class),
+            () -> true
         );
 
         service.startup(() -> 1);
@@ -676,7 +692,8 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(),
             Time.SYSTEM,
             mock(Timer.class),
-            mock(PartitionWriter.class)
+            mock(PartitionWriter.class),
+            () -> true
         );
 
         service.startup(() -> 1);
@@ -723,7 +740,8 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(),
             Time.SYSTEM,
             mock(Timer.class),
-            mock(PartitionWriter.class)
+            mock(PartitionWriter.class),
+            () -> true
         );
 
         service.startup(() -> 1);
@@ -770,7 +788,8 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(),
             Time.SYSTEM,
             mock(Timer.class),
-            mock(PartitionWriter.class)
+            mock(PartitionWriter.class),
+            () -> true
         );
 
         service.startup(() -> 1);
@@ -817,7 +836,8 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(),
             Time.SYSTEM,
             mock(Timer.class),
-            mock(PartitionWriter.class)
+            mock(PartitionWriter.class),
+            () -> true
         );
 
         String groupId = "group1";
@@ -896,7 +916,8 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(),
             Time.SYSTEM,
             mock(Timer.class),
-            mock(PartitionWriter.class)
+            mock(PartitionWriter.class),
+            () -> true
         );
 
         String groupId = "group1";
@@ -959,7 +980,8 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(),
             Time.SYSTEM,
             mock(Timer.class),
-            mock(PartitionWriter.class)
+            mock(PartitionWriter.class),
+            () -> true
         );
 
         String groupId = "group1";
@@ -1022,7 +1044,8 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(),
             Time.SYSTEM,
             mock(Timer.class),
-            mock(PartitionWriter.class)
+            mock(PartitionWriter.class),
+            () -> true
         );
 
         String groupId = "group1";
@@ -1083,7 +1106,8 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(),
             Time.SYSTEM,
             mock(Timer.class),
-            mock(PartitionWriter.class)
+            mock(PartitionWriter.class),
+            () -> true
         );
 
         String groupId = "group1";
@@ -1143,7 +1167,8 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(),
             Time.SYSTEM,
             mock(Timer.class),
-            mock(PartitionWriter.class)
+            mock(PartitionWriter.class),
+            () -> true
         );
 
         service.startup(() -> 1);
@@ -1193,7 +1218,8 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(),
             Time.SYSTEM,
             mock(Timer.class),
-            mock(PartitionWriter.class)
+            mock(PartitionWriter.class),
+            () -> true
         );
 
         service.startup(() -> 1);
@@ -1236,7 +1262,8 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(),
             Time.SYSTEM,
             mock(Timer.class),
-            mock(PartitionWriter.class)
+            mock(PartitionWriter.class),
+            () -> true
         );
 
         service.startup(() -> 1);
@@ -1279,7 +1306,8 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(),
             Time.SYSTEM,
             mock(Timer.class),
-            mock(PartitionWriter.class)
+            mock(PartitionWriter.class),
+            () -> true
         );
 
         service.startup(() -> 1);
@@ -1321,7 +1349,8 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(),
             Time.SYSTEM,
             mock(Timer.class),
-            mock(PartitionWriter.class)
+            mock(PartitionWriter.class),
+            () -> true
         );
 
         service.startup(() -> 1);
@@ -1362,7 +1391,8 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(),
             Time.SYSTEM,
             mock(Timer.class),
-            mock(PartitionWriter.class)
+            mock(PartitionWriter.class),
+            () -> true
         );
 
         service.startup(() -> 1);
@@ -1391,7 +1421,8 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(),
             Time.SYSTEM,
             mock(Timer.class),
-            mock(PartitionWriter.class)
+            mock(PartitionWriter.class),
+            () -> true
         );
 
         String groupId = "group1";
@@ -1449,10 +1480,12 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(metrics),
             time,
             timer,
-            writer
+            writer,
+            () -> true
         ));
 
         service.startup(() -> 1);
+        service.onNewMetadataImage(mock(MetadataImage.class), 
mock(MetadataDelta.class));
         verify(runtime, times(0))
             .scheduleWriteOperation(
                 eq("write-state-record-prune"),
@@ -1542,10 +1575,12 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(metrics),
             time,
             timer,
-            writer
+            writer,
+            () -> true
         ));
 
         service.startup(() -> 2);
+        service.onNewMetadataImage(mock(MetadataImage.class), 
mock(MetadataDelta.class));
         verify(runtime, times(0))
             .scheduleWriteOperation(
                 eq("write-state-record-prune"),
@@ -1602,10 +1637,12 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(metrics),
             time,
             timer,
-            writer
+            writer,
+            () -> true
         ));
 
         service.startup(() -> 1);
+        service.onNewMetadataImage(mock(MetadataImage.class), 
mock(MetadataDelta.class));
         verify(runtime, times(0))
             .scheduleWriteOperation(
                 eq("write-state-record-prune"),
@@ -1653,10 +1690,12 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(metrics),
             time,
             timer,
-            writer
+            writer,
+            () -> true
         ));
 
         service.startup(() -> 1);
+        service.onNewMetadataImage(mock(MetadataImage.class), 
mock(MetadataDelta.class));
         verify(runtime, times(0))
             .scheduleWriteOperation(
                 eq("write-state-record-prune"),
@@ -1702,10 +1741,13 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(metrics),
             time,
             timer,
-            writer
+            writer,
+            () -> true
         ));
 
         service.startup(() -> 1);
+        service.onNewMetadataImage(mock(MetadataImage.class), 
mock(MetadataDelta.class));
+
         verify(runtime, times(0))
             .scheduleWriteOperation(
                 eq("write-state-record-prune"),
@@ -1763,10 +1805,12 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(metrics),
             time,
             timer,
-            writer
+            writer,
+            () -> true
         ));
 
         service.startup(() -> 1);
+        service.onNewMetadataImage(mock(MetadataImage.class), 
mock(MetadataDelta.class));
         verify(runtime, times(0))
             .scheduleWriteOperation(
                 eq("write-state-record-prune"),
@@ -1837,10 +1881,12 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(metrics),
             time,
             timer,
-            writer
+            writer,
+            () -> true
         ));
 
         service.startup(() -> 1);
+        service.onNewMetadataImage(mock(MetadataImage.class), 
mock(MetadataDelta.class));
         verify(runtime, times(0))
             .scheduleWriteOperation(
                 eq("write-state-record-prune"),
@@ -1889,7 +1935,8 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(metrics),
             time,
             timer,
-            writer
+            writer,
+            () -> true
         ));
 
         when(runtime.scheduleWriteAllOperation(
@@ -1899,6 +1946,7 @@ class ShareCoordinatorServiceTest {
         )).thenReturn(List.of(CompletableFuture.completedFuture(null)));
 
         service.startup(() -> 1);
+        service.onNewMetadataImage(mock(MetadataImage.class), 
mock(MetadataDelta.class));
         verify(runtime, times(0))
             .scheduleWriteOperation(
                 eq("snapshot-cold-partitions"),
@@ -1951,10 +1999,12 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(metrics),
             time,
             timer,
-            writer
+            writer,
+            () -> true
         ));
 
         service.startup(() -> 2);
+        service.onNewMetadataImage(mock(MetadataImage.class), 
mock(MetadataDelta.class));
         verify(runtime, times(0))
             .scheduleWriteAllOperation(
                 eq("snapshot-cold-partitions"),
@@ -1980,6 +2030,111 @@ class ShareCoordinatorServiceTest {
         service.shutdown();
     }
 
+    @Test
+    public void testPeriodicJobsDoNotRunWhenShareGroupsDisabled() throws 
InterruptedException {
+        CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        PartitionWriter writer = mock(PartitionWriter.class);
+        MockTime time = new MockTime();
+        MockTimer timer = spy(new MockTimer(time));
+
+        Metrics metrics = new Metrics();
+
+        ShareCoordinatorService service = spy(new ShareCoordinatorService(
+            new LogContext(),
+            ShareCoordinatorTestConfig.testConfig(),
+            runtime,
+            new ShareCoordinatorMetrics(metrics),
+            time,
+            timer,
+            writer,
+            () -> false // So that the feature config is used.
+        ));
+
+        // Prune job.
+        when(runtime.scheduleWriteOperation(
+            eq("write-state-record-prune"),
+            any(),
+            any(),
+            any()
+        )).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
+
+        // Snapshot job.
+        when(runtime.scheduleWriteAllOperation(
+            eq("snapshot-cold-partitions"),
+            any(),
+            any()
+        )).thenReturn(List.of());
+
+        assertFalse(service.shouldRunPeriodicJob());
+
+        service.startup(() -> 1);
+
+        MetadataImage mockedImage = mock(MetadataImage.class, 
RETURNS_DEEP_STUBS);
+
+        // Feature disabled on start.
+        
when(mockedImage.features().finalizedVersions().getOrDefault(eq(ShareVersion.FEATURE_NAME),
 anyShort())).thenReturn((short) 0);
+        service.onNewMetadataImage(mockedImage, mock(MetadataDelta.class));   
// Jobs will not execute as feature is OFF in image.
+
+        verify(timer, times(0)).add(any()); // Timer task not added.
+        verify(runtime, times(0)).scheduleWriteOperation(
+            eq("write-state-record-prune"),
+            any(),
+            any(),
+            any()
+        );
+        verify(runtime, times(0)).scheduleWriteAllOperation(
+            eq("snapshot-cold-partitions"),
+            any(),
+            any()
+        );
+        assertFalse(service.shouldRunPeriodicJob());
+
+        // Enable feature.
+        Mockito.reset(mockedImage);
+        
when(mockedImage.features().finalizedVersions().getOrDefault(eq(ShareVersion.FEATURE_NAME),
 anyShort())).thenReturn((short) 1);
+        service.onNewMetadataImage(mockedImage, mock(MetadataDelta.class));   
// Jobs will execute as feature is ON in image.
+
+        verify(timer, times(2)).add(any()); // Timer task added twice (prune, 
snapshot).
+        timer.advanceClock(30001L);
+        verify(runtime, times(1)).scheduleWriteOperation(
+            eq("write-state-record-prune"),
+            any(),
+            any(),
+            any()
+        );
+        verify(runtime, times(1)).scheduleWriteAllOperation(
+            eq("snapshot-cold-partitions"),
+            any(),
+            any()
+        );
+        assertTrue(service.shouldRunPeriodicJob());
+
+        // Disable feature
+        Mockito.reset(mockedImage);
+        
when(mockedImage.features().finalizedVersions().getOrDefault(eq(ShareVersion.FEATURE_NAME),
 anyShort())).thenReturn((short) 0);
+        service.onNewMetadataImage(mockedImage, mock(MetadataDelta.class));   
// Jobs will not execute as feature is OFF in image.
+        timer.advanceClock(30001L);
+
+        verify(timer, times(4)).add(any()); // Tasks added but will return 
immediately.
+        verify(runtime, times(1)).scheduleWriteOperation(
+            eq("write-state-record-prune"),
+            any(),
+            any(),
+            any()
+        );
+        verify(runtime, times(1)).scheduleWriteAllOperation(
+            eq("snapshot-cold-partitions"),
+            any(),
+            any()
+        );
+        assertFalse(service.shouldRunPeriodicJob());
+
+        timer.advanceClock(30001L);
+        verify(timer, times(4)).add(any()); // No new additions.
+
+        service.shutdown();
+    }
+
     @Test
     public void testShareStateTopicConfigs() {
         CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
@@ -1995,7 +2150,8 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(metrics),
             time,
             timer,
-            writer
+            writer,
+            () -> true
         ));
 
         List<String> propNames = List.of(
@@ -2025,7 +2181,8 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(metrics),
             time,
             timer,
-            writer
+            writer,
+            () -> true
         ));
 
         service.startup(() -> 3);
@@ -2065,7 +2222,8 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(metrics),
             time,
             timer,
-            writer
+            writer,
+            () -> true
         ));
 
         service.startup(() -> 3);

Reply via email to