This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5a57473a524 KAFKA-18484 [1/N]; Handle exceptions from deferred events
in coordinator (#18661)
5a57473a524 is described below
commit 5a57473a524797d1b4f95df4a7899802a876e380
Author: Sean Quah <[email protected]>
AuthorDate: Wed Jan 22 13:46:19 2025 +0000
KAFKA-18484 [1/N]; Handle exceptions from deferred events in coordinator
(#18661)
Guard against the coordinator getting stuck due to deferred events
throwing exceptions.
Reviewers: David Jacot <[email protected]>
---
.../common/runtime/CoordinatorRuntime.java | 48 ++++--
.../common/runtime/CoordinatorRuntimeTest.java | 178 +++++++++++++++++++++
2 files changed, 216 insertions(+), 10 deletions(-)
diff --git
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
index 5d9e9ef13a8..b341c0adaeb 100644
---
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
+++
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
@@ -460,7 +460,7 @@ public class CoordinatorRuntime<S extends
CoordinatorShard<U>, U> implements Aut
* A simple container class to hold all the attributes
* related to a pending batch.
*/
- private static class CoordinatorBatch {
+ private class CoordinatorBatch {
/**
* The base (or first) offset of the batch. If the batch fails
* for any reason, the state machine is rolled back to it.
@@ -500,9 +500,9 @@ public class CoordinatorRuntime<S extends
CoordinatorShard<U>, U> implements Aut
final Optional<TimerTask> lingerTimeoutTask;
/**
- * The list of deferred events associated with the batch.
+ * The deferred events associated with the batch.
*/
- final List<DeferredEvent> deferredEvents;
+ final DeferredEventCollection deferredEvents;
/**
* The next offset. This is updated when records
@@ -527,7 +527,7 @@ public class CoordinatorRuntime<S extends
CoordinatorShard<U>, U> implements Aut
this.buffer = buffer;
this.builder = builder;
this.lingerTimeoutTask = lingerTimeoutTask;
- this.deferredEvents = new ArrayList<>();
+ this.deferredEvents = new DeferredEventCollection();
}
}
@@ -806,9 +806,7 @@ public class CoordinatorRuntime<S extends
CoordinatorShard<U>, U> implements Aut
}
// Add all the pending deferred events to the deferred
event queue.
- for (DeferredEvent event : currentBatch.deferredEvents) {
- deferredEventQueue.add(offset, event);
- }
+ deferredEventQueue.add(offset,
currentBatch.deferredEvents);
// Free up the current batch.
freeCurrentBatch();
@@ -839,9 +837,7 @@ public class CoordinatorRuntime<S extends
CoordinatorShard<U>, U> implements Aut
private void failCurrentBatch(Throwable t) {
if (currentBatch != null) {
coordinator.revertLastWrittenOffset(currentBatch.baseOffset);
- for (DeferredEvent event : currentBatch.deferredEvents) {
- event.complete(t);
- }
+ currentBatch.deferredEvents.complete(t);
freeCurrentBatch();
}
}
@@ -1157,6 +1153,38 @@ public class CoordinatorRuntime<S extends
CoordinatorShard<U>, U> implements Aut
}
}
+ /**
+ * A collection of {@link DeferredEvent}. When completed, completes all
the events in the collection
+ * and logs any exceptions thrown.
+ */
+ class DeferredEventCollection implements DeferredEvent {
+ private final List<DeferredEvent> events = new ArrayList<>();
+
+ @Override
+ public void complete(Throwable t) {
+ for (DeferredEvent event : events) {
+ try {
+ event.complete(t);
+ } catch (Throwable e) {
+ log.error("Completion of event {} failed due to {}.",
event, e.getMessage(), e);
+ }
+ }
+ }
+
+ public boolean add(DeferredEvent event) {
+ return events.add(event);
+ }
+
+ public int size() {
+ return events.size();
+ }
+
+ @Override
+ public String toString() {
+ return "DeferredEventCollection(events=" + events + ")";
+ }
+ }
+
/**
* A coordinator write operation.
*
diff --git
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
index a2f25b24a4c..3c2021a118c 100644
---
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
+++
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
@@ -94,8 +94,10 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@@ -1116,6 +1118,105 @@ public class CoordinatorRuntimeTest {
assertEquals(10, ctx.epoch);
}
+ @Test
+ public void testScheduleUnloadingWithDeferredEventExceptions() throws
ExecutionException, InterruptedException, TimeoutException {
+ MockTimer timer = new MockTimer();
+ MockPartitionWriter writer = new MockPartitionWriter();
+ MockCoordinatorShardBuilderSupplier supplier =
mock(MockCoordinatorShardBuilderSupplier.class);
+ MockCoordinatorShardBuilder builder =
mock(MockCoordinatorShardBuilder.class);
+ MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
+ CoordinatorRuntimeMetrics metrics =
mock(CoordinatorRuntimeMetrics.class);
+
+ // All operations will throw an exception when completed.
+ doThrow(new
KafkaException("error")).when(metrics).recordEventPurgatoryTime(anyLong());
+
+ CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+ new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+ .withTime(timer.time())
+ .withTimer(timer)
+ .withDefaultWriteTimeOut(Duration.ofMillis(20))
+ .withLoader(new MockCoordinatorLoader())
+ .withEventProcessor(new DirectEventProcessor())
+ .withPartitionWriter(writer)
+ .withCoordinatorShardBuilderSupplier(supplier)
+ .withCoordinatorRuntimeMetrics(metrics)
+ .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+ .withSerializer(new StringSerializer())
+ .withAppendLingerMs(10)
+ .withExecutorService(mock(ExecutorService.class))
+ .build();
+
+ when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+ when(builder.withLogContext(any())).thenReturn(builder);
+ when(builder.withTime(any())).thenReturn(builder);
+ when(builder.withTimer(any())).thenReturn(builder);
+ when(builder.withCoordinatorMetrics(any())).thenReturn(builder);
+ when(builder.withTopicPartition(any())).thenReturn(builder);
+ when(builder.withExecutor(any())).thenReturn(builder);
+ when(builder.build()).thenReturn(coordinator);
+ when(supplier.get()).thenReturn(builder);
+
+ // Load the coordinator.
+ runtime.scheduleLoadOperation(TP, 10);
+ CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext
ctx = runtime.contextOrThrow(TP);
+
+ // Get the max batch size.
+ int maxBatchSize = writer.config(TP).maxMessageSize();
+
+ // Create records with three quarters of the max batch size each, so
that it is not
+ // possible to have more than one record in a single batch.
+ List<String> records = Stream.of('1', '2', '3').map(c -> {
+ char[] payload = new char[maxBatchSize * 3 / 4];
+ Arrays.fill(payload, c);
+ return new String(payload);
+ }).collect(Collectors.toList());
+
+ // Write #1.
+ CompletableFuture<String> write1 =
runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+ state -> new CoordinatorResult<>(List.of(records.get(0)),
"response1")
+ );
+
+ // Write #2.
+ CompletableFuture<String> write2 =
runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+ state -> new CoordinatorResult<>(List.of(records.get(1)),
"response2")
+ );
+
+ // Write #3, to force the flush of write #2.
+ CompletableFuture<String> write3 =
runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20),
+ state -> new CoordinatorResult<>(List.of(records.get(1)),
"response3")
+ );
+
+ // Records have been written to the log.
+ assertEquals(List.of(
+ records(timer.time().milliseconds(), records.get(0)),
+ records(timer.time().milliseconds(), records.get(1))
+ ), writer.entries(TP));
+
+ // Verify that no writes are committed yet.
+ assertFalse(write1.isDone());
+ assertFalse(write2.isDone());
+ assertFalse(write3.isDone());
+
+ // Schedule the unloading.
+ runtime.scheduleUnloadOperation(TP, OptionalInt.of(ctx.epoch + 1));
+ assertEquals(CLOSED, ctx.state);
+
+ // All write completions throw exceptions after completing their
futures.
+ // Despite the exceptions, the unload should still complete.
+ assertTrue(write1.isDone());
+ assertTrue(write2.isDone());
+ assertTrue(write3.isDone());
+ assertFutureThrows(write1, NotCoordinatorException.class);
+ assertFutureThrows(write2, NotCoordinatorException.class);
+ assertFutureThrows(write3, NotCoordinatorException.class);
+
+ // Verify that onUnloaded is called.
+ verify(coordinator, times(1)).onUnloaded();
+
+ // Getting the coordinator context fails because it no longer exists.
+ assertThrows(NotCoordinatorException.class, () ->
runtime.contextOrThrow(TP));
+ }
+
@Test
public void testScheduleWriteOp() throws ExecutionException,
InterruptedException, TimeoutException {
MockTimer timer = new MockTimer();
@@ -3080,6 +3181,83 @@ public class CoordinatorRuntimeTest {
assertTrue(write2.isDone());
}
+ @Test
+ public void testHighWatermarkUpdateWithDeferredEventExceptions() throws
ExecutionException, InterruptedException, TimeoutException {
+ MockTimer timer = new MockTimer();
+ MockPartitionWriter writer = new MockPartitionWriter();
+ CoordinatorRuntimeMetrics metrics =
mock(CoordinatorRuntimeMetrics.class);
+
+ // All operations will throw an exception when completed.
+ doThrow(new
KafkaException("error")).when(metrics).recordEventPurgatoryTime(anyLong());
+
+ CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+ new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+ .withTime(timer.time())
+ .withTimer(timer)
+ .withDefaultWriteTimeOut(Duration.ofMillis(20))
+ .withLoader(new MockCoordinatorLoader())
+ .withEventProcessor(new DirectEventProcessor())
+ .withPartitionWriter(writer)
+ .withCoordinatorShardBuilderSupplier(new
MockCoordinatorShardBuilderSupplier())
+ .withCoordinatorRuntimeMetrics(metrics)
+ .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+ .withSerializer(new StringSerializer())
+ .withAppendLingerMs(10)
+ .withExecutorService(mock(ExecutorService.class))
+ .build();
+
+ // Load the coordinator.
+ runtime.scheduleLoadOperation(TP, 10);
+
+ // Get the max batch size.
+ int maxBatchSize = writer.config(TP).maxMessageSize();
+
+ // Create records with three quarters of the max batch size each, so
that it is not
+ // possible to have more than one record in a single batch.
+ List<String> records = Stream.of('1', '2', '3').map(c -> {
+ char[] payload = new char[maxBatchSize * 3 / 4];
+ Arrays.fill(payload, c);
+ return new String(payload);
+ }).collect(Collectors.toList());
+
+ // Write #1.
+ CompletableFuture<String> write1 =
runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+ state -> new CoordinatorResult<>(List.of(records.get(0)),
"response1")
+ );
+
+ // Write #2.
+ CompletableFuture<String> write2 =
runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+ state -> new CoordinatorResult<>(List.of(records.get(1)),
"response2")
+ );
+
+ // Write #3, to force the flush of write #2.
+ CompletableFuture<String> write3 =
runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20),
+ state -> new CoordinatorResult<>(List.of(records.get(1)),
"response3")
+ );
+
+ // Records have been written to the log.
+ assertEquals(List.of(
+ records(timer.time().milliseconds(), records.get(0)),
+ records(timer.time().milliseconds(), records.get(1))
+ ), writer.entries(TP));
+
+ // Verify that no writes are committed yet.
+ assertFalse(write1.isDone());
+ assertFalse(write2.isDone());
+ assertFalse(write3.isDone());
+
+ // Commit the first and second record.
+ writer.commit(TP, 2);
+
+ // Write #1 and write #2's completions throw exceptions after
completing their futures.
+ // Despite the exception from write #1, write #2 should still be
completed.
+ assertTrue(write1.isDone());
+ assertTrue(write2.isDone());
+ assertFalse(write3.isDone());
+ assertEquals("response1", write1.get(5, TimeUnit.SECONDS));
+ assertEquals("response2", write2.get(5, TimeUnit.SECONDS));
+ }
+
@Test
public void
testWriteEventWriteTimeoutTaskIsCancelledWhenHighWatermarkIsUpdated() {
MockTimer timer = new MockTimer();