This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new af9f9fe3301 KAFKA-18200; Handle empty batches in coordinator runtime (#18144)
af9f9fe3301 is described below

commit af9f9fe33014184170c20850e22cc4f5dcb21061
Author: Sean Quah <[email protected]>
AuthorDate: Tue Dec 17 07:39:48 2024 +0000

    KAFKA-18200; Handle empty batches in coordinator runtime (#18144)
    
    * Avoid attaching empty writes to empty batches.
    * Handle flushes of empty batches, which would return a 0 offset otherwise.
    
    Reviewers: David Jacot <[email protected]>
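
For readers skimming the patch, a minimal, hypothetical sketch of the failure
mode and the guard being added (the class and field names below are invented
for illustration; the real check lives in CoordinatorRuntime#flushCurrentBatch
in the first hunk, and the second hunk additionally stops empty writes from
attaching deferred events to an empty batch):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;

    // Hypothetical stand-in for the runtime's per-partition batch state.
    final class EmptyBatchGuardSketch {
        int numRecords = 0;  // mirrors currentBatch.builder.numRecords()
        final List<CompletableFuture<Long>> deferredEvents = new ArrayList<>();

        // If append() throws after allocating a batch but before adding any
        // record, a later flush sees an empty batch. Writing it would ack the
        // deferred events at a bogus offset 0, so it is failed instead.
        void flush() {
            if (numRecords == 0) {
                RuntimeException cause =
                    new IllegalStateException("Record batch was empty");
                deferredEvents.forEach(f -> f.completeExceptionally(cause));
                deferredEvents.clear();
                return;
            }
            // ... otherwise append the records to the log and complete the
            // deferred events with the real last written offset ...
        }
    }
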
---
 .../common/runtime/CoordinatorRuntime.java         |  13 ++-
 .../common/runtime/CoordinatorRuntimeTest.java     | 102 ++++++++++++++++++++-
 2 files changed, 113 insertions(+), 2 deletions(-)

diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
index ee0cf18212a..c08e20b98cd 100644
--- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
+++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
@@ -768,6 +768,17 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
         private void flushCurrentBatch() {
             if (currentBatch != null) {
                 try {
+                    if (currentBatch.builder.numRecords() == 0) {
+                        // The only way we can get here is if append() has failed in an unexpected
+                        // way and left an empty batch. Try to clean it up.
+                        log.debug("Tried to flush an empty batch for {}.", tp);
+                        // There should not be any deferred events attached to the batch. We fail
+                        // the batch just in case. As a side effect, coordinator state is also
+                        // reverted, but there should be no changes since the batch was empty.
+                        failCurrentBatch(new IllegalStateException("Record batch was empty"));
+                        return;
+                    }
+
                     long flushStartMs = time.milliseconds();
                     // Write the records to the log and update the last written offset.
                     long offset = partitionWriter.append(
@@ -926,7 +937,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
                 // If the records are empty, it was a read operation after all. In this case,
                 // the response can be returned directly iff there are no pending write operations;
                 // otherwise, the read needs to wait on the last write operation to be completed.
-                if (currentBatch != null) {
+                if (currentBatch != null && currentBatch.builder.numRecords() > 0) {
                     currentBatch.deferredEvents.add(event);
                 } else {
                     if (coordinator.lastCommittedOffset() < coordinator.lastWrittenOffset()) {
diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
index 40d059fa3d6..a2f25b24a4c 100644
--- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
@@ -54,6 +54,7 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 import org.mockito.ArgumentMatcher;
 
+import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.time.Duration;
@@ -101,7 +102,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-@SuppressWarnings("checkstyle:JavaNCSS")
+@SuppressWarnings({"checkstyle:JavaNCSS", "checkstyle:ClassDataAbstractionCoupling"})
 public class CoordinatorRuntimeTest {
     private static final TopicPartition TP = new TopicPartition("__consumer_offsets", 0);
     private static final Duration DEFAULT_WRITE_TIMEOUT = Duration.ofMillis(5);
@@ -120,6 +121,34 @@ public class CoordinatorRuntimeTest {
         }
     }
 
+    private static class ThrowingSerializer<T> implements Serializer<T> {
+        private final Serializer<T> serializer;
+        private boolean throwOnNextOperation;
+
+        public ThrowingSerializer(Serializer<T> serializer) {
+            this.serializer = serializer;
+            this.throwOnNextOperation = false;
+        }
+
+        public void throwOnNextOperation() {
+            throwOnNextOperation = true;
+        }
+
+        @Override
+        public byte[] serializeKey(T record) {
+            return serializer.serializeKey(record);
+        }
+
+        @Override
+        public byte[] serializeValue(T record) {
+            if (throwOnNextOperation) {
+                throwOnNextOperation = false;
+                throw new BufferOverflowException();
+            }
+            return serializer.serializeValue(record);
+        }
+    }
+
     /**
      * A CoordinatorEventProcessor that directly executes the operations. This is
      * useful in unit tests where execution in threads is not required.
@@ -270,6 +299,10 @@ public class CoordinatorRuntimeTest {
             if (batch.sizeInBytes() > config(tp).maxMessageSize())
                 throw new RecordTooLargeException("Batch is larger than the max message size");
 
+            // We don't want the coordinator to write empty batches.
+            if (batch.validBytes() <= 0)
+                throw new KafkaException("Coordinator tried to write an empty 
batch");
+
             if (writeCount.incrementAndGet() > maxWrites)
                 throw new KafkaException("Maximum number of writes reached");
 
@@ -4213,6 +4246,73 @@ public class CoordinatorRuntimeTest {
         assertEquals(Collections.emptyList(), writer.entries(TP));
     }
 
+    @Test
+    public void testEmptyBatch() throws Exception {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+        ThrowingSerializer<String> serializer = new ThrowingSerializer<String>(new StringSerializer());
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withSerializer(serializer)
+                .withAppendLingerMs(10)
+                .withExecutorService(mock(ExecutorService.class))
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertNull(ctx.currentBatch);
+
+        // Write #1, which fails.
+        serializer.throwOnNextOperation();
+        CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(List.of("1"), "response1"));
+
+        // Write #1 should fail and leave an empty batch.
+        assertFutureThrows(write1, BufferOverflowException.class);
+        assertNotNull(ctx.currentBatch);
+
+        // Write #2, with no records.
+        CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(Collections.emptyList(), "response2"));
+
+        // Write #2 should not be attached to the empty batch.
+        assertTrue(write2.isDone());
+        assertEquals("response2", write2.get(5, TimeUnit.SECONDS));
+
+        // Complete transaction #1. It will flush the current empty batch.
+        // The coordinator must not try to write an empty batch, otherwise the mock partition writer
+        // will throw an exception.
+        CompletableFuture<Void> complete1 = runtime.scheduleTransactionCompletion(
+            "complete#1",
+            TP,
+            100L,
+            (short) 50,
+            10,
+            TransactionResult.COMMIT,
+            DEFAULT_WRITE_TIMEOUT
+        );
+
+        // Verify that the completion is not committed yet.
+        assertFalse(complete1.isDone());
+
+        // Commit and verify that writes are completed.
+        writer.commit(TP);
+        assertNull(complete1.get(5, TimeUnit.SECONDS));
+    }
+
     @Test
     public void testRecordFlushTime() throws Exception {
         MockTimer timer = new MockTimer();
