This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5946f27ac5b KAFKA-18484 [2/2]; Handle exceptions during coordinator unload (#18667)
5946f27ac5b is described below

commit 5946f27ac5bc1f4a5bc162ccc26f130933ab2182
Author: Sean Quah <[email protected]>
AuthorDate: Thu Jan 23 16:15:21 2025 +0000

    KAFKA-18484 [2/2]; Handle exceptions during coordinator unload (#18667)
    
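    Ensure that unloading a coordinator always succeeds. Previously, we
    guarded against exceptions from DeferredEvent completions. All that
    remains is to handle exceptions from the onUnloaded() method of the
    coordinator state machine. The coordinator context is now always
    removed, even if unloading fails, so that the coordinator shard cannot
    get permanently stuck.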
    
    Reviewers: David Jacot <[email protected]>
---
 .../common/runtime/CoordinatorRuntime.java         | 24 ++++++++--
 .../common/runtime/CoordinatorRuntimeTest.java     | 52 ++++++++++++++++++++++
 2 files changed, 72 insertions(+), 4 deletions(-)

diff --git 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
index b341c0adaeb..1e9724a57aa 100644
--- 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
+++ 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
@@ -742,7 +742,11 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
             deferredEventQueue.failAll(Errors.NOT_COORDINATOR.exception());
             failCurrentBatch(Errors.NOT_COORDINATOR.exception());
             if (coordinator != null) {
-                coordinator.onUnloaded();
+                try {
+                    coordinator.onUnloaded();
+                } catch (Throwable ex) {
+                    log.error("Failed to unload coordinator for {} due to 
{}.", tp, ex.getMessage(), ex);
+                }
             }
             coordinator = null;
         }
@@ -2415,9 +2419,19 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
                 try {
                     if (partitionEpoch.isEmpty() || context.epoch < 
partitionEpoch.getAsInt()) {
                         log.info("Started unloading metadata for {} with epoch 
{}.", tp, partitionEpoch);
-                        context.transitionTo(CoordinatorState.CLOSED);
-                        coordinators.remove(tp, context);
-                        log.info("Finished unloading metadata for {} with 
epoch {}.", tp, partitionEpoch);
+                        try {
+                            context.transitionTo(CoordinatorState.CLOSED);
+                            log.info("Finished unloading metadata for {} with 
epoch {}.", tp, partitionEpoch);
+                        } catch (Throwable ex) {
+                            // It's very unlikely that we will ever see an 
exception here, since we
+                            // already make an effort to catch exceptions in 
the unload method.
+                            log.error("Failed to unload metadata for {} with 
epoch {} due to {}.",
+                                tp, partitionEpoch, ex.toString());
+                        } finally {
+                            // Always remove the coordinator context, 
otherwise the coordinator
+                            // shard could be permanently stuck.
+                            coordinators.remove(tp, context);
+                        }
                     } else {
                         log.info("Ignored unloading metadata for {} in epoch 
{} since current epoch is {}.",
                             tp, partitionEpoch, context.epoch);
@@ -2498,6 +2512,8 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
             context.lock.lock();
             try {
                 context.transitionTo(CoordinatorState.CLOSED);
+            } catch (Throwable ex) {
+                log.warn("Failed to unload metadata for {} due to {}.", tp, 
ex.getMessage(), ex);
             } finally {
                 context.lock.unlock();
             }
diff --git 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
index 3c2021a118c..9e4e6f7bb9b 100644
--- 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
+++ 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
@@ -1118,6 +1118,58 @@ public class CoordinatorRuntimeTest {
         assertEquals(10, ctx.epoch);
     }
 
+    @Test
+    public void testScheduleUnloadingWithException() {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = mock(MockPartitionWriter.class);
+        MockCoordinatorShardBuilderSupplier supplier = 
mock(MockCoordinatorShardBuilderSupplier.class);
+        MockCoordinatorShardBuilder builder = 
mock(MockCoordinatorShardBuilder.class);
+        MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
+        CoordinatorRuntimeMetrics metrics = 
mock(CoordinatorRuntimeMetrics.class);
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(supplier)
+                .withCoordinatorRuntimeMetrics(metrics)
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withExecutorService(mock(ExecutorService.class))
+                .build();
+
+        doThrow(new KafkaException("error")).when(coordinator).onUnloaded();
+        when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+        when(builder.withLogContext(any())).thenReturn(builder);
+        when(builder.withTime(any())).thenReturn(builder);
+        when(builder.withTimer(any())).thenReturn(builder);
+        when(builder.withCoordinatorMetrics(any())).thenReturn(builder);
+        when(builder.withTopicPartition(any())).thenReturn(builder);
+        when(builder.withExecutor(any())).thenReturn(builder);
+        when(builder.build()).thenReturn(coordinator);
+        when(supplier.get()).thenReturn(builder);
+
+        // Loads the coordinator. It directly transitions to active.
+        runtime.scheduleLoadOperation(TP, 10);
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+        assertEquals(ACTIVE, ctx.state);
+        assertEquals(10, ctx.epoch);
+
+        // Schedule the unloading.
+        runtime.scheduleUnloadOperation(TP, OptionalInt.of(ctx.epoch + 1));
+        assertEquals(CLOSED, ctx.state);
+
+        // Verify that onUnloaded is called.
+        verify(coordinator, times(1)).onUnloaded();
+
+        // Getting the coordinator context fails because it no longer exists.
+        assertThrows(NotCoordinatorException.class, () -> 
runtime.contextOrThrow(TP));
+    }
+
     @Test
     public void testScheduleUnloadingWithDeferredEventExceptions() throws 
ExecutionException, InterruptedException, TimeoutException {
         MockTimer timer = new MockTimer();

Reply via email to