This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5a4dcdef750 MINOR: Remove onTopicsDeleted from ShareCoordinator 
(#21363)
5a4dcdef750 is described below

commit 5a4dcdef7507d35c53f7623fa4cdcb6138035c0e
Author: David Jacot <[email protected]>
AuthorDate: Fri Jan 30 15:39:24 2026 +0100

    MINOR: Remove onTopicsDeleted from ShareCoordinator (#21363)
    
    This change removes the onTopicsDeleted method from the ShareCoordinator
    interface and moves its functionality into onMetadataUpdate, following
    the same pattern used for GroupCoordinator (commit 2f90af70ee).
    
    The topic deletion cleanup is now triggered automatically when
    onMetadataUpdate detects deleted topics in the metadata delta, rather
    than requiring a separate call from BrokerMetadataPublisher.
    
    Changes:
    - Remove onTopicsDeleted from ShareCoordinator interface
    - Move cleanup logic into onMetadataUpdate via handleTopicsDeletion
    - Remove the onTopicsDeleted call from BrokerMetadataPublisher
    - Update tests to verify topic deletion handling in onMetadataUpdate
    - Add org.apache.kafka.common.metadata to share-coordinator import
    control to allow RemoveTopicRecord usage in tests
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 checkstyle/import-control-share-coordinator.xml    |   1 +
 .../server/metadata/BrokerMetadataPublisher.scala  |  12 +--
 .../kafka/coordinator/share/ShareCoordinator.java  |  12 ---
 .../coordinator/share/ShareCoordinatorService.java |  50 ++++-----
 .../share/ShareCoordinatorServiceTest.java         | 112 +++++++++++++++++++--
 5 files changed, 133 insertions(+), 54 deletions(-)

diff --git a/checkstyle/import-control-share-coordinator.xml 
b/checkstyle/import-control-share-coordinator.xml
index 1707cd0e1b9..ce67ae48b69 100644
--- a/checkstyle/import-control-share-coordinator.xml
+++ b/checkstyle/import-control-share-coordinator.xml
@@ -47,6 +47,7 @@
             <allow pkg="org.apache.kafka.common.annotation" />
             <allow pkg="org.apache.kafka.common.config" />
             <allow pkg="org.apache.kafka.common.message" />
+            <allow pkg="org.apache.kafka.common.metadata" />
             <allow pkg="org.apache.kafka.common.network" />
             <allow pkg="org.apache.kafka.common.internals" />
             <allow pkg="org.apache.kafka.common.requests" />
diff --git 
a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala 
b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 1313bda8aa1..8672efcf9c5 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -34,7 +34,7 @@ import org.apache.kafka.image.{MetadataDelta, MetadataImage, 
TopicDelta}
 import org.apache.kafka.metadata.KRaftMetadataCache
 import org.apache.kafka.metadata.publisher.{AclPublisher, 
DelegationTokenPublisher, DynamicClientQuotaPublisher, 
DynamicTopicClusterQuotaPublisher, ScramPublisher}
 import org.apache.kafka.server.common.MetadataVersion.MINIMUM_VERSION
-import org.apache.kafka.server.common.{FinalizedFeatures, RequestLocal, 
ShareVersion}
+import org.apache.kafka.server.common.{FinalizedFeatures, ShareVersion}
 import org.apache.kafka.server.fault.FaultHandler
 import org.apache.kafka.storage.internals.log.{LogManager => JLogManager}
 
@@ -184,16 +184,6 @@ class BrokerMetadataPublisher(
           case t: Throwable => 
metadataPublishingFaultHandler.handleFault("Error updating share " +
             s"coordinator with local changes in $deltaName", t)
         }
-        try {
-          // Notify the share coordinator about deleted topics.
-          val deletedTopicIds = topicsDelta.deletedTopicIds()
-          if (!deletedTopicIds.isEmpty) {
-            shareCoordinator.onTopicsDeleted(topicsDelta.deletedTopicIds, 
RequestLocal.noCaching.bufferSupplier)
-          }
-        } catch {
-          case t: Throwable => 
metadataPublishingFaultHandler.handleFault("Error updating share " +
-            s"coordinator with deleted partitions in $deltaName", t)
-        }
       }
 
       // Apply configuration deltas.
diff --git 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java
 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java
index ee69827d45e..4b07209f26e 100644
--- 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java
+++ 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java
@@ -17,7 +17,6 @@
 
 package org.apache.kafka.coordinator.share;
 
-import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
 import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
 import org.apache.kafka.common.message.InitializeShareGroupStateRequestData;
@@ -29,16 +28,13 @@ import 
org.apache.kafka.common.message.ReadShareGroupStateSummaryResponseData;
 import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
 import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
 import org.apache.kafka.common.requests.RequestContext;
-import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.server.share.SharePartitionKey;
 
 import java.util.OptionalInt;
 import java.util.Properties;
-import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.function.IntSupplier;
 
 public interface ShareCoordinator {
@@ -124,14 +120,6 @@ public interface ShareCoordinator {
      */
     void onResignation(int partitionIndex, OptionalInt partitionLeaderEpoch);
 
-    /**
-     * Remove share group state related to deleted topic ids.
-     *
-     * @param topicPartitions   The deleted topic ids.
-     * @param bufferSupplier    The buffer supplier tight to the request 
thread.
-     */
-    void onTopicsDeleted(Set<Uuid> topicPartitions, BufferSupplier 
bufferSupplier) throws ExecutionException, InterruptedException;
-
     /**
      * A new metadata image is available.
      *
diff --git 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
index c8f5c57dc0b..2f591415574 100644
--- 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
+++ 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
@@ -39,7 +39,6 @@ import 
org.apache.kafka.common.requests.ReadShareGroupStateResponse;
 import org.apache.kafka.common.requests.ReadShareGroupStateSummaryResponse;
 import org.apache.kafka.common.requests.RequestContext;
 import org.apache.kafka.common.requests.WriteShareGroupStateResponse;
-import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -76,7 +75,6 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.IntSupplier;
@@ -1072,33 +1070,22 @@ public class ShareCoordinatorService implements 
ShareCoordinator {
         );
     }
 
-    @Override
-    public void onTopicsDeleted(Set<Uuid> deletedTopicIds, BufferSupplier 
bufferSupplier) throws ExecutionException, InterruptedException {
-        throwIfNotActive();
-        if (deletedTopicIds.isEmpty()) {
-            return;
-        }
-        CompletableFuture.allOf(
-            FutureUtils.mapExceptionally(
-                runtime.scheduleWriteAllOperation(
-                    "on-topics-deleted",
-                    coordinator -> 
coordinator.maybeCleanupShareState(deletedTopicIds)
-                ),
-                exception -> {
-                    log.error("Received error while trying to cleanup deleted 
topics.", exception);
-                    return null;
-                }
-            ).toArray(new CompletableFuture<?>[0])
-        ).get();
-    }
-
     @Override
     public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage) {
         throwIfNotActive();
+        Objects.requireNonNull(delta, "delta must be provided");
+        Objects.requireNonNull(newImage, "newImage must be provided");
+
         this.runtime.onMetadataUpdate(
-            new KRaftCoordinatorMetadataDelta(Objects.requireNonNull(delta, 
"delta must be provided")),
-            new KRaftCoordinatorMetadataImage(Objects.requireNonNull(newImage, 
"newImage must be provided"))
+            new KRaftCoordinatorMetadataDelta(delta),
+            new KRaftCoordinatorMetadataImage(newImage)
         );
+
+        // Handle topic deletions from the delta.
+        if (delta.topicsDelta() != null && 
!delta.topicsDelta().deletedTopicIds().isEmpty()) {
+            handleTopicsDeletion(delta.topicsDelta().deletedTopicIds());
+        }
+
         boolean enabled = isShareGroupsEnabled(newImage);
         // enabled    shouldRunJob         result (XOR)
         // 0            0               no op on flag, do not call jobs
@@ -1113,6 +1100,21 @@ public class ShareCoordinatorService implements 
ShareCoordinator {
         }
     }
 
+    private void handleTopicsDeletion(Set<Uuid> deletedTopicIds) {
+        CompletableFuture.allOf(
+            FutureUtils.mapExceptionally(
+                runtime.scheduleWriteAllOperation(
+                    "on-topics-deleted",
+                    coordinator -> 
coordinator.maybeCleanupShareState(deletedTopicIds)
+                ),
+                exception -> {
+                    log.error("Received error while trying to cleanup deleted 
topics.", exception);
+                    return null;
+                }
+            ).toArray(new CompletableFuture<?>[0])
+        ).join();
+    }
+
     TopicPartition topicPartitionFor(SharePartitionKey key) {
         return new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 
partitionFor(key));
     }
diff --git 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
index 4535e8adb78..66797199b0b 100644
--- 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
+++ 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
@@ -34,19 +34,21 @@ import 
org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
 import org.apache.kafka.common.message.ReadShareGroupStateSummaryResponseData;
 import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
 import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
+import org.apache.kafka.common.metadata.RemoveTopicRecord;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
+import org.apache.kafka.coordinator.common.runtime.MetadataImageBuilder;
 import org.apache.kafka.coordinator.common.runtime.PartitionWriter;
 import org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetrics;
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.MetadataProvenance;
 import org.apache.kafka.server.common.ShareVersion;
 import org.apache.kafka.server.share.SharePartitionKey;
 import org.apache.kafka.server.util.FutureUtils;
@@ -2126,7 +2128,7 @@ class ShareCoordinatorServiceTest {
     }
 
     @Test
-    public void testOnTopicsDeletedEmptyList() {
+    public void testOnMetadataUpdateSchedulesOperationsWhenTopicsDeleted() {
         CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
         MockTime time = new MockTime();
         MockTimer timer = new MockTimer(time);
@@ -2146,6 +2148,16 @@ class ShareCoordinatorServiceTest {
 
         service.startup(() -> 3);
 
+        Uuid topicId = Uuid.randomUuid();
+        MetadataImage initialImage = new MetadataImageBuilder()
+            .addTopic(topicId, "foo", 1)
+            .build();
+
+        // Create a delta that deletes the topic.
+        MetadataDelta delta = new MetadataDelta(initialImage);
+        delta.replay(new RemoveTopicRecord().setTopicId(topicId));
+        MetadataImage newImage = delta.apply(new MetadataProvenance(1, 0, 0L, 
true));
+
         when(runtime.scheduleWriteAllOperation(
             eq("on-topics-deleted"),
             any()
@@ -2153,11 +2165,83 @@ class ShareCoordinatorServiceTest {
             List.of(
                 CompletableFuture.completedFuture(null),
                 CompletableFuture.completedFuture(null),
-                
CompletableFuture.failedFuture(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception())
+                CompletableFuture.completedFuture(null)
             )
         );
 
-        assertDoesNotThrow(() -> service.onTopicsDeleted(Set.of(), 
BufferSupplier.NO_CACHING));
+        assertDoesNotThrow(() -> service.onMetadataUpdate(delta, newImage));
+
+        verify(runtime, times(1)).scheduleWriteAllOperation(
+            eq("on-topics-deleted"),
+            any()
+        );
+    }
+
+    @Test
+    public void 
testOnMetadataUpdateDoesNotScheduleOperationsWhenNoTopicsDeleted() {
+        CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        MockTime time = new MockTime();
+        MockTimer timer = new MockTimer(time);
+        PartitionWriter writer = mock(PartitionWriter.class);
+
+        Metrics metrics = new Metrics();
+        ShareCoordinatorService service = spy(new ShareCoordinatorService(
+            new LogContext(),
+            ShareCoordinatorTestConfig.testConfig(),
+            runtime,
+            new ShareCoordinatorMetrics(metrics),
+            time,
+            timer,
+            writer,
+            () -> true
+        ));
+
+        service.startup(() -> 3);
+
+        // Create an image with a topic and a delta with no deletions.
+        MetadataImage image = new MetadataImageBuilder()
+            .addTopic(Uuid.randomUuid(), "foo", 1)
+            .build();
+        MetadataDelta delta = new MetadataDelta(image);
+
+        assertDoesNotThrow(() -> service.onMetadataUpdate(delta, image));
+
+        // Verify no operations scheduled.
+        verify(runtime, times(0)).scheduleWriteAllOperation(
+            eq("on-topics-deleted"),
+            any()
+        );
+    }
+
+    @Test
+    public void 
testOnMetadataUpdateDoesNotScheduleOperationsWhenTopicsDeltaIsNull() {
+        CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        MockTime time = new MockTime();
+        MockTimer timer = new MockTimer(time);
+        PartitionWriter writer = mock(PartitionWriter.class);
+
+        Metrics metrics = new Metrics();
+        ShareCoordinatorService service = spy(new ShareCoordinatorService(
+            new LogContext(),
+            ShareCoordinatorTestConfig.testConfig(),
+            runtime,
+            new ShareCoordinatorMetrics(metrics),
+            time,
+            timer,
+            writer,
+            () -> true
+        ));
+
+        service.startup(() -> 3);
+
+        // Use mocks where topicsDelta() returns null.
+        MetadataDelta delta = mock(MetadataDelta.class);
+        MetadataImage image = mock(MetadataImage.class, RETURNS_DEEP_STUBS);
+        when(delta.topicsDelta()).thenReturn(null);
+
+        assertDoesNotThrow(() -> service.onMetadataUpdate(delta, image));
+
+        // Verify no cleanup operations scheduled.
         verify(runtime, times(0)).scheduleWriteAllOperation(
             eq("on-topics-deleted"),
             any()
@@ -2165,7 +2249,7 @@ class ShareCoordinatorServiceTest {
     }
 
     @Test
-    public void testOnTopicsDeletedDoesNotThrowExp() {
+    public void testOnMetadataUpdateSwallowsErrorsWhenTopicsDeleted() {
         CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
         MockTime time = new MockTime();
         MockTimer timer = new MockTimer(time);
@@ -2185,6 +2269,17 @@ class ShareCoordinatorServiceTest {
 
         service.startup(() -> 3);
 
+        Uuid topicId = Uuid.randomUuid();
+        MetadataImage initialImage = new MetadataImageBuilder()
+            .addTopic(topicId, "foo", 1)
+            .build();
+
+        // Create a delta that deletes the topic.
+        MetadataDelta delta = new MetadataDelta(initialImage);
+        delta.replay(new RemoveTopicRecord().setTopicId(topicId));
+        MetadataImage newImage = delta.apply(new MetadataProvenance(1, 0, 0L, 
true));
+
+        // Mock operations with some futures failing.
         when(runtime.scheduleWriteAllOperation(
             eq("on-topics-deleted"),
             any()
@@ -2192,11 +2287,14 @@ class ShareCoordinatorServiceTest {
             List.of(
                 CompletableFuture.completedFuture(null),
                 CompletableFuture.completedFuture(null),
-                
CompletableFuture.failedFuture(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception())
+                
FutureUtils.failedFuture(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception())
             )
         );
 
-        assertDoesNotThrow(() -> 
service.onTopicsDeleted(Set.of(Uuid.randomUuid()), BufferSupplier.NO_CACHING));
+        // Verify no exception thrown.
+        assertDoesNotThrow(() -> service.onMetadataUpdate(delta, newImage));
+
+        // Verify operations were still scheduled.
         verify(runtime, times(1)).scheduleWriteAllOperation(
             eq("on-topics-deleted"),
             any()

Reply via email to