This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2f90af70ee6 MINOR: Remove onPartitionsDeleted from GroupCoordinator interface (#21263)
2f90af70ee6 is described below

commit 2f90af70ee61dab3dae43a82d29f9f8505056bc9
Author: David Jacot <[email protected]>
AuthorDate: Thu Jan 8 18:07:09 2026 +0100

    MINOR: Remove onPartitionsDeleted from GroupCoordinator interface (#21263)
    
    This patch removes the `onPartitionsDeleted` method from the
    `GroupCoordinator` interface by moving its functionality into
    `onMetadataUpdate`. The `MetadataDelta` already contains the deleted
    topic information via `delta.topicsDelta().deletedTopicIds()`, making
    the separate method redundant.
    
    Reviewers: Dongnuo Lyu <[email protected]>, Sean Quah
     <[email protected]>, Lianet Magrans <[email protected]>
---
 .../server/metadata/BrokerMetadataPublisher.scala  |  18 --
 .../unit/kafka/server/OffsetFetchRequestTest.scala |   2 +-
 .../kafka/coordinator/group/GroupCoordinator.java  |  12 --
 .../coordinator/group/GroupCoordinatorService.java | 126 ++++++------
 .../group/GroupCoordinatorServiceTest.java         | 212 +++++++++------------
 5 files changed, 158 insertions(+), 212 deletions(-)

diff --git 
a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala 
b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index afc982aae00..35c44b9524d 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -23,7 +23,6 @@ import kafka.log.LogManager
 import kafka.server.share.SharePartitionManager
 import kafka.server.{KafkaConfig, ReplicaManager}
 import kafka.utils.Logging
-import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.TimeoutException
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.coordinator.group.GroupCoordinator
@@ -40,7 +39,6 @@ import org.apache.kafka.server.fault.FaultHandler
 import org.apache.kafka.storage.internals.log.{LogManager => JLogManager}
 
 import java.util.concurrent.CompletableFuture
-import scala.collection.mutable
 import scala.jdk.CollectionConverters._
 
 
@@ -186,22 +184,6 @@ class BrokerMetadataPublisher(
           case t: Throwable => 
metadataPublishingFaultHandler.handleFault("Error updating share " +
             s"coordinator with local changes in $deltaName", t)
         }
-        try {
-          // Notify the group coordinator about deleted topics.
-          val deletedTopicPartitions = new 
mutable.ArrayBuffer[TopicPartition]()
-          topicsDelta.deletedTopicIds().forEach { id =>
-            val topicImage = topicsDelta.image().getTopic(id)
-            topicImage.partitions().keySet().forEach {
-              id => deletedTopicPartitions += new 
TopicPartition(topicImage.name(), id)
-            }
-          }
-          if (deletedTopicPartitions.nonEmpty) {
-            
groupCoordinator.onPartitionsDeleted(deletedTopicPartitions.asJava, 
RequestLocal.noCaching.bufferSupplier)
-          }
-        } catch {
-          case t: Throwable => 
metadataPublishingFaultHandler.handleFault("Error updating group " +
-            s"coordinator with deleted partitions in $deltaName", t)
-        }
         try {
           // Notify the share coordinator about deleted topics.
           val deletedTopicIds = topicsDelta.deletedTopicIds()
diff --git a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
index f838bb42d9e..ff228f245cb 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
@@ -568,7 +568,7 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) 
extends GroupCoordinatorB
   @ClusterTest
   def testFetchOffsetWithRecreatedTopic(): Unit = {
     // There are two ways to ensure that committed of recreated topics are not 
returned.
-    // 1) When a topic is deleted, GroupCoordinatorService#onPartitionsDeleted 
is called to
+    // 1) When a topic is deleted, GroupCoordinatorService#onMetadataUpdate is 
called to
     //    delete all its committed offsets.
     // 2) Since version 10 of the OffsetCommit API, the topic id is stored 
alongside the
     //    committed offset. When it is queried, it is only returned iff the 
topic id of
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
index 753d1736f7c..9eb6d1cfd31 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
@@ -64,7 +64,6 @@ import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.function.IntSupplier;
 
 /**
@@ -410,17 +409,6 @@ public interface GroupCoordinator {
      */
     int partitionFor(String groupId);
 
-    /**
-     * Remove the provided deleted partitions offsets.
-     *
-     * @param topicPartitions   The deleted partitions.
-     * @param bufferSupplier    The buffer supplier tight to the request 
thread.
-     */
-    void onPartitionsDeleted(
-        List<TopicPartition> topicPartitions,
-        BufferSupplier bufferSupplier
-    ) throws ExecutionException, InterruptedException;
-
     /**
      * Group coordinator is now the leader for the given partition at the
      * given leader epoch. It should load cached state from the partition
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 701e58f96c8..f8dc5068dba 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -100,6 +100,7 @@ import 
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
 import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult;
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.TopicsDelta;
 import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
 import org.apache.kafka.server.authorizer.Authorizer;
 import org.apache.kafka.server.record.BrokerCompressionType;
@@ -136,7 +137,6 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.IntSupplier;
@@ -2217,63 +2217,6 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
         );
     }
 
-    /**
-     * See {@link GroupCoordinator#onPartitionsDeleted(List, BufferSupplier)}.
-     */
-    @Override
-    public void onPartitionsDeleted(
-        List<TopicPartition> topicPartitions,
-        BufferSupplier bufferSupplier
-    ) throws ExecutionException, InterruptedException {
-        throwIfNotActive();
-
-        var futures = new ArrayList<CompletableFuture<Void>>();
-
-        // Handle the partition deletion for committed offsets.
-        futures.addAll(
-            FutureUtils.mapExceptionally(
-                runtime.scheduleWriteAllOperation(
-                    "on-partition-deleted",
-                    Duration.ofMillis(config.offsetCommitTimeoutMs()),
-                    coordinator -> 
coordinator.onPartitionsDeleted(topicPartitions)
-                ),
-                exception -> {
-                    log.error("Could not delete offsets for deleted partitions 
{} due to: {}.",
-                        topicPartitions, exception.getMessage(), exception
-                    );
-                    return null;
-                }
-            )
-        );
-
-        // Handle the topic deletion for share state.
-        if (metadataImage != null) {
-            var topicIds = topicPartitions.stream()
-                .filter(tp -> 
metadataImage.topicMetadata(tp.topic()).isPresent())
-                .map(tp -> metadataImage.topicMetadata(tp.topic()).get().id())
-                .collect(Collectors.toSet());
-
-            if (!topicIds.isEmpty()) {
-                futures.addAll(
-                    FutureUtils.mapExceptionally(
-                        runtime.scheduleWriteAllOperation(
-                            "maybe-cleanup-share-group-state",
-                            Duration.ofMillis(config.offsetCommitTimeoutMs()),
-                            coordinator -> 
coordinator.maybeCleanupShareGroupState(topicIds)
-                        ),
-                        exception -> {
-                            log.error("Unable to cleanup state for the deleted 
topics {}", topicIds, exception);
-                            return null;
-                        }
-                    )
-                );
-            }
-        }
-
-        // Wait on the results.
-        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
-    }
-
     /**
      * See {@link GroupCoordinator#onElection(int, int)}.
      */
@@ -2315,10 +2258,77 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
         throwIfNotActive();
         Objects.requireNonNull(delta, "delta must be provided");
         Objects.requireNonNull(newImage, "newImage must be provided");
+
+        // Update the metadata image and propagate to runtime.
         var wrappedImage = new KRaftCoordinatorMetadataImage(newImage);
         var wrappedDelta = new KRaftCoordinatorMetadataDelta(delta);
         metadataImage = wrappedImage;
         runtime.onMetadataUpdate(wrappedDelta, wrappedImage);
+
+        // Handle partition deletions from the delta.
+        if (delta.topicsDelta() != null && 
!delta.topicsDelta().deletedTopicIds().isEmpty()) {
+            handlePartitionsDeletion(delta.topicsDelta());
+        }
+    }
+
+    /**
+     * Handles the deletion of topic partitions by scheduling write operations
+     * to delete committed offsets and clean up share group state.
+     *
+     * @param topicsDelta The topics delta containing deleted topic IDs.
+     */
+    private void handlePartitionsDeletion(TopicsDelta topicsDelta) {
+        var topicPartitions = new ArrayList<TopicPartition>();
+        var topicIds = topicsDelta.deletedTopicIds();
+
+        topicIds.forEach(topicId -> {
+            var topicImage = topicsDelta.image().getTopic(topicId);
+            if (topicImage != null) {
+                topicImage.partitions().keySet().forEach(partitionId ->
+                    topicPartitions.add(new TopicPartition(topicImage.name(), 
partitionId))
+                );
+            }
+        });
+
+        var futures = new ArrayList<CompletableFuture<Void>>();
+
+        if (!topicPartitions.isEmpty()) {
+            // Schedule offset deletion.
+            futures.addAll(
+                FutureUtils.mapExceptionally(
+                    runtime.scheduleWriteAllOperation(
+                        "on-partition-deleted",
+                        Duration.ofMillis(config.offsetCommitTimeoutMs()),
+                        coordinator -> 
coordinator.onPartitionsDeleted(topicPartitions)
+                    ),
+                    exception -> {
+                        log.error("Could not delete offsets for deleted 
partitions {} due to: {}.",
+                            topicPartitions, exception.getMessage(), 
exception);
+                        return null;
+                    }
+                )
+            );
+        }
+
+        if (!topicIds.isEmpty()) {
+            // Schedule share group state cleanup.
+            futures.addAll(
+                FutureUtils.mapExceptionally(
+                    runtime.scheduleWriteAllOperation(
+                        "maybe-cleanup-share-group-state",
+                        Duration.ofMillis(config.offsetCommitTimeoutMs()),
+                        coordinator -> 
coordinator.maybeCleanupShareGroupState(topicIds)
+                    ),
+                    exception -> {
+                        log.error("Unable to cleanup state for the deleted 
topics {}", topicIds, exception);
+                        return null;
+                    }
+                )
+            );
+        }
+
+        // Wait for all operations to complete.
+        CompletableFuture.allOf(futures.toArray(new 
CompletableFuture<?>[0])).join();
     }
 
     /**
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index df911666295..30fdc37e2b1 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -76,6 +76,7 @@ import org.apache.kafka.common.message.SyncGroupRequestData;
 import org.apache.kafka.common.message.SyncGroupResponseData;
 import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
 import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
+import org.apache.kafka.common.metadata.RemoveTopicRecord;
 import org.apache.kafka.common.network.ClientInformation;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.protocol.ApiKeys;
@@ -95,6 +96,7 @@ import 
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
 import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult;
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.MetadataProvenance;
 import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
 import org.apache.kafka.server.common.TransactionVersion;
 import org.apache.kafka.server.record.BrokerCompressionType;
@@ -155,6 +157,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -3151,157 +3154,112 @@ public class GroupCoordinatorServiceTest {
     }
 
     @Test
-    public void testOnPartitionsDeleted() {
-        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
-        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+    public void testOnMetadataUpdateWhenNotStarted() {
+        var runtime = mockRuntime();
+        var service = new GroupCoordinatorServiceBuilder()
             .setConfig(createConfig())
             .setRuntime(runtime)
             .build();
-        service.startup(() -> 3);
 
-        MetadataImage image = new MetadataImageBuilder()
+        var image = new MetadataImageBuilder()
             .addTopic(Uuid.randomUuid(), "foo", 1)
             .build();
+        var delta = new MetadataDelta(image);
 
-        service.onMetadataUpdate(new MetadataDelta(image), image);
-
-        when(runtime.scheduleWriteAllOperation(
-            ArgumentMatchers.eq("on-partition-deleted"),
-            ArgumentMatchers.eq(Duration.ofMillis(5000)),
-            ArgumentMatchers.any()
-        )).thenReturn(Arrays.asList(
-            CompletableFuture.completedFuture(null),
-            CompletableFuture.completedFuture(null),
-            
FutureUtils.failedFuture(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception())
-        ));
-
-        when(runtime.scheduleWriteAllOperation(
-            ArgumentMatchers.eq("maybe-cleanup-share-group-state"),
-            ArgumentMatchers.eq(Duration.ofMillis(5000)),
-            ArgumentMatchers.any()
-        )).thenReturn(Arrays.asList(
-            CompletableFuture.completedFuture(null),
-            CompletableFuture.completedFuture(null),
-            CompletableFuture.completedFuture(null)
-        ));
-
-        // The exception is logged and swallowed.
-        assertDoesNotThrow(() ->
-            service.onPartitionsDeleted(
-                List.of(new TopicPartition("foo", 0)),
-                BufferSupplier.NO_CACHING
-            )
-        );
-    }
-
-    @Test
-    public void testOnPartitionsDeletedWhenServiceIsNotStarted() {
-        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
-        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
-            .setConfig(createConfig())
-            .setRuntime(runtime)
-            .build();
-
-        assertThrows(CoordinatorNotAvailableException.class, () -> 
service.onPartitionsDeleted(
-            List.of(new TopicPartition("foo", 0)),
-            BufferSupplier.NO_CACHING
-        ));
+        assertThrows(CoordinatorNotAvailableException.class,
+            () -> service.onMetadataUpdate(delta, image));
     }
 
     @Test
-    public void testOnPartitionsDeletedCleanupShareGroupState() {
-        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
-        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+    public void testOnMetadataUpdateSchedulesOperationsWhenTopicsDeleted() 
throws ExecutionException, InterruptedException, TimeoutException {
+        var runtime = mockRuntime();
+        var service = new GroupCoordinatorServiceBuilder()
             .setConfig(createConfig())
             .setRuntime(runtime)
             .build();
         service.startup(() -> 3);
 
-        MetadataImage image = new MetadataImageBuilder()
-            .addTopic(Uuid.randomUuid(), "foo", 1)
+        var topicId = Uuid.randomUuid();
+        var initialImage = new MetadataImageBuilder()
+            .addTopic(topicId, "foo", 1)
             .build();
 
-        service.onMetadataUpdate(new MetadataDelta(image), image);
+        // Create a delta that deletes the topic.
+        var delta = new MetadataDelta(initialImage);
+        delta.replay(new RemoveTopicRecord().setTopicId(topicId));
+        var newImage = delta.apply(new MetadataProvenance(1, 0, 0L, true));
+
+        // Use incomplete futures to verify method blocks.
+        var offsetFutures = List.of(
+            new CompletableFuture<>(),
+            new CompletableFuture<>(),
+            new CompletableFuture<>()
+        );
+        var shareFutures = List.of(
+            new CompletableFuture<>(),
+            new CompletableFuture<>(),
+            new CompletableFuture<>()
+        );
 
-        // No error in partition deleted callback
         when(runtime.scheduleWriteAllOperation(
             ArgumentMatchers.eq("on-partition-deleted"),
             ArgumentMatchers.eq(Duration.ofMillis(5000)),
             ArgumentMatchers.any()
-        )).thenReturn(List.of(
-            CompletableFuture.completedFuture(null),
-            CompletableFuture.completedFuture(null),
-            CompletableFuture.completedFuture(null)
-        ));
+        )).thenReturn(offsetFutures);
 
         when(runtime.scheduleWriteAllOperation(
             ArgumentMatchers.eq("maybe-cleanup-share-group-state"),
             ArgumentMatchers.eq(Duration.ofMillis(5000)),
             ArgumentMatchers.any()
-        )).thenReturn(List.of(
-            CompletableFuture.completedFuture(null),
-            CompletableFuture.completedFuture(null),
-            
CompletableFuture.failedFuture(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception())
-        ));
+        )).thenReturn(shareFutures);
 
-        // The exception is logged and swallowed.
-        assertDoesNotThrow(() ->
-            service.onPartitionsDeleted(
-                List.of(new TopicPartition("foo", 0)),
-                BufferSupplier.NO_CACHING
-            )
-        );
+        // Run onMetadataUpdate in a separate thread.
+        var resultFuture = CompletableFuture.runAsync(() -> 
service.onMetadataUpdate(delta, newImage));
 
-        verify(runtime, times(1)).scheduleWriteAllOperation(
+        // Wait for the operations to be scheduled and verify method is 
blocked.
+        verify(runtime, timeout(5000).times(1)).scheduleWriteAllOperation(
+            ArgumentMatchers.eq("on-partition-deleted"),
+            ArgumentMatchers.eq(Duration.ofMillis(5000)),
+            ArgumentMatchers.any()
+        );
+        verify(runtime, timeout(5000).times(1)).scheduleWriteAllOperation(
             ArgumentMatchers.eq("maybe-cleanup-share-group-state"),
             ArgumentMatchers.eq(Duration.ofMillis(5000)),
             ArgumentMatchers.any()
         );
+        assertFalse(resultFuture.isDone());
+
+        // Complete all futures.
+        offsetFutures.forEach(f -> f.complete(null));
+        shareFutures.forEach(f -> f.complete(null));
+
+        // Verify method completes.
+        resultFuture.get(5, TimeUnit.SECONDS);
     }
 
     @Test
-    public void testOnPartitionsDeletedCleanupShareGroupStateEmptyMetadata() {
-        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
-        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+    public void 
testOnMetadataUpdateDoesNotScheduleOperationsWhenNoTopicsDeleted() {
+        var runtime = mockRuntime();
+        var service = new GroupCoordinatorServiceBuilder()
             .setConfig(createConfig())
             .setRuntime(runtime)
             .build();
         service.startup(() -> 3);
 
-        MetadataImage image = new MetadataImageBuilder()
-            .addTopic(Uuid.randomUuid(), "bar", 1)
+        // Create an image with a topic and a delta with no deletions.
+        var image = new MetadataImageBuilder()
+            .addTopic(Uuid.randomUuid(), "foo", 1)
             .build();
-        service.onMetadataUpdate(new MetadataDelta(image), image);
+        var delta = new MetadataDelta(image);
 
-        // No error in partition deleted callback
-        when(runtime.scheduleWriteAllOperation(
-            ArgumentMatchers.eq("on-partition-deleted"),
-            ArgumentMatchers.eq(Duration.ofMillis(5000)),
-            ArgumentMatchers.any()
-        )).thenReturn(List.of(
-            CompletableFuture.completedFuture(null),
-            CompletableFuture.completedFuture(null),
-            CompletableFuture.completedFuture(null)
-        ));
+        assertDoesNotThrow(() -> service.onMetadataUpdate(delta, image));
 
-        when(runtime.scheduleWriteAllOperation(
-            ArgumentMatchers.eq("maybe-cleanup-share-group-state"),
+        // Verify no operations scheduled.
+        verify(runtime, times(0)).scheduleWriteAllOperation(
+            ArgumentMatchers.eq("on-partition-deleted"),
             ArgumentMatchers.eq(Duration.ofMillis(5000)),
             ArgumentMatchers.any()
-        )).thenReturn(List.of(
-            CompletableFuture.completedFuture(null),
-            CompletableFuture.completedFuture(null),
-            CompletableFuture.completedFuture(null)
-        ));
-
-        // The exception is logged and swallowed.
-        assertDoesNotThrow(() ->
-            service.onPartitionsDeleted(
-                List.of(new TopicPartition("foo", 0)),
-                BufferSupplier.NO_CACHING
-            )
         );
-
         verify(runtime, times(0)).scheduleWriteAllOperation(
             ArgumentMatchers.eq("maybe-cleanup-share-group-state"),
             ArgumentMatchers.eq(Duration.ofMillis(5000)),
@@ -3310,47 +3268,55 @@ public class GroupCoordinatorServiceTest {
     }
 
     @Test
-    public void 
testOnPartitionsDeletedCleanupShareGroupStateTopicsNotInMetadata() {
-        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
-        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+    public void testOnMetadataUpdateSwallowsErrorsWhenTopicsDeleted() {
+        var runtime = mockRuntime();
+        var service = new GroupCoordinatorServiceBuilder()
             .setConfig(createConfig())
             .setRuntime(runtime)
             .build();
         service.startup(() -> 3);
 
-        MetadataImage image = MetadataImage.EMPTY;
-        service.onMetadataUpdate(new MetadataDelta(image), image);
+        var topicId = Uuid.randomUuid();
+        var initialImage = new MetadataImageBuilder()
+            .addTopic(topicId, "foo", 1)
+            .build();
+
+        // Create a delta that deletes the topic.
+        var delta = new MetadataDelta(initialImage);
+        delta.replay(new RemoveTopicRecord().setTopicId(topicId));
+        var newImage = delta.apply(new MetadataProvenance(1, 0, 0L, true));
 
-        // No error in partition deleted callback
+        // Mock operations with 3 futures, some failing.
         when(runtime.scheduleWriteAllOperation(
             ArgumentMatchers.eq("on-partition-deleted"),
             ArgumentMatchers.eq(Duration.ofMillis(5000)),
             ArgumentMatchers.any()
-        )).thenReturn(List.of(
+        )).thenReturn(Arrays.asList(
             CompletableFuture.completedFuture(null),
             CompletableFuture.completedFuture(null),
-            CompletableFuture.completedFuture(null)
+            
FutureUtils.failedFuture(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception())
         ));
 
         when(runtime.scheduleWriteAllOperation(
             ArgumentMatchers.eq("maybe-cleanup-share-group-state"),
             ArgumentMatchers.eq(Duration.ofMillis(5000)),
             ArgumentMatchers.any()
-        )).thenReturn(List.of(
+        )).thenReturn(Arrays.asList(
             CompletableFuture.completedFuture(null),
             CompletableFuture.completedFuture(null),
-            CompletableFuture.completedFuture(null)
+            
FutureUtils.failedFuture(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception())
         ));
 
-        // The exception is logged and swallowed.
-        assertDoesNotThrow(() ->
-            service.onPartitionsDeleted(
-                List.of(new TopicPartition("foo", 0)),
-                BufferSupplier.NO_CACHING
-            )
-        );
+        // Verify no exception thrown.
+        assertDoesNotThrow(() -> service.onMetadataUpdate(delta, newImage));
 
-        verify(runtime, times(0)).scheduleWriteAllOperation(
+        // Verify operations were still scheduled exactly once.
+        verify(runtime, times(1)).scheduleWriteAllOperation(
+            ArgumentMatchers.eq("on-partition-deleted"),
+            ArgumentMatchers.eq(Duration.ofMillis(5000)),
+            ArgumentMatchers.any()
+        );
+        verify(runtime, times(1)).scheduleWriteAllOperation(
             ArgumentMatchers.eq("maybe-cleanup-share-group-state"),
             ArgumentMatchers.eq(Duration.ofMillis(5000)),
             ArgumentMatchers.any()
@@ -6033,7 +5999,7 @@ public class GroupCoordinatorServiceTest {
                     .build();
             }
 
-            GroupCoordinatorService service = new GroupCoordinatorService(
+            var service = new GroupCoordinatorService(
                 logContext,
                 config,
                 runtime,

Reply via email to