This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.1 by this push:
new c432167fe21 KAFKA-19760: RecordTooLargeExceptions in group coordinator when offsets.topic.compression.codec is used (#20653)
c432167fe21 is described below
commit c432167fe218c30ceb3c137d73e80225effd33c7
Author: Izzy Harker <[email protected]>
AuthorDate: Thu Oct 16 04:10:47 2025 -0500
KAFKA-19760: RecordTooLargeExceptions in group coordinator when offsets.topic.compression.codec is used (#20653)
The group coordinator has been hitting unknown errors. The theory is that optimistic compression size estimates cause unchecked batch overflows when the batch is written.

This PR adds a check on the uncompressed record size to flush batches more eagerly and avoid overfilling partially-full batches. This should make the group coordinator errors less frequent.

Also added tests to ensure this change does not impact the desired behavior for large compressible records.
Reviewers: Sean Quah <[email protected]>, David Jacot <[email protected]>
---
.../common/runtime/CoordinatorRuntime.java | 66 +++---
.../common/runtime/CoordinatorRuntimeTest.java | 264 +++++++++++++++++++++
.../kafka/coordinator/common/runtime/TestUtil.java | 23 +-
3 files changed, 317 insertions(+), 36 deletions(-)
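
The essence of the fix is visible in the CoordinatorRuntime.java hunks below: instead of estimating the batch size with the configured codec, the coordinator now estimates an uncompressed upper bound (CompressionType.NONE) and flushes the open batch whenever that bound no longer fits. The following sketch illustrates the decision in isolation; it uses the real AbstractRecords, SimpleRecord, CompressionType and RecordBatch classes from kafka-clients, but MAX_BATCH_SIZE, bytesInOpenBatch and flushOpenBatch() are hypothetical stand-ins for CoordinatorRuntime's internal MemoryRecordsBuilder bookkeeping, not the actual implementation.

import java.nio.charset.StandardCharsets;
import java.util.List;

import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.SimpleRecord;

public class EagerFlushSketch {
    // Hypothetical stand-ins for the open batch's state inside CoordinatorRuntime.
    private static final int MAX_BATCH_SIZE = 16 * 1024;
    private static int bytesInOpenBatch = 12 * 1024;

    public static void main(String[] args) {
        // Records that a write operation wants to append atomically.
        List<SimpleRecord> recordsToAppend = List.of(
            new SimpleRecord("a".repeat(6 * 1024).getBytes(StandardCharsets.UTF_8)));

        // The patch estimates with CompressionType.NONE: an upper bound that an
        // optimistic compression ratio can never undershoot.
        int uncompressedUpperBound = AbstractRecords.estimateSizeInBytes(
            RecordBatch.CURRENT_MAGIC_VALUE,
            CompressionType.NONE,
            recordsToAppend);

        // If the bound no longer fits in the open batch, flush it first and start a
        // fresh one instead of risking an overflow halfway through the append.
        if (uncompressedUpperBound > MAX_BATCH_SIZE - bytesInOpenBatch) {
            flushOpenBatch();
        }

        // A write whose uncompressed size exceeds MAX_BATCH_SIZE still goes into an
        // empty batch by itself: it succeeds if it compresses below the limit and
        // otherwise fails the log append with RecordTooLargeException.
        System.out.printf("uncompressed upper bound: %d bytes, open batch: %d bytes%n",
            uncompressedUpperBound, bytesInOpenBatch);
    }

    private static void flushOpenBatch() {
        // Stands in for CoordinatorRuntime#flushCurrentBatch().
        bytesInOpenBatch = 0;
    }
}
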
diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
index 8f72b0cac50..cb0c7bbe05f 100644
--- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
+++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
@@ -21,10 +21,10 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
import org.apache.kafka.common.errors.NotCoordinatorException;
-import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.AbstractRecords;
+import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.EndTransactionMarker;
import org.apache.kafka.common.record.MemoryRecords;
@@ -831,11 +831,11 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
}
/**
- * Flushes the current batch if it is transactional or if it has passed the append linger time.
+ * Flushes the current batch if it is transactional, if it has passed the append linger time, or if it is full.
*/
private void maybeFlushCurrentBatch(long currentTimeMs) {
if (currentBatch != null) {
- if (currentBatch.builder.isTransactional() || (currentBatch.appendTimeMs - currentTimeMs) >= appendLingerMs) {
+ if (currentBatch.builder.isTransactional() || (currentBatch.appendTimeMs - currentTimeMs) >= appendLingerMs || !currentBatch.builder.hasRoomFor(0)) {
flushCurrentBatch();
}
}
@@ -913,6 +913,24 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
}
}
+ /**
+ * Completes the given event once all pending writes are completed.
+ *
+ * @param event The event to complete once all pending
+ * writes are completed.
+ */
+ private void waitForPendingWrites(DeferredEvent event) {
+ if (currentBatch != null && currentBatch.builder.numRecords() > 0) {
+ currentBatch.deferredEvents.add(event);
+ } else {
+ if (coordinator.lastCommittedOffset() < coordinator.lastWrittenOffset()) {
+ deferredEventQueue.add(coordinator.lastWrittenOffset(), DeferredEventCollection.of(log, event));
+ } else {
+ event.complete(null);
+ }
+ }
+ }
+
/**
* Appends records to the log and replay them to the state machine.
*
@@ -942,17 +960,8 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
if (records.isEmpty()) {
// If the records are empty, it was a read operation after all. In this case,
- // the response can be returned directly iff there are no pending write operations;
- // otherwise, the read needs to wait on the last write operation to be completed.
- if (currentBatch != null && currentBatch.builder.numRecords() > 0) {
- currentBatch.deferredEvents.add(event);
- } else {
- if (coordinator.lastCommittedOffset() < coordinator.lastWrittenOffset()) {
- deferredEventQueue.add(coordinator.lastWrittenOffset(), DeferredEventCollection.of(log, event));
- } else {
- event.complete(null);
- }
- }
+ // the response can be returned once any pending write operations complete.
+ waitForPendingWrites(event);
} else {
// If the records are not empty, first, they are applied to the state machine,
// second, they are appended to the opened batch.
@@ -986,27 +995,18 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
}
if (isAtomic) {
- // Compute the estimated size of the records.
- int estimatedSize = AbstractRecords.estimateSizeInBytes(
+ // Compute the size of the records.
+ int estimatedSizeUpperBound = AbstractRecords.estimateSizeInBytes(
currentBatch.builder.magic(),
- compression.type(),
+ CompressionType.NONE,
recordsToAppend
);
- // Check if the current batch has enough space. We check this before
- // replaying the records in order to avoid having to revert back
- // changes if the records do not fit within a batch.
- if (estimatedSize > currentBatch.builder.maxAllowedBytes()) {
- throw new RecordTooLargeException("Message batch size is " + estimatedSize +
- " bytes in append to partition " + tp + " which exceeds the maximum " +
- "configured size of " + currentBatch.maxBatchSize + ".");
- }
-
- if (!currentBatch.builder.hasRoomFor(estimatedSize)) {
- // Otherwise, we write the current batch, allocate a new one and re-verify
- // whether the records fit in it.
- // If flushing fails, we don't catch the exception in order to let
- // the caller fail the current operation.
+ if (!currentBatch.builder.hasRoomFor(estimatedSizeUpperBound)) {
+ // Start a new batch when the total uncompressed data size would exceed
+ // the max batch size. We still allow atomic writes with an uncompressed size
+ // larger than the max batch size as long as they compress down to under the max
+ // batch size. These large writes go into a batch by themselves.
flushCurrentBatch();
maybeAllocateNewBatch(
producerId,
@@ -1077,8 +1077,8 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
// Add the event to the list of pending events associated with the batch.
currentBatch.deferredEvents.add(event);
- // Write the current batch if it is transactional or if the linger timeout
- // has expired.
+ // Write the current batch if it is transactional, if the linger timeout
+ // has expired, or if it is full.
// If flushing fails, we don't catch the exception in order to let
// the caller fail the current operation.
maybeFlushCurrentBatch(currentTimeMs);
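
The tests added to CoordinatorRuntimeTest.java below lean on the gap between uncompressed and compressed sizes: a run of identical characters three times the max batch size gzips down to well under the limit, while random lowercase text stays well above it even after compression. A quick, self-contained way to see that difference with plain JDK classes (the 16 KiB figure is only illustrative, not the tests' actual writer config):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Random;
import java.util.zip.GZIPOutputStream;

public class CompressibilitySketch {
    public static void main(String[] args) throws IOException {
        int maxBatchSize = 16 * 1024;

        // Highly compressible payload, like the tests' "a".repeat(3 * maxBatchSize).
        String compressible = "a".repeat(3 * maxBatchSize);

        // Barely compressible payload, like the tests' random lowercase characters.
        Random random = new Random();
        char[] noisy = new char[3 * maxBatchSize];
        for (int i = 0; i < noisy.length; i++) {
            noisy[i] = (char) ('a' + random.nextInt(26));
        }

        System.out.printf("compressible: %d chars -> %d bytes gzipped%n",
            compressible.length(), gzipSize(compressible));
        System.out.printf("random:       %d chars -> %d bytes gzipped%n",
            noisy.length, gzipSize(new String(noisy)));
    }

    private static int gzipSize(String payload) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(payload.getBytes(StandardCharsets.UTF_8));
        }
        return out.size();
    }
}
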
diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
index f0d38d749fa..96cccd795a8 100644
--- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.coordinator.common.runtime;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.NotCoordinatorException;
import org.apache.kafka.common.errors.NotEnoughReplicasException;
@@ -52,6 +53,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
+import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -4293,6 +4295,268 @@ public class CoordinatorRuntimeTest {
assertEquals("response3", write3.get(5, TimeUnit.SECONDS));
}
+ @Test
+ public void testCompressibleRecordTriggersFlushAndSucceeds() throws Exception {
+ MockTimer timer = new MockTimer();
+ MockPartitionWriter writer = new MockPartitionWriter();
+ Compression compression = Compression.gzip().build();
+
+ CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+ new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+ .withTime(timer.time())
+ .withTimer(timer)
+ .withDefaultWriteTimeOut(Duration.ofMillis(20))
+ .withLoader(new MockCoordinatorLoader())
+ .withEventProcessor(new DirectEventProcessor())
+ .withPartitionWriter(writer)
+ .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+ .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+ .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+ .withCompression(compression)
+ .withSerializer(new StringSerializer())
+ .withAppendLingerMs(10)
+ .withExecutorService(mock(ExecutorService.class))
+ .build();
+
+ // Schedule the loading.
+ runtime.scheduleLoadOperation(TP, 10);
+
+ // Verify the initial state.
+ CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+ assertNull(ctx.currentBatch);
+
+ // Get the max batch size.
+ int maxBatchSize = writer.config(TP).maxMessageSize();
+
+ // Create 2 records with a quarter of the max batch size each.
+ List<String> records = Stream.of('1', '2').map(c -> {
+ char[] payload = new char[maxBatchSize / 4];
+ Arrays.fill(payload, c);
+ return new String(payload);
+ }).collect(Collectors.toList());
+
+ // Write #1 with the small records, batch will be about half full
+ long firstBatchTimestamp = timer.time().milliseconds();
+ CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(50),
+ state -> new CoordinatorResult<>(records, "response1")
+ );
+
+ // A batch has been created.
+ assertNotNull(ctx.currentBatch);
+
+ // Verify the state - batch is not yet flushed
+ assertEquals(List.of(), writer.entries(TP));
+
+ // Create a record of highly compressible data
+ List<String> largeRecord = List.of("a".repeat((int) (0.75 * maxBatchSize)));
+
+ // Write #2 with the large record. This record is too large to go into the previous batch
+ // uncompressed but fits in a new buffer, so we should flush the previous batch and allocate
+ // a new one.
+ CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(50),
+ state -> new CoordinatorResult<>(largeRecord, "response2")
+ );
+
+ // Verify the state. The first batch has flushed but the second is pending.
+ assertEquals(2L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(List.of(
+ new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+ new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+ new MockCoordinatorShard.RecordAndMetadata(2, largeRecord.get(0))
+ ), ctx.coordinator.coordinator().fullRecords());
+ assertEquals(List.of(
+ records(firstBatchTimestamp, compression, records)
+ ), writer.entries(TP));
+
+ // Advance past the linger time
+ timer.advanceClock(11);
+
+ // Commit and verify that the second batch is completed
+ writer.commit(TP);
+ assertTrue(write1.isDone());
+ assertTrue(write2.isDone());
+ assertEquals(3L, ctx.coordinator.lastCommittedOffset());
+ assertEquals("response1", write1.get(5, TimeUnit.SECONDS));
+ assertEquals("response2", write2.get(5, TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void testLargeCompressibleRecordTriggersFlushAndSucceeds() throws Exception {
+ MockTimer timer = new MockTimer();
+ MockPartitionWriter writer = new MockPartitionWriter();
+ Compression compression = Compression.gzip().build();
+
+ CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+ new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+ .withTime(timer.time())
+ .withTimer(timer)
+ .withDefaultWriteTimeOut(Duration.ofMillis(20))
+ .withLoader(new MockCoordinatorLoader())
+ .withEventProcessor(new DirectEventProcessor())
+ .withPartitionWriter(writer)
+ .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+ .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+ .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+ .withCompression(compression)
+ .withSerializer(new StringSerializer())
+ .withAppendLingerMs(10)
+ .withExecutorService(mock(ExecutorService.class))
+ .build();
+
+ // Schedule the loading.
+ runtime.scheduleLoadOperation(TP, 10);
+
+ // Verify the initial state.
+ CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+ assertNull(ctx.currentBatch);
+
+ // Get the max batch size.
+ int maxBatchSize = writer.config(TP).maxMessageSize();
+
+ // Create 2 records with a quarter of the max batch size each.
+ List<String> records = Stream.of('1', '2').map(c -> {
+ char[] payload = new char[maxBatchSize / 4];
+ Arrays.fill(payload, c);
+ return new String(payload);
+ }).collect(Collectors.toList());
+
+ // Write #1 with the small records, batch will be about half full
+ long firstBatchTimestamp = timer.time().milliseconds();
+ CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(50),
+ state -> new CoordinatorResult<>(records, "response1")
+ );
+
+ // A batch has been created.
+ assertNotNull(ctx.currentBatch);
+
+ // Verify the state - batch is not yet flushed
+ assertEquals(List.of(), writer.entries(TP));
+
+ // Create a large record of highly compressible data
+ List<String> largeRecord = List.of("a".repeat(3 * maxBatchSize));
+
+ // Write #2 with the large record. This record is too large to go into the previous batch
+ // uncompressed but will fit in the new buffer once compressed, so we should flush the
+ // previous batch and successfully allocate a new batch for this record. The new batch
+ // will also trigger an immediate flush.
+ long secondBatchTimestamp = timer.time().milliseconds();
+ CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(50),
+ state -> new CoordinatorResult<>(largeRecord, "response2")
+ );
+
+ // Verify the state.
+ assertEquals(3L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(List.of(
+ new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+ new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+ new MockCoordinatorShard.RecordAndMetadata(2, largeRecord.get(0))
+ ), ctx.coordinator.coordinator().fullRecords());
+ assertEquals(List.of(
+ records(firstBatchTimestamp, compression, records),
+ records(secondBatchTimestamp, compression, largeRecord)
+ ), writer.entries(TP));
+
+ // Commit and verify that writes are completed.
+ writer.commit(TP);
+ assertTrue(write1.isDone());
+ assertTrue(write2.isDone());
+ assertEquals(3L, ctx.coordinator.lastCommittedOffset());
+ assertEquals("response1", write1.get(5, TimeUnit.SECONDS));
+ assertEquals("response2", write2.get(5, TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void testLargeUncompressibleRecordTriggersFlushAndFails() throws Exception {
+ MockTimer timer = new MockTimer();
+ MockPartitionWriter writer = new MockPartitionWriter();
+ Compression compression = Compression.gzip().build();
+
+ CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+ new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+ .withTime(timer.time())
+ .withTimer(timer)
+ .withDefaultWriteTimeOut(Duration.ofMillis(20))
+ .withLoader(new MockCoordinatorLoader())
+ .withEventProcessor(new DirectEventProcessor())
+ .withPartitionWriter(writer)
+ .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+ .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+ .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+ .withCompression(compression)
+ .withSerializer(new StringSerializer())
+ .withAppendLingerMs(10)
+ .withExecutorService(mock(ExecutorService.class))
+ .build();
+
+ // Schedule the loading.
+ runtime.scheduleLoadOperation(TP, 10);
+
+ // Verify the initial state.
+ CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+ assertNull(ctx.currentBatch);
+
+ // Get the max batch size.
+ int maxBatchSize = writer.config(TP).maxMessageSize();
+
+ // Create 2 records with a quarter of the max batch size each.
+ List<String> records = Stream.of('1', '2').map(c -> {
+ char[] payload = new char[maxBatchSize / 4];
+ Arrays.fill(payload, c);
+ return new String(payload);
+ }).collect(Collectors.toList());
+
+ // Write #1 with the small records, batch will be about half full
+ long firstBatchTimestamp = timer.time().milliseconds();
+ CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(50),
+ state -> new CoordinatorResult<>(records, "response1")
+ );
+
+ // A batch has been created.
+ assertNotNull(ctx.currentBatch);
+
+ // Verify the state - batch is not yet flushed
+ assertEquals(List.of(), writer.entries(TP));
+
+ // Create a large record of not very compressible data
+ char[] payload = new char[3 * maxBatchSize];
+ Random offset = new Random();
+ for (int i = 0; i < payload.length; i++) {
+ payload[i] = (char) ('a' + ((char) offset.nextInt() % 26));
+ }
+ List<String> largeRecord = List.of(new String(payload));
+
+ // Write #2 with the large record. This record is too large to go into the previous batch
+ // and is not compressible so it should be flushed. It is also too large to fit in a new batch
+ // so the write should fail with RecordTooLargeException
+ CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(50),
+ state -> new CoordinatorResult<>(largeRecord, "response2")
+ );
+
+ // Check that write2 fails with RecordTooLargeException
+ assertFutureThrows(RecordTooLargeException.class, write2);
+
+ // Verify the state. The first batch was flushed and the largeRecord
+ // write failed.
+ assertEquals(2L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(List.of(
+ new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+ new MockCoordinatorShard.RecordAndMetadata(1, records.get(1))
+ ), ctx.coordinator.coordinator().fullRecords());
+ assertEquals(List.of(
+ records(firstBatchTimestamp, compression, records)
+ ), writer.entries(TP));
+
+ // Commit and verify that writes are completed.
+ writer.commit(TP);
+ assertTrue(write1.isDone());
+ assertTrue(write2.isDone());
+ assertEquals(2L, ctx.coordinator.lastCommittedOffset());
+ assertEquals("response1", write1.get(5, TimeUnit.SECONDS));
+ }
+
@Test
public void testRecordEventPurgatoryTime() throws Exception {
Duration writeTimeout = Duration.ofMillis(1000);
diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/TestUtil.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/TestUtil.java
index c3eda174671..8e39f9db8f8 100644
--- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/TestUtil.java
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/TestUtil.java
@@ -44,14 +44,31 @@ import java.util.List;
public class TestUtil {
public static MemoryRecords records(
long timestamp,
+ Compression compression,
String... records
) {
- return records(timestamp, Arrays.stream(records).toList());
+ return records(timestamp, compression, Arrays.stream(records).toList());
+ }
+
+
+ public static MemoryRecords records(
+ long timestamp,
+ String... records
+ ) {
+ return records(timestamp, Compression.NONE, Arrays.stream(records).toList());
}
public static MemoryRecords records(
long timestamp,
List<String> records
+ ) {
+ return records(timestamp, Compression.NONE, records);
+ }
+
+ public static MemoryRecords records(
+ long timestamp,
+ Compression compression,
+ List<String> records
) {
if (records.isEmpty())
return MemoryRecords.EMPTY;
@@ -62,7 +79,7 @@ public class TestUtil {
int sizeEstimate = AbstractRecords.estimateSizeInBytes(
RecordVersion.current().value,
- CompressionType.NONE,
+ compression.type(),
simpleRecords
);
@@ -71,7 +88,7 @@ public class TestUtil {
MemoryRecordsBuilder builder = MemoryRecords.builder(
buffer,
RecordVersion.current().value,
- Compression.NONE,
+ compression,
TimestampType.CREATE_TIME,
0L,
timestamp,