This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2f90af70ee6 MINOR: Remove onPartitionsDeleted from GroupCoordinator
interface (#21263)
2f90af70ee6 is described below
commit 2f90af70ee61dab3dae43a82d29f9f8505056bc9
Author: David Jacot <[email protected]>
AuthorDate: Thu Jan 8 18:07:09 2026 +0100
MINOR: Remove onPartitionsDeleted from GroupCoordinator interface (#21263)
This patch removes the `onPartitionsDeleted` method from the
`GroupCoordinator` interface by moving its functionality into
`onMetadataUpdate`. The `MetadataDelta` already contains the deleted
topic information via `delta.topicsDelta().deletedTopicIds()`, making
the separate method redundant.
Reviewers: Dongnuo Lyu <[email protected]>, Sean Quah
<[email protected]>, Lianet Magrans <[email protected]>
---
.../server/metadata/BrokerMetadataPublisher.scala | 18 --
.../unit/kafka/server/OffsetFetchRequestTest.scala | 2 +-
.../kafka/coordinator/group/GroupCoordinator.java | 12 --
.../coordinator/group/GroupCoordinatorService.java | 126 ++++++------
.../group/GroupCoordinatorServiceTest.java | 212 +++++++++------------
5 files changed, 158 insertions(+), 212 deletions(-)
diff --git
a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index afc982aae00..35c44b9524d 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -23,7 +23,6 @@ import kafka.log.LogManager
import kafka.server.share.SharePartitionManager
import kafka.server.{KafkaConfig, ReplicaManager}
import kafka.utils.Logging
-import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.coordinator.group.GroupCoordinator
@@ -40,7 +39,6 @@ import org.apache.kafka.server.fault.FaultHandler
import org.apache.kafka.storage.internals.log.{LogManager => JLogManager}
import java.util.concurrent.CompletableFuture
-import scala.collection.mutable
import scala.jdk.CollectionConverters._
@@ -186,22 +184,6 @@ class BrokerMetadataPublisher(
case t: Throwable =>
metadataPublishingFaultHandler.handleFault("Error updating share " +
s"coordinator with local changes in $deltaName", t)
}
- try {
- // Notify the group coordinator about deleted topics.
- val deletedTopicPartitions = new
mutable.ArrayBuffer[TopicPartition]()
- topicsDelta.deletedTopicIds().forEach { id =>
- val topicImage = topicsDelta.image().getTopic(id)
- topicImage.partitions().keySet().forEach {
- id => deletedTopicPartitions += new
TopicPartition(topicImage.name(), id)
- }
- }
- if (deletedTopicPartitions.nonEmpty) {
-
groupCoordinator.onPartitionsDeleted(deletedTopicPartitions.asJava,
RequestLocal.noCaching.bufferSupplier)
- }
- } catch {
- case t: Throwable =>
metadataPublishingFaultHandler.handleFault("Error updating group " +
- s"coordinator with deleted partitions in $deltaName", t)
- }
try {
// Notify the share coordinator about deleted topics.
val deletedTopicIds = topicsDelta.deletedTopicIds()
diff --git a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
index f838bb42d9e..ff228f245cb 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
@@ -568,7 +568,7 @@ class OffsetFetchRequestTest(cluster: ClusterInstance)
extends GroupCoordinatorB
@ClusterTest
def testFetchOffsetWithRecreatedTopic(): Unit = {
// There are two ways to ensure that committed of recreated topics are not
returned.
- // 1) When a topic is deleted, GroupCoordinatorService#onPartitionsDeleted
is called to
+ // 1) When a topic is deleted, GroupCoordinatorService#onMetadataUpdate is
called to
// delete all its committed offsets.
// 2) Since version 10 of the OffsetCommit API, the topic id is stored
alongside the
// committed offset. When it is queried, it is only returned iff the
topic id of
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
index 753d1736f7c..9eb6d1cfd31 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
@@ -64,7 +64,6 @@ import java.util.Optional;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import java.util.function.IntSupplier;
/**
@@ -410,17 +409,6 @@ public interface GroupCoordinator {
*/
int partitionFor(String groupId);
- /**
- * Remove the provided deleted partitions offsets.
- *
- * @param topicPartitions The deleted partitions.
- * @param bufferSupplier The buffer supplier tight to the request
thread.
- */
- void onPartitionsDeleted(
- List<TopicPartition> topicPartitions,
- BufferSupplier bufferSupplier
- ) throws ExecutionException, InterruptedException;
-
/**
* Group coordinator is now the leader for the given partition at the
* given leader epoch. It should load cached state from the partition
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 701e58f96c8..f8dc5068dba 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -100,6 +100,7 @@ import
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.TopicsDelta;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.record.BrokerCompressionType;
@@ -136,7 +137,6 @@ import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntSupplier;
@@ -2217,63 +2217,6 @@ public class GroupCoordinatorService implements
GroupCoordinator {
);
}
- /**
- * See {@link GroupCoordinator#onPartitionsDeleted(List, BufferSupplier)}.
- */
- @Override
- public void onPartitionsDeleted(
- List<TopicPartition> topicPartitions,
- BufferSupplier bufferSupplier
- ) throws ExecutionException, InterruptedException {
- throwIfNotActive();
-
- var futures = new ArrayList<CompletableFuture<Void>>();
-
- // Handle the partition deletion for committed offsets.
- futures.addAll(
- FutureUtils.mapExceptionally(
- runtime.scheduleWriteAllOperation(
- "on-partition-deleted",
- Duration.ofMillis(config.offsetCommitTimeoutMs()),
- coordinator ->
coordinator.onPartitionsDeleted(topicPartitions)
- ),
- exception -> {
- log.error("Could not delete offsets for deleted partitions
{} due to: {}.",
- topicPartitions, exception.getMessage(), exception
- );
- return null;
- }
- )
- );
-
- // Handle the topic deletion for share state.
- if (metadataImage != null) {
- var topicIds = topicPartitions.stream()
- .filter(tp ->
metadataImage.topicMetadata(tp.topic()).isPresent())
- .map(tp -> metadataImage.topicMetadata(tp.topic()).get().id())
- .collect(Collectors.toSet());
-
- if (!topicIds.isEmpty()) {
- futures.addAll(
- FutureUtils.mapExceptionally(
- runtime.scheduleWriteAllOperation(
- "maybe-cleanup-share-group-state",
- Duration.ofMillis(config.offsetCommitTimeoutMs()),
- coordinator ->
coordinator.maybeCleanupShareGroupState(topicIds)
- ),
- exception -> {
- log.error("Unable to cleanup state for the deleted
topics {}", topicIds, exception);
- return null;
- }
- )
- );
- }
- }
-
- // Wait on the results.
- CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
- }
-
/**
* See {@link GroupCoordinator#onElection(int, int)}.
*/
@@ -2315,10 +2258,77 @@ public class GroupCoordinatorService implements
GroupCoordinator {
throwIfNotActive();
Objects.requireNonNull(delta, "delta must be provided");
Objects.requireNonNull(newImage, "newImage must be provided");
+
+ // Update the metadata image and propagate to runtime.
var wrappedImage = new KRaftCoordinatorMetadataImage(newImage);
var wrappedDelta = new KRaftCoordinatorMetadataDelta(delta);
metadataImage = wrappedImage;
runtime.onMetadataUpdate(wrappedDelta, wrappedImage);
+
+ // Handle partition deletions from the delta.
+ if (delta.topicsDelta() != null &&
!delta.topicsDelta().deletedTopicIds().isEmpty()) {
+ handlePartitionsDeletion(delta.topicsDelta());
+ }
+ }
+
+ /**
+ * Handles the deletion of topic partitions by scheduling write operations
+ * to delete committed offsets and clean up share group state.
+ *
+ * @param topicsDelta The topics delta containing deleted topic IDs.
+ */
+ private void handlePartitionsDeletion(TopicsDelta topicsDelta) {
+ var topicPartitions = new ArrayList<TopicPartition>();
+ var topicIds = topicsDelta.deletedTopicIds();
+
+ topicIds.forEach(topicId -> {
+ var topicImage = topicsDelta.image().getTopic(topicId);
+ if (topicImage != null) {
+ topicImage.partitions().keySet().forEach(partitionId ->
+ topicPartitions.add(new TopicPartition(topicImage.name(),
partitionId))
+ );
+ }
+ });
+
+ var futures = new ArrayList<CompletableFuture<Void>>();
+
+ if (!topicPartitions.isEmpty()) {
+ // Schedule offset deletion.
+ futures.addAll(
+ FutureUtils.mapExceptionally(
+ runtime.scheduleWriteAllOperation(
+ "on-partition-deleted",
+ Duration.ofMillis(config.offsetCommitTimeoutMs()),
+ coordinator ->
coordinator.onPartitionsDeleted(topicPartitions)
+ ),
+ exception -> {
+ log.error("Could not delete offsets for deleted
partitions {} due to: {}.",
+ topicPartitions, exception.getMessage(),
exception);
+ return null;
+ }
+ )
+ );
+ }
+
+ if (!topicIds.isEmpty()) {
+ // Schedule share group state cleanup.
+ futures.addAll(
+ FutureUtils.mapExceptionally(
+ runtime.scheduleWriteAllOperation(
+ "maybe-cleanup-share-group-state",
+ Duration.ofMillis(config.offsetCommitTimeoutMs()),
+ coordinator ->
coordinator.maybeCleanupShareGroupState(topicIds)
+ ),
+ exception -> {
+ log.error("Unable to cleanup state for the deleted
topics {}", topicIds, exception);
+ return null;
+ }
+ )
+ );
+ }
+
+ // Wait for all operations to complete.
+ CompletableFuture.allOf(futures.toArray(new
CompletableFuture<?>[0])).join();
}
/**
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index df911666295..30fdc37e2b1 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -76,6 +76,7 @@ import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
+import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.network.ClientInformation;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
@@ -95,6 +96,7 @@ import
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.common.TransactionVersion;
import org.apache.kafka.server.record.BrokerCompressionType;
@@ -155,6 +157,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -3151,157 +3154,112 @@ public class GroupCoordinatorServiceTest {
}
@Test
- public void testOnPartitionsDeleted() {
- CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ public void testOnMetadataUpdateWhenNotStarted() {
+ var runtime = mockRuntime();
+ var service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
.build();
- service.startup(() -> 3);
- MetadataImage image = new MetadataImageBuilder()
+ var image = new MetadataImageBuilder()
.addTopic(Uuid.randomUuid(), "foo", 1)
.build();
+ var delta = new MetadataDelta(image);
- service.onMetadataUpdate(new MetadataDelta(image), image);
-
- when(runtime.scheduleWriteAllOperation(
- ArgumentMatchers.eq("on-partition-deleted"),
- ArgumentMatchers.eq(Duration.ofMillis(5000)),
- ArgumentMatchers.any()
- )).thenReturn(Arrays.asList(
- CompletableFuture.completedFuture(null),
- CompletableFuture.completedFuture(null),
-
FutureUtils.failedFuture(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception())
- ));
-
- when(runtime.scheduleWriteAllOperation(
- ArgumentMatchers.eq("maybe-cleanup-share-group-state"),
- ArgumentMatchers.eq(Duration.ofMillis(5000)),
- ArgumentMatchers.any()
- )).thenReturn(Arrays.asList(
- CompletableFuture.completedFuture(null),
- CompletableFuture.completedFuture(null),
- CompletableFuture.completedFuture(null)
- ));
-
- // The exception is logged and swallowed.
- assertDoesNotThrow(() ->
- service.onPartitionsDeleted(
- List.of(new TopicPartition("foo", 0)),
- BufferSupplier.NO_CACHING
- )
- );
- }
-
- @Test
- public void testOnPartitionsDeletedWhenServiceIsNotStarted() {
- CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
- .setConfig(createConfig())
- .setRuntime(runtime)
- .build();
-
- assertThrows(CoordinatorNotAvailableException.class, () ->
service.onPartitionsDeleted(
- List.of(new TopicPartition("foo", 0)),
- BufferSupplier.NO_CACHING
- ));
+ assertThrows(CoordinatorNotAvailableException.class,
+ () -> service.onMetadataUpdate(delta, image));
}
@Test
- public void testOnPartitionsDeletedCleanupShareGroupState() {
- CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ public void testOnMetadataUpdateSchedulesOperationsWhenTopicsDeleted()
throws ExecutionException, InterruptedException, TimeoutException {
+ var runtime = mockRuntime();
+ var service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
.build();
service.startup(() -> 3);
- MetadataImage image = new MetadataImageBuilder()
- .addTopic(Uuid.randomUuid(), "foo", 1)
+ var topicId = Uuid.randomUuid();
+ var initialImage = new MetadataImageBuilder()
+ .addTopic(topicId, "foo", 1)
.build();
- service.onMetadataUpdate(new MetadataDelta(image), image);
+ // Create a delta that deletes the topic.
+ var delta = new MetadataDelta(initialImage);
+ delta.replay(new RemoveTopicRecord().setTopicId(topicId));
+ var newImage = delta.apply(new MetadataProvenance(1, 0, 0L, true));
+
+ // Use incomplete futures to verify method blocks.
+ var offsetFutures = List.of(
+ new CompletableFuture<>(),
+ new CompletableFuture<>(),
+ new CompletableFuture<>()
+ );
+ var shareFutures = List.of(
+ new CompletableFuture<>(),
+ new CompletableFuture<>(),
+ new CompletableFuture<>()
+ );
- // No error in partition deleted callback
when(runtime.scheduleWriteAllOperation(
ArgumentMatchers.eq("on-partition-deleted"),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
- )).thenReturn(List.of(
- CompletableFuture.completedFuture(null),
- CompletableFuture.completedFuture(null),
- CompletableFuture.completedFuture(null)
- ));
+ )).thenReturn(offsetFutures);
when(runtime.scheduleWriteAllOperation(
ArgumentMatchers.eq("maybe-cleanup-share-group-state"),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
- )).thenReturn(List.of(
- CompletableFuture.completedFuture(null),
- CompletableFuture.completedFuture(null),
-
CompletableFuture.failedFuture(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception())
- ));
+ )).thenReturn(shareFutures);
- // The exception is logged and swallowed.
- assertDoesNotThrow(() ->
- service.onPartitionsDeleted(
- List.of(new TopicPartition("foo", 0)),
- BufferSupplier.NO_CACHING
- )
- );
+ // Run onMetadataUpdate in a separate thread.
+ var resultFuture = CompletableFuture.runAsync(() ->
service.onMetadataUpdate(delta, newImage));
- verify(runtime, times(1)).scheduleWriteAllOperation(
+ // Wait for the operations to be scheduled and verify method is
blocked.
+ verify(runtime, timeout(5000).times(1)).scheduleWriteAllOperation(
+ ArgumentMatchers.eq("on-partition-deleted"),
+ ArgumentMatchers.eq(Duration.ofMillis(5000)),
+ ArgumentMatchers.any()
+ );
+ verify(runtime, timeout(5000).times(1)).scheduleWriteAllOperation(
ArgumentMatchers.eq("maybe-cleanup-share-group-state"),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
);
+ assertFalse(resultFuture.isDone());
+
+ // Complete all futures.
+ offsetFutures.forEach(f -> f.complete(null));
+ shareFutures.forEach(f -> f.complete(null));
+
+ // Verify method completes.
+ resultFuture.get(5, TimeUnit.SECONDS);
}
@Test
- public void testOnPartitionsDeletedCleanupShareGroupStateEmptyMetadata() {
- CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ public void
testOnMetadataUpdateDoesNotScheduleOperationsWhenNoTopicsDeleted() {
+ var runtime = mockRuntime();
+ var service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
.build();
service.startup(() -> 3);
- MetadataImage image = new MetadataImageBuilder()
- .addTopic(Uuid.randomUuid(), "bar", 1)
+ // Create an image with a topic and a delta with no deletions.
+ var image = new MetadataImageBuilder()
+ .addTopic(Uuid.randomUuid(), "foo", 1)
.build();
- service.onMetadataUpdate(new MetadataDelta(image), image);
+ var delta = new MetadataDelta(image);
- // No error in partition deleted callback
- when(runtime.scheduleWriteAllOperation(
- ArgumentMatchers.eq("on-partition-deleted"),
- ArgumentMatchers.eq(Duration.ofMillis(5000)),
- ArgumentMatchers.any()
- )).thenReturn(List.of(
- CompletableFuture.completedFuture(null),
- CompletableFuture.completedFuture(null),
- CompletableFuture.completedFuture(null)
- ));
+ assertDoesNotThrow(() -> service.onMetadataUpdate(delta, image));
- when(runtime.scheduleWriteAllOperation(
- ArgumentMatchers.eq("maybe-cleanup-share-group-state"),
+ // Verify no operations scheduled.
+ verify(runtime, times(0)).scheduleWriteAllOperation(
+ ArgumentMatchers.eq("on-partition-deleted"),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
- )).thenReturn(List.of(
- CompletableFuture.completedFuture(null),
- CompletableFuture.completedFuture(null),
- CompletableFuture.completedFuture(null)
- ));
-
- // The exception is logged and swallowed.
- assertDoesNotThrow(() ->
- service.onPartitionsDeleted(
- List.of(new TopicPartition("foo", 0)),
- BufferSupplier.NO_CACHING
- )
);
-
verify(runtime, times(0)).scheduleWriteAllOperation(
ArgumentMatchers.eq("maybe-cleanup-share-group-state"),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
@@ -3310,47 +3268,55 @@ public class GroupCoordinatorServiceTest {
}
@Test
- public void
testOnPartitionsDeletedCleanupShareGroupStateTopicsNotInMetadata() {
- CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ public void testOnMetadataUpdateSwallowsErrorsWhenTopicsDeleted() {
+ var runtime = mockRuntime();
+ var service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
.build();
service.startup(() -> 3);
- MetadataImage image = MetadataImage.EMPTY;
- service.onMetadataUpdate(new MetadataDelta(image), image);
+ var topicId = Uuid.randomUuid();
+ var initialImage = new MetadataImageBuilder()
+ .addTopic(topicId, "foo", 1)
+ .build();
+
+ // Create a delta that deletes the topic.
+ var delta = new MetadataDelta(initialImage);
+ delta.replay(new RemoveTopicRecord().setTopicId(topicId));
+ var newImage = delta.apply(new MetadataProvenance(1, 0, 0L, true));
- // No error in partition deleted callback
+ // Mock operations with 3 futures, some failing.
when(runtime.scheduleWriteAllOperation(
ArgumentMatchers.eq("on-partition-deleted"),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
- )).thenReturn(List.of(
+ )).thenReturn(Arrays.asList(
CompletableFuture.completedFuture(null),
CompletableFuture.completedFuture(null),
- CompletableFuture.completedFuture(null)
+
FutureUtils.failedFuture(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception())
));
when(runtime.scheduleWriteAllOperation(
ArgumentMatchers.eq("maybe-cleanup-share-group-state"),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
- )).thenReturn(List.of(
+ )).thenReturn(Arrays.asList(
CompletableFuture.completedFuture(null),
CompletableFuture.completedFuture(null),
- CompletableFuture.completedFuture(null)
+
FutureUtils.failedFuture(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception())
));
- // The exception is logged and swallowed.
- assertDoesNotThrow(() ->
- service.onPartitionsDeleted(
- List.of(new TopicPartition("foo", 0)),
- BufferSupplier.NO_CACHING
- )
- );
+ // Verify no exception thrown.
+ assertDoesNotThrow(() -> service.onMetadataUpdate(delta, newImage));
- verify(runtime, times(0)).scheduleWriteAllOperation(
+ // Verify operations were still scheduled exactly once.
+ verify(runtime, times(1)).scheduleWriteAllOperation(
+ ArgumentMatchers.eq("on-partition-deleted"),
+ ArgumentMatchers.eq(Duration.ofMillis(5000)),
+ ArgumentMatchers.any()
+ );
+ verify(runtime, times(1)).scheduleWriteAllOperation(
ArgumentMatchers.eq("maybe-cleanup-share-group-state"),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
@@ -6033,7 +5999,7 @@ public class GroupCoordinatorServiceTest {
.build();
}
- GroupCoordinatorService service = new GroupCoordinatorService(
+ var service = new GroupCoordinatorService(
logContext,
config,
runtime,