This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5a57473a524 KAFKA-18484 [1/N]; Handle exceptions from deferred events in coordinator (#18661)
5a57473a524 is described below

commit 5a57473a524797d1b4f95df4a7899802a876e380
Author: Sean Quah <[email protected]>
AuthorDate: Wed Jan 22 13:46:19 2025 +0000

    KAFKA-18484 [1/N]; Handle exceptions from deferred events in coordinator (#18661)
    
    Guard against the coordinator getting stuck due to deferred events
    throwing exceptions.
    
    Reviewers: David Jacot <[email protected]>
---
 .../common/runtime/CoordinatorRuntime.java         |  48 ++++--
 .../common/runtime/CoordinatorRuntimeTest.java     | 178 +++++++++++++++++++++
 2 files changed, 216 insertions(+), 10 deletions(-)
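
The heart of the change: a batch's deferred events are now funnelled through a
DeferredEventCollection whose complete() catches per-event exceptions, so a
single throwing event can no longer wedge the coordinator. A minimal,
self-contained sketch of the pattern follows (assumptions: DeferredEvent is
pared down to its single completion method, and System.err stands in for the
runtime's logger):

    import java.util.ArrayList;
    import java.util.List;

    interface DeferredEvent {
        void complete(Throwable t);
    }

    class DeferredEventCollection implements DeferredEvent {
        private final List<DeferredEvent> events = new ArrayList<>();

        @Override
        public void complete(Throwable t) {
            for (DeferredEvent event : events) {
                try {
                    event.complete(t);
                } catch (Throwable e) {
                    // Log and keep going: one misbehaving event must not
                    // prevent the remaining events from completing.
                    System.err.println("Completion of event " + event + " failed: " + e);
                }
            }
        }

        public boolean add(DeferredEvent event) {
            return events.add(event);
        }
    }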

diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
index 5d9e9ef13a8..b341c0adaeb 100644
--- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
+++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
@@ -460,7 +460,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
      * A simple container class to hold all the attributes
      * related to a pending batch.
      */
-    private static class CoordinatorBatch {
+    private class CoordinatorBatch {
         /**
          * The base (or first) offset of the batch. If the batch fails
          * for any reason, the state machine is rolled back to it.
@@ -500,9 +500,9 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
         final Optional<TimerTask> lingerTimeoutTask;
 
         /**
-         * The list of deferred events associated with the batch.
+         * The deferred events associated with the batch.
          */
-        final List<DeferredEvent> deferredEvents;
+        final DeferredEventCollection deferredEvents;
 
         /**
          * The next offset. This is updated when records
@@ -527,7 +527,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
             this.buffer = buffer;
             this.builder = builder;
             this.lingerTimeoutTask = lingerTimeoutTask;
-            this.deferredEvents = new ArrayList<>();
+            this.deferredEvents = new DeferredEventCollection();
         }
     }
 
@@ -806,9 +806,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
                     }
 
                     // Add all the pending deferred events to the deferred event queue.
-                    for (DeferredEvent event : currentBatch.deferredEvents) {
-                        deferredEventQueue.add(offset, event);
-                    }
+                    deferredEventQueue.add(offset, currentBatch.deferredEvents);
 
                     // Free up the current batch.
                     freeCurrentBatch();
@@ -839,9 +837,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
         private void failCurrentBatch(Throwable t) {
             if (currentBatch != null) {
                 coordinator.revertLastWrittenOffset(currentBatch.baseOffset);
-                for (DeferredEvent event : currentBatch.deferredEvents) {
-                    event.complete(t);
-                }
+                currentBatch.deferredEvents.complete(t);
                 freeCurrentBatch();
             }
         }
@@ -1157,6 +1153,38 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
         }
     }
 
+    /**
+     * A collection of {@link DeferredEvent}. When completed, completes all the events in the collection
+     * and logs any exceptions thrown.
+     */
+    class DeferredEventCollection implements DeferredEvent {
+        private final List<DeferredEvent> events = new ArrayList<>();
+
+        @Override
+        public void complete(Throwable t) {
+            for (DeferredEvent event : events) {
+                try {
+                    event.complete(t);
+                } catch (Throwable e) {
+                    log.error("Completion of event {} failed due to {}.", event, e.getMessage(), e);
+                }
+            }
+        }
+
+        public boolean add(DeferredEvent event) {
+            return events.add(event);
+        }
+
+        public int size() {
+            return events.size();
+        }
+
+        @Override
+        public String toString() {
+            return "DeferredEventCollection(events=" + events + ")";
+        }
+    }
+
     /**
      * A coordinator write operation.
      *
diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
index a2f25b24a4c..3c2021a118c 100644
--- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
@@ -94,8 +94,10 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
@@ -1116,6 +1118,105 @@ public class CoordinatorRuntimeTest {
         assertEquals(10, ctx.epoch);
     }
 
+    @Test
+    public void testScheduleUnloadingWithDeferredEventExceptions() throws ExecutionException, InterruptedException, TimeoutException {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+        MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class);
+        MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class);
+        MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
+        CoordinatorRuntimeMetrics metrics = mock(CoordinatorRuntimeMetrics.class);
+
+        // All operations will throw an exception when completed.
+        doThrow(new KafkaException("error")).when(metrics).recordEventPurgatoryTime(anyLong());
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(supplier)
+                .withCoordinatorRuntimeMetrics(metrics)
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(10)
+                .withExecutorService(mock(ExecutorService.class))
+                .build();
+
+        when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+        when(builder.withLogContext(any())).thenReturn(builder);
+        when(builder.withTime(any())).thenReturn(builder);
+        when(builder.withTimer(any())).thenReturn(builder);
+        when(builder.withCoordinatorMetrics(any())).thenReturn(builder);
+        when(builder.withTopicPartition(any())).thenReturn(builder);
+        when(builder.withExecutor(any())).thenReturn(builder);
+        when(builder.build()).thenReturn(coordinator);
+        when(supplier.get()).thenReturn(builder);
+
+        // Load the coordinator.
+        runtime.scheduleLoadOperation(TP, 10);
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+
+        // Get the max batch size.
+        int maxBatchSize = writer.config(TP).maxMessageSize();
+
+        // Create records with three quarters of the max batch size each, so that it is not
+        // possible to have more than one record in a single batch.
+        List<String> records = Stream.of('1', '2', '3').map(c -> {
+            char[] payload = new char[maxBatchSize * 3 / 4];
+            Arrays.fill(payload, c);
+            return new String(payload);
+        }).collect(Collectors.toList());
+
+        // Write #1.
+        CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(List.of(records.get(0)), "response1")
+        );
+
+        // Write #2.
+        CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(List.of(records.get(1)), "response2")
+        );
+
+        // Write #3, to force the flush of write #2.
+        CompletableFuture<String> write3 = runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(List.of(records.get(1)), "response3")
+        );
+
+        // Records have been written to the log.
+        assertEquals(List.of(
+            records(timer.time().milliseconds(), records.get(0)),
+            records(timer.time().milliseconds(), records.get(1))
+        ), writer.entries(TP));
+
+        // Verify that no writes are committed yet.
+        assertFalse(write1.isDone());
+        assertFalse(write2.isDone());
+        assertFalse(write3.isDone());
+
+        // Schedule the unloading.
+        runtime.scheduleUnloadOperation(TP, OptionalInt.of(ctx.epoch + 1));
+        assertEquals(CLOSED, ctx.state);
+
+        // All write completions throw exceptions after completing their futures.
+        // Despite the exceptions, the unload should still complete.
+        assertTrue(write1.isDone());
+        assertTrue(write2.isDone());
+        assertTrue(write3.isDone());
+        assertFutureThrows(write1, NotCoordinatorException.class);
+        assertFutureThrows(write2, NotCoordinatorException.class);
+        assertFutureThrows(write3, NotCoordinatorException.class);
+
+        // Verify that onUnloaded is called.
+        verify(coordinator, times(1)).onUnloaded();
+
+        // Getting the coordinator context fails because it no longer exists.
+        assertThrows(NotCoordinatorException.class, () -> runtime.contextOrThrow(TP));
+    }
+
     @Test
     public void testScheduleWriteOp() throws ExecutionException, InterruptedException, TimeoutException {
         MockTimer timer = new MockTimer();
@@ -3080,6 +3181,83 @@ public class CoordinatorRuntimeTest {
         assertTrue(write2.isDone());
     }
 
+    @Test
+    public void testHighWatermarkUpdateWithDeferredEventExceptions() throws ExecutionException, InterruptedException, TimeoutException {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+        CoordinatorRuntimeMetrics metrics = mock(CoordinatorRuntimeMetrics.class);
+
+        // All operations will throw an exception when completed.
+        doThrow(new KafkaException("error")).when(metrics).recordEventPurgatoryTime(anyLong());
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(metrics)
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(10)
+                .withExecutorService(mock(ExecutorService.class))
+                .build();
+
+        // Load the coordinator.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Get the max batch size.
+        int maxBatchSize = writer.config(TP).maxMessageSize();
+
+        // Create records with three quarters of the max batch size each, so that it is not
+        // possible to have more than one record in a single batch.
+        List<String> records = Stream.of('1', '2', '3').map(c -> {
+            char[] payload = new char[maxBatchSize * 3 / 4];
+            Arrays.fill(payload, c);
+            return new String(payload);
+        }).collect(Collectors.toList());
+
+        // Write #1.
+        CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(List.of(records.get(0)), "response1")
+        );
+
+        // Write #2.
+        CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(List.of(records.get(1)), "response2")
+        );
+
+        // Write #3, to force the flush of write #2.
+        CompletableFuture<String> write3 = runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(List.of(records.get(1)), "response3")
+        );
+
+        // Records have been written to the log.
+        assertEquals(List.of(
+            records(timer.time().milliseconds(), records.get(0)),
+            records(timer.time().milliseconds(), records.get(1))
+        ), writer.entries(TP));
+
+        // Verify that no writes are committed yet.
+        assertFalse(write1.isDone());
+        assertFalse(write2.isDone());
+        assertFalse(write3.isDone());
+
+        // Commit the first and second record.
+        writer.commit(TP, 2);
+
+        // Write #1 and write #2's completions throw exceptions after completing their futures.
+        // Despite the exception from write #1, write #2 should still be completed.
+        assertTrue(write1.isDone());
+        assertTrue(write2.isDone());
+        assertFalse(write3.isDone());
+        assertEquals("response1", write1.get(5, TimeUnit.SECONDS));
+        assertEquals("response2", write2.get(5, TimeUnit.SECONDS));
+    }
+
     @Test
     public void testWriteEventWriteTimeoutTaskIsCancelledWhenHighWatermarkIsUpdated() {
         MockTimer timer = new MockTimer();

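Both new tests drive this guard by making every deferred-event completion
throw, via doThrow(...) on recordEventPurgatoryTime. The same behaviour can be
reproduced against the sketch above; a hypothetical demo, not part of the
commit:

    public class Demo {
        public static void main(String[] args) {
            DeferredEventCollection batch = new DeferredEventCollection();
            // A misbehaving event whose completion throws.
            batch.add(t -> { throw new RuntimeException("boom"); });
            // A well-behaved event that must still be completed.
            batch.add(t -> System.out.println("completed with: " + t));

            // Logs the first event's failure, then prints "completed with: null".
            batch.complete(null);
        }
    }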