This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5a4dcdef750 MINOR: Remove onTopicsDeleted from ShareCoordinator
(#21363)
5a4dcdef750 is described below
commit 5a4dcdef7507d35c53f7623fa4cdcb6138035c0e
Author: David Jacot <[email protected]>
AuthorDate: Fri Jan 30 15:39:24 2026 +0100
MINOR: Remove onTopicsDeleted from ShareCoordinator (#21363)
This change removes the onTopicsDeleted method from the ShareCoordinator
interface and moves its functionality into onMetadataUpdate, following
the same pattern used for GroupCoordinator (commit 2f90af70ee).
The topic deletion cleanup is now triggered automatically when
onMetadataUpdate detects deleted topics in the metadata delta, rather
than requiring a separate call from BrokerMetadataPublisher.
Changes:
- Remove onTopicsDeleted from ShareCoordinator interface
- Move cleanup logic into onMetadataUpdate via handleTopicsDeletion
- Remove the onTopicsDeleted call from BrokerMetadataPublisher
- Update tests to verify topic deletion handling in onMetadataUpdate
- Add org.apache.kafka.common.metadata to share-coordinator import
control to allow RemoveTopicRecord usage in tests
Reviewers: Chia-Ping Tsai <[email protected]>
---
checkstyle/import-control-share-coordinator.xml | 1 +
.../server/metadata/BrokerMetadataPublisher.scala | 12 +--
.../kafka/coordinator/share/ShareCoordinator.java | 12 ---
.../coordinator/share/ShareCoordinatorService.java | 50 ++++-----
.../share/ShareCoordinatorServiceTest.java | 112 +++++++++++++++++++--
5 files changed, 133 insertions(+), 54 deletions(-)
diff --git a/checkstyle/import-control-share-coordinator.xml
b/checkstyle/import-control-share-coordinator.xml
index 1707cd0e1b9..ce67ae48b69 100644
--- a/checkstyle/import-control-share-coordinator.xml
+++ b/checkstyle/import-control-share-coordinator.xml
@@ -47,6 +47,7 @@
<allow pkg="org.apache.kafka.common.annotation" />
<allow pkg="org.apache.kafka.common.config" />
<allow pkg="org.apache.kafka.common.message" />
+ <allow pkg="org.apache.kafka.common.metadata" />
<allow pkg="org.apache.kafka.common.network" />
<allow pkg="org.apache.kafka.common.internals" />
<allow pkg="org.apache.kafka.common.requests" />
diff --git
a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 1313bda8aa1..8672efcf9c5 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -34,7 +34,7 @@ import org.apache.kafka.image.{MetadataDelta, MetadataImage,
TopicDelta}
import org.apache.kafka.metadata.KRaftMetadataCache
import org.apache.kafka.metadata.publisher.{AclPublisher,
DelegationTokenPublisher, DynamicClientQuotaPublisher,
DynamicTopicClusterQuotaPublisher, ScramPublisher}
import org.apache.kafka.server.common.MetadataVersion.MINIMUM_VERSION
-import org.apache.kafka.server.common.{FinalizedFeatures, RequestLocal,
ShareVersion}
+import org.apache.kafka.server.common.{FinalizedFeatures, ShareVersion}
import org.apache.kafka.server.fault.FaultHandler
import org.apache.kafka.storage.internals.log.{LogManager => JLogManager}
@@ -184,16 +184,6 @@ class BrokerMetadataPublisher(
case t: Throwable =>
metadataPublishingFaultHandler.handleFault("Error updating share " +
s"coordinator with local changes in $deltaName", t)
}
- try {
- // Notify the share coordinator about deleted topics.
- val deletedTopicIds = topicsDelta.deletedTopicIds()
- if (!deletedTopicIds.isEmpty) {
- shareCoordinator.onTopicsDeleted(topicsDelta.deletedTopicIds,
RequestLocal.noCaching.bufferSupplier)
- }
- } catch {
- case t: Throwable =>
metadataPublishingFaultHandler.handleFault("Error updating share " +
- s"coordinator with deleted partitions in $deltaName", t)
- }
}
// Apply configuration deltas.
diff --git
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java
index ee69827d45e..4b07209f26e 100644
---
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java
+++
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java
@@ -17,7 +17,6 @@
package org.apache.kafka.coordinator.share;
-import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
import org.apache.kafka.common.message.InitializeShareGroupStateRequestData;
@@ -29,16 +28,13 @@ import
org.apache.kafka.common.message.ReadShareGroupStateSummaryResponseData;
import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
import org.apache.kafka.common.requests.RequestContext;
-import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.share.SharePartitionKey;
import java.util.OptionalInt;
import java.util.Properties;
-import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import java.util.function.IntSupplier;
public interface ShareCoordinator {
@@ -124,14 +120,6 @@ public interface ShareCoordinator {
*/
void onResignation(int partitionIndex, OptionalInt partitionLeaderEpoch);
- /**
- * Remove share group state related to deleted topic ids.
- *
- * @param topicPartitions The deleted topic ids.
- * @param bufferSupplier The buffer supplier tight to the request
thread.
- */
- void onTopicsDeleted(Set<Uuid> topicPartitions, BufferSupplier
bufferSupplier) throws ExecutionException, InterruptedException;
-
/**
* A new metadata image is available.
*
diff --git
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
index c8f5c57dc0b..2f591415574 100644
---
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
+++
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
@@ -39,7 +39,6 @@ import
org.apache.kafka.common.requests.ReadShareGroupStateResponse;
import org.apache.kafka.common.requests.ReadShareGroupStateSummaryResponse;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.WriteShareGroupStateResponse;
-import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
@@ -76,7 +75,6 @@ import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntSupplier;
@@ -1072,33 +1070,22 @@ public class ShareCoordinatorService implements
ShareCoordinator {
);
}
- @Override
- public void onTopicsDeleted(Set<Uuid> deletedTopicIds, BufferSupplier
bufferSupplier) throws ExecutionException, InterruptedException {
- throwIfNotActive();
- if (deletedTopicIds.isEmpty()) {
- return;
- }
- CompletableFuture.allOf(
- FutureUtils.mapExceptionally(
- runtime.scheduleWriteAllOperation(
- "on-topics-deleted",
- coordinator ->
coordinator.maybeCleanupShareState(deletedTopicIds)
- ),
- exception -> {
- log.error("Received error while trying to cleanup deleted
topics.", exception);
- return null;
- }
- ).toArray(new CompletableFuture<?>[0])
- ).get();
- }
-
@Override
public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage) {
throwIfNotActive();
+ Objects.requireNonNull(delta, "delta must be provided");
+ Objects.requireNonNull(newImage, "newImage must be provided");
+
this.runtime.onMetadataUpdate(
- new KRaftCoordinatorMetadataDelta(Objects.requireNonNull(delta,
"delta must be provided")),
- new KRaftCoordinatorMetadataImage(Objects.requireNonNull(newImage,
"newImage must be provided"))
+ new KRaftCoordinatorMetadataDelta(delta),
+ new KRaftCoordinatorMetadataImage(newImage)
);
+
+ // Handle topic deletions from the delta.
+ if (delta.topicsDelta() != null &&
!delta.topicsDelta().deletedTopicIds().isEmpty()) {
+ handleTopicsDeletion(delta.topicsDelta().deletedTopicIds());
+ }
+
boolean enabled = isShareGroupsEnabled(newImage);
// enabled shouldRunJob result (XOR)
// 0 0 no op on flag, do not call jobs
@@ -1113,6 +1100,21 @@ public class ShareCoordinatorService implements
ShareCoordinator {
}
}
+ private void handleTopicsDeletion(Set<Uuid> deletedTopicIds) {
+ CompletableFuture.allOf(
+ FutureUtils.mapExceptionally(
+ runtime.scheduleWriteAllOperation(
+ "on-topics-deleted",
+ coordinator ->
coordinator.maybeCleanupShareState(deletedTopicIds)
+ ),
+ exception -> {
+ log.error("Received error while trying to cleanup deleted
topics.", exception);
+ return null;
+ }
+ ).toArray(new CompletableFuture<?>[0])
+ ).join();
+ }
+
TopicPartition topicPartitionFor(SharePartitionKey key) {
return new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME,
partitionFor(key));
}
diff --git
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
index 4535e8adb78..66797199b0b 100644
---
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
+++
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
@@ -34,19 +34,21 @@ import
org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateSummaryResponseData;
import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
+import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
+import org.apache.kafka.coordinator.common.runtime.MetadataImageBuilder;
import org.apache.kafka.coordinator.common.runtime.PartitionWriter;
import org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetrics;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.server.common.ShareVersion;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.util.FutureUtils;
@@ -2126,7 +2128,7 @@ class ShareCoordinatorServiceTest {
}
@Test
- public void testOnTopicsDeletedEmptyList() {
+ public void testOnMetadataUpdateSchedulesOperationsWhenTopicsDeleted() {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
MockTime time = new MockTime();
MockTimer timer = new MockTimer(time);
@@ -2146,6 +2148,16 @@ class ShareCoordinatorServiceTest {
service.startup(() -> 3);
+ Uuid topicId = Uuid.randomUuid();
+ MetadataImage initialImage = new MetadataImageBuilder()
+ .addTopic(topicId, "foo", 1)
+ .build();
+
+ // Create a delta that deletes the topic.
+ MetadataDelta delta = new MetadataDelta(initialImage);
+ delta.replay(new RemoveTopicRecord().setTopicId(topicId));
+ MetadataImage newImage = delta.apply(new MetadataProvenance(1, 0, 0L,
true));
+
when(runtime.scheduleWriteAllOperation(
eq("on-topics-deleted"),
any()
@@ -2153,11 +2165,83 @@ class ShareCoordinatorServiceTest {
List.of(
CompletableFuture.completedFuture(null),
CompletableFuture.completedFuture(null),
-
CompletableFuture.failedFuture(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception())
+ CompletableFuture.completedFuture(null)
)
);
- assertDoesNotThrow(() -> service.onTopicsDeleted(Set.of(),
BufferSupplier.NO_CACHING));
+ assertDoesNotThrow(() -> service.onMetadataUpdate(delta, newImage));
+
+ verify(runtime, times(1)).scheduleWriteAllOperation(
+ eq("on-topics-deleted"),
+ any()
+ );
+ }
+
+ @Test
+ public void
testOnMetadataUpdateDoesNotScheduleOperationsWhenNoTopicsDeleted() {
+ CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ MockTime time = new MockTime();
+ MockTimer timer = new MockTimer(time);
+ PartitionWriter writer = mock(PartitionWriter.class);
+
+ Metrics metrics = new Metrics();
+ ShareCoordinatorService service = spy(new ShareCoordinatorService(
+ new LogContext(),
+ ShareCoordinatorTestConfig.testConfig(),
+ runtime,
+ new ShareCoordinatorMetrics(metrics),
+ time,
+ timer,
+ writer,
+ () -> true
+ ));
+
+ service.startup(() -> 3);
+
+ // Create an image with a topic and a delta with no deletions.
+ MetadataImage image = new MetadataImageBuilder()
+ .addTopic(Uuid.randomUuid(), "foo", 1)
+ .build();
+ MetadataDelta delta = new MetadataDelta(image);
+
+ assertDoesNotThrow(() -> service.onMetadataUpdate(delta, image));
+
+ // Verify no operations scheduled.
+ verify(runtime, times(0)).scheduleWriteAllOperation(
+ eq("on-topics-deleted"),
+ any()
+ );
+ }
+
+ @Test
+ public void
testOnMetadataUpdateDoesNotScheduleOperationsWhenTopicsDeltaIsNull() {
+ CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ MockTime time = new MockTime();
+ MockTimer timer = new MockTimer(time);
+ PartitionWriter writer = mock(PartitionWriter.class);
+
+ Metrics metrics = new Metrics();
+ ShareCoordinatorService service = spy(new ShareCoordinatorService(
+ new LogContext(),
+ ShareCoordinatorTestConfig.testConfig(),
+ runtime,
+ new ShareCoordinatorMetrics(metrics),
+ time,
+ timer,
+ writer,
+ () -> true
+ ));
+
+ service.startup(() -> 3);
+
+ // Use mocks where topicsDelta() returns null.
+ MetadataDelta delta = mock(MetadataDelta.class);
+ MetadataImage image = mock(MetadataImage.class, RETURNS_DEEP_STUBS);
+ when(delta.topicsDelta()).thenReturn(null);
+
+ assertDoesNotThrow(() -> service.onMetadataUpdate(delta, image));
+
+ // Verify no cleanup operations scheduled.
verify(runtime, times(0)).scheduleWriteAllOperation(
eq("on-topics-deleted"),
any()
@@ -2165,7 +2249,7 @@ class ShareCoordinatorServiceTest {
}
@Test
- public void testOnTopicsDeletedDoesNotThrowExp() {
+ public void testOnMetadataUpdateSwallowsErrorsWhenTopicsDeleted() {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
MockTime time = new MockTime();
MockTimer timer = new MockTimer(time);
@@ -2185,6 +2269,17 @@ class ShareCoordinatorServiceTest {
service.startup(() -> 3);
+ Uuid topicId = Uuid.randomUuid();
+ MetadataImage initialImage = new MetadataImageBuilder()
+ .addTopic(topicId, "foo", 1)
+ .build();
+
+ // Create a delta that deletes the topic.
+ MetadataDelta delta = new MetadataDelta(initialImage);
+ delta.replay(new RemoveTopicRecord().setTopicId(topicId));
+ MetadataImage newImage = delta.apply(new MetadataProvenance(1, 0, 0L,
true));
+
+ // Mock operations with some futures failing.
when(runtime.scheduleWriteAllOperation(
eq("on-topics-deleted"),
any()
@@ -2192,11 +2287,14 @@ class ShareCoordinatorServiceTest {
List.of(
CompletableFuture.completedFuture(null),
CompletableFuture.completedFuture(null),
-
CompletableFuture.failedFuture(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception())
+
FutureUtils.failedFuture(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception())
)
);
- assertDoesNotThrow(() ->
service.onTopicsDeleted(Set.of(Uuid.randomUuid()), BufferSupplier.NO_CACHING));
+ // Verify no exception thrown.
+ assertDoesNotThrow(() -> service.onMetadataUpdate(delta, newImage));
+
+ // Verify operations were still scheduled.
verify(runtime, times(1)).scheduleWriteAllOperation(
eq("on-topics-deleted"),
any()