This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5946f27ac5b KAFKA-18484 [2/2]; Handle exceptions during coordinator
unload (#18667)
5946f27ac5b is described below
commit 5946f27ac5bc1f4a5bc162ccc26f130933ab2182
Author: Sean Quah <[email protected]>
AuthorDate: Thu Jan 23 16:15:21 2025 +0000
KAFKA-18484 [2/2]; Handle exceptions during coordinator unload (#18667)
Ensure that unloading a coordinator always succeeds. We previously
guarded against exceptions from DeferredEvent completions. All that
remains is handling exceptions from the onUnloaded() method of the
coordinator state machine.
Reviewers: David Jacot <[email protected]>
---
.../common/runtime/CoordinatorRuntime.java | 24 ++++++++--
.../common/runtime/CoordinatorRuntimeTest.java | 52 ++++++++++++++++++++++
2 files changed, 72 insertions(+), 4 deletions(-)
diff --git
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
index b341c0adaeb..1e9724a57aa 100644
---
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
+++
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
@@ -742,7 +742,11 @@ public class CoordinatorRuntime<S extends
CoordinatorShard<U>, U> implements Aut
deferredEventQueue.failAll(Errors.NOT_COORDINATOR.exception());
failCurrentBatch(Errors.NOT_COORDINATOR.exception());
if (coordinator != null) {
- coordinator.onUnloaded();
+ try {
+ coordinator.onUnloaded();
+ } catch (Throwable ex) {
+ log.error("Failed to unload coordinator for {} due to
{}.", tp, ex.getMessage(), ex);
+ }
}
coordinator = null;
}
@@ -2415,9 +2419,19 @@ public class CoordinatorRuntime<S extends
CoordinatorShard<U>, U> implements Aut
try {
if (partitionEpoch.isEmpty() || context.epoch <
partitionEpoch.getAsInt()) {
log.info("Started unloading metadata for {} with epoch
{}.", tp, partitionEpoch);
- context.transitionTo(CoordinatorState.CLOSED);
- coordinators.remove(tp, context);
- log.info("Finished unloading metadata for {} with
epoch {}.", tp, partitionEpoch);
+ try {
+ context.transitionTo(CoordinatorState.CLOSED);
+ log.info("Finished unloading metadata for {} with
epoch {}.", tp, partitionEpoch);
+ } catch (Throwable ex) {
+ // It's very unlikely that we will ever see an
exception here, since we
+ // already make an effort to catch exceptions in
the unload method.
+ log.error("Failed to unload metadata for {} with
epoch {} due to {}.",
+ tp, partitionEpoch, ex.toString());
+ } finally {
+ // Always remove the coordinator context,
otherwise the coordinator
+ // shard could be permanently stuck.
+ coordinators.remove(tp, context);
+ }
} else {
log.info("Ignored unloading metadata for {} in epoch
{} since current epoch is {}.",
tp, partitionEpoch, context.epoch);
@@ -2498,6 +2512,8 @@ public class CoordinatorRuntime<S extends
CoordinatorShard<U>, U> implements Aut
context.lock.lock();
try {
context.transitionTo(CoordinatorState.CLOSED);
+ } catch (Throwable ex) {
+ log.warn("Failed to unload metadata for {} due to {}.", tp,
ex.getMessage(), ex);
} finally {
context.lock.unlock();
}
diff --git
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
index 3c2021a118c..9e4e6f7bb9b 100644
---
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
+++
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
@@ -1118,6 +1118,58 @@ public class CoordinatorRuntimeTest {
assertEquals(10, ctx.epoch);
}
+ @Test
+ public void testScheduleUnloadingWithException() {
+ MockTimer timer = new MockTimer();
+ MockPartitionWriter writer = mock(MockPartitionWriter.class);
+ MockCoordinatorShardBuilderSupplier supplier =
mock(MockCoordinatorShardBuilderSupplier.class);
+ MockCoordinatorShardBuilder builder =
mock(MockCoordinatorShardBuilder.class);
+ MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
+ CoordinatorRuntimeMetrics metrics =
mock(CoordinatorRuntimeMetrics.class);
+
+ CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+ new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+ .withTime(timer.time())
+ .withTimer(timer)
+ .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+ .withLoader(new MockCoordinatorLoader())
+ .withEventProcessor(new DirectEventProcessor())
+ .withPartitionWriter(writer)
+ .withCoordinatorShardBuilderSupplier(supplier)
+ .withCoordinatorRuntimeMetrics(metrics)
+ .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+ .withSerializer(new StringSerializer())
+ .withExecutorService(mock(ExecutorService.class))
+ .build();
+
+ doThrow(new KafkaException("error")).when(coordinator).onUnloaded();
+ when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+ when(builder.withLogContext(any())).thenReturn(builder);
+ when(builder.withTime(any())).thenReturn(builder);
+ when(builder.withTimer(any())).thenReturn(builder);
+ when(builder.withCoordinatorMetrics(any())).thenReturn(builder);
+ when(builder.withTopicPartition(any())).thenReturn(builder);
+ when(builder.withExecutor(any())).thenReturn(builder);
+ when(builder.build()).thenReturn(coordinator);
+ when(supplier.get()).thenReturn(builder);
+
+ // Loads the coordinator. It directly transitions to active.
+ runtime.scheduleLoadOperation(TP, 10);
+ CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext
ctx = runtime.contextOrThrow(TP);
+ assertEquals(ACTIVE, ctx.state);
+ assertEquals(10, ctx.epoch);
+
+ // Schedule the unloading.
+ runtime.scheduleUnloadOperation(TP, OptionalInt.of(ctx.epoch + 1));
+ assertEquals(CLOSED, ctx.state);
+
+ // Verify that onUnloaded is called.
+ verify(coordinator, times(1)).onUnloaded();
+
+ // Getting the coordinator context fails because it no longer exists.
+ assertThrows(NotCoordinatorException.class, () ->
runtime.contextOrThrow(TP));
+ }
+
@Test
public void testScheduleUnloadingWithDeferredEventExceptions() throws
ExecutionException, InterruptedException, TimeoutException {
MockTimer timer = new MockTimer();