This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
     new af9f9fe3301 KAFKA-18200; Handle empty batches in coordinator runtime (#18144)
af9f9fe3301 is described below
commit af9f9fe33014184170c20850e22cc4f5dcb21061
Author: Sean Quah <[email protected]>
AuthorDate: Tue Dec 17 07:39:48 2024 +0000
KAFKA-18200; Handle empty batches in coordinator runtime (#18144)
* Avoid attaching empty writes to empty batches.
* Handle flushes of empty batches, which would return a 0 offset otherwise.
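
For context, a minimal, self-contained sketch of the failure mode (toy classes,
not the real coordinator runtime): append() allocates the current batch before
serializing the record, so a serializer failure strands an allocated, empty
batch that a later flush must not write to the log.

    import java.nio.BufferOverflowException;
    import java.util.ArrayList;
    import java.util.List;

    public class EmptyBatchSketch {
        // Stand-in for the runtime's current batch.
        static class Batch {
            final List<byte[]> records = new ArrayList<>();
            int numRecords() { return records.size(); }
        }

        static Batch currentBatch;

        // The batch is allocated before the record is serialized, so a
        // serialization failure leaves an allocated, empty batch behind.
        static void append(String value, boolean serializerFails) {
            if (currentBatch == null) {
                currentBatch = new Batch();
            }
            if (serializerFails) {
                throw new BufferOverflowException();
            }
            currentBatch.records.add(value.getBytes());
        }

        // The fix: never write an empty batch; discard it instead.
        static void flushCurrentBatch() {
            if (currentBatch == null) {
                return;
            }
            if (currentBatch.numRecords() == 0) {
                currentBatch = null; // fail/drop it rather than appending it
                return;
            }
            // ... write currentBatch to the log, then clear it ...
            currentBatch = null;
        }

        public static void main(String[] args) {
            try {
                append("record-1", true);
            } catch (BufferOverflowException e) {
                // The write failed, but an empty batch is now stranded.
            }
            flushCurrentBatch(); // with the guard, nothing reaches the log
            System.out.println("currentBatch after flush: " + currentBatch);
        }
    }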
Reviewers: David Jacot <[email protected]>
---
.../common/runtime/CoordinatorRuntime.java | 13 ++-
.../common/runtime/CoordinatorRuntimeTest.java | 102 ++++++++++++++++++++-
2 files changed, 113 insertions(+), 2 deletions(-)
diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
index ee0cf18212a..c08e20b98cd 100644
--- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
+++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
@@ -768,6 +768,17 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
         private void flushCurrentBatch() {
             if (currentBatch != null) {
                 try {
+                    if (currentBatch.builder.numRecords() == 0) {
+                        // The only way we can get here is if append() has failed in an unexpected
+                        // way and left an empty batch. Try to clean it up.
+                        log.debug("Tried to flush an empty batch for {}.", tp);
+                        // There should not be any deferred events attached to the batch. We fail
+                        // the batch just in case. As a side effect, coordinator state is also
+                        // reverted, but there should be no changes since the batch was empty.
+                        failCurrentBatch(new IllegalStateException("Record batch was empty"));
+                        return;
+                    }
+
                     long flushStartMs = time.milliseconds();
                     // Write the records to the log and update the last written offset.
                     long offset = partitionWriter.append(
@@ -926,7 +937,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
                 // If the records are empty, it was a read operation after all. In this case,
                 // the response can be returned directly iff there are no pending write operations;
                 // otherwise, the read needs to wait on the last write operation to be completed.
-                if (currentBatch != null) {
+                if (currentBatch != null && currentBatch.builder.numRecords() > 0) {
                     currentBatch.deferredEvents.add(event);
                 } else {
                     if (coordinator.lastCommittedOffset() < coordinator.lastWrittenOffset()) {
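
As a rough illustration of why the numRecords() check matters (again toy types,
not the Kafka classes): an empty write must not hook its completion onto a
batch that holds no records, because that batch will never be written.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;

    public class DeferredEventSketch {
        static class Batch {
            final List<byte[]> records = new ArrayList<>();
            final List<CompletableFuture<Void>> deferredEvents = new ArrayList<>();
            int numRecords() { return records.size(); }
        }

        static Batch currentBatch;
        static long lastCommittedOffset = 10L;
        static long lastWrittenOffset = 10L;

        // An empty write completes once everything before it is committed. It
        // should only defer on currentBatch when the batch has records to flush.
        static void handleEmptyWrite(CompletableFuture<Void> event) {
            if (currentBatch != null && currentBatch.numRecords() > 0) {
                currentBatch.deferredEvents.add(event); // completes on commit
            } else if (lastCommittedOffset < lastWrittenOffset) {
                // defer on the last pending write instead (elided here)
            } else {
                event.complete(null); // nothing pending: respond immediately
            }
        }

        public static void main(String[] args) {
            currentBatch = new Batch(); // empty batch left by a failed append
            CompletableFuture<Void> event = new CompletableFuture<>();
            handleEmptyWrite(event);
            System.out.println("completed immediately: " + event.isDone());
        }
    }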
diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
index 40d059fa3d6..a2f25b24a4c 100644
--- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
@@ -54,6 +54,7 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 import org.mockito.ArgumentMatcher;

+import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.time.Duration;
@@ -101,7 +102,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;

-@SuppressWarnings("checkstyle:JavaNCSS")
+@SuppressWarnings({"checkstyle:JavaNCSS", "checkstyle:ClassDataAbstractionCoupling"})
 public class CoordinatorRuntimeTest {
     private static final TopicPartition TP = new TopicPartition("__consumer_offsets", 0);
     private static final Duration DEFAULT_WRITE_TIMEOUT = Duration.ofMillis(5);
@@ -120,6 +121,34 @@ public class CoordinatorRuntimeTest {
         }
     }

+    private static class ThrowingSerializer<T> implements Serializer<T> {
+        private final Serializer<T> serializer;
+        private boolean throwOnNextOperation;
+
+        public ThrowingSerializer(Serializer<T> serializer) {
+            this.serializer = serializer;
+            this.throwOnNextOperation = false;
+        }
+
+        public void throwOnNextOperation() {
+            throwOnNextOperation = true;
+        }
+
+        @Override
+        public byte[] serializeKey(T record) {
+            return serializer.serializeKey(record);
+        }
+
+        @Override
+        public byte[] serializeValue(T record) {
+            if (throwOnNextOperation) {
+                throwOnNextOperation = false;
+                throw new BufferOverflowException();
+            }
+            return serializer.serializeValue(record);
+        }
+    }
+
     /**
      * A CoordinatorEventProcessor that directly executes the operations. This is
      * useful in unit tests where execution in threads is not required.
@@ -270,6 +299,10 @@ public class CoordinatorRuntimeTest {
             if (batch.sizeInBytes() > config(tp).maxMessageSize())
                 throw new RecordTooLargeException("Batch is larger than the max message size");

+            // We don't want the coordinator to write empty batches.
+            if (batch.validBytes() <= 0)
+                throw new KafkaException("Coordinator tried to write an empty batch");
+
             if (writeCount.incrementAndGet() > maxWrites)
                 throw new KafkaException("Maximum number of writes reached");

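
A note on the mock's design: rejecting batches with validBytes() <= 0 in
MockPartitionWriter means any regression that lets an empty batch reach the
writer fails the test suite loudly, rather than silently appending an empty
batch and resolving deferred events against a 0 offset, as described in the
commit message.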
@@ -4213,6 +4246,73 @@ public class CoordinatorRuntimeTest {
         assertEquals(Collections.emptyList(), writer.entries(TP));
     }

+    @Test
+    public void testEmptyBatch() throws Exception {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+        ThrowingSerializer<String> serializer = new ThrowingSerializer<String>(new StringSerializer());
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withSerializer(serializer)
+                .withAppendLingerMs(10)
+                .withExecutorService(mock(ExecutorService.class))
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertNull(ctx.currentBatch);
+
+        // Write #1, which fails.
+        serializer.throwOnNextOperation();
+        CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(List.of("1"), "response1"));
+
+        // Write #1 should fail and leave an empty batch.
+        assertFutureThrows(write1, BufferOverflowException.class);
+        assertNotNull(ctx.currentBatch);
+
+        // Write #2, with no records.
+        CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(Collections.emptyList(), "response2"));
+
+        // Write #2 should not be attached to the empty batch.
+        assertTrue(write2.isDone());
+        assertEquals("response2", write2.get(5, TimeUnit.SECONDS));
+
+        // Complete transaction #1. It will flush the current empty batch.
+        // The coordinator must not try to write an empty batch, otherwise the mock partition writer
+        // will throw an exception.
+        CompletableFuture<Void> complete1 = runtime.scheduleTransactionCompletion(
+            "complete#1",
+            TP,
+            100L,
+            (short) 50,
+            10,
+            TransactionResult.COMMIT,
+            DEFAULT_WRITE_TIMEOUT
+        );
+
+        // Verify that the completion is not committed yet.
+        assertFalse(complete1.isDone());
+
+        // Commit and verify that writes are completed.
+        writer.commit(TP);
+        assertNull(complete1.get(5, TimeUnit.SECONDS));
+    }
+
     @Test
     public void testRecordFlushTime() throws Exception {
         MockTimer timer = new MockTimer();
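
To run just the new test, something like the following should work (the module
and filter are inferred from the paths above, so adjust as needed):

    ./gradlew :coordinator-common:test --tests "org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeTest.testEmptyBatch"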