This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 78fff9c550a KAFKA-19760: RecordTooLargeExceptions in group coordinator
when offsets.topic.compression.codec is used (4.0) (#20715)
78fff9c550a is described below
commit 78fff9c550a5827377de862b28fe1e7f25c21c6e
Author: Izzy Harker <[email protected]>
AuthorDate: Fri Oct 17 02:20:29 2025 -0500
KAFKA-19760: RecordTooLargeExceptions in group coordinator when
offsets.topic.compression.codec is used (4.0) (#20715)
This PR backports the change from https://github.com/apache/kafka/pull/20653 to the 4.0 branch.
The group coordinator has been running into unknown errors. The working theory is that
optimistic compression size estimates cause unchecked batch overflows when records are
written to a partially filled batch.
This PR adds a check against the uncompressed record size so that batches are flushed
more eagerly, which avoids overfilling partially full batches and should make the group
coordinator errors less frequent.
Tests were also added to ensure the change does not affect the desired behavior for
large compressible records.
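As context (not part of the commit), here is a minimal standalone sketch of the estimate
gap that the fix works around. The class name EstimateGapDemo is made up for illustration,
and it assumes the kafka-clients record classes used in the diff below are on the classpath:

    import org.apache.kafka.common.record.AbstractRecords;
    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.RecordVersion;
    import org.apache.kafka.common.record.SimpleRecord;

    import java.nio.charset.StandardCharsets;
    import java.util.List;

    public class EstimateGapDemo {
        public static void main(String[] args) {
            // One large, highly compressible record (4 MiB of 'a' bytes).
            byte[] payload = "a".repeat(4 * 1024 * 1024).getBytes(StandardCharsets.UTF_8);
            List<SimpleRecord> records = List.of(new SimpleRecord(System.currentTimeMillis(), payload));

            // Estimate used by the old has-room check: it assumes the configured codec
            // (gzip here) and is optimistic, so a batch could accept records that do not
            // actually fit once the real data is written.
            int optimisticEstimate = AbstractRecords.estimateSizeInBytes(
                RecordVersion.current().value, CompressionType.GZIP, records);

            // Estimate used by the new check: CompressionType.NONE yields an upper bound,
            // so the coordinator flushes the current batch whenever this bound does not fit.
            int uncompressedUpperBound = AbstractRecords.estimateSizeInBytes(
                RecordVersion.current().value, CompressionType.NONE, records);

            System.out.printf("optimistic (gzip) estimate: %,d bytes%n", optimisticEstimate);
            System.out.printf("uncompressed upper bound:   %,d bytes%n", uncompressedUpperBound);
        }
    }

The gzip estimate is a heuristic and is typically far below the raw payload size, which is
why the has-room check in the CoordinatorRuntime.java hunks below now uses the
CompressionType.NONE estimate as an upper bound.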
Reviewers: Sean Quah <[email protected]>, David Jacot <[email protected]>
---
.../common/runtime/CoordinatorRuntime.java | 52 ++---
.../common/runtime/CoordinatorRuntimeTest.java | 209 ++++++++++++++++++++-
2 files changed, 229 insertions(+), 32 deletions(-)
diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
index e1e80476cf8..e61983658fa 100644
--- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
+++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
@@ -21,10 +21,10 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
import org.apache.kafka.common.errors.NotCoordinatorException;
-import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.AbstractRecords;
+import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.EndTransactionMarker;
import org.apache.kafka.common.record.MemoryRecords;
@@ -831,11 +831,11 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
         }
 
         /**
-         * Flushes the current batch if it is transactional or if it has passed the append linger time.
+         * Flushes the current batch if it is transactional, if it has passed the append linger time, or if it is full.
          */
         private void maybeFlushCurrentBatch(long currentTimeMs) {
             if (currentBatch != null) {
-                if (currentBatch.builder.isTransactional() || (currentBatch.appendTimeMs - currentTimeMs) >= appendLingerMs) {
+                if (currentBatch.builder.isTransactional() || (currentBatch.appendTimeMs - currentTimeMs) >= appendLingerMs || !currentBatch.builder.hasRoomFor(0)) {
                     flushCurrentBatch();
                 }
             }
@@ -912,6 +912,24 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
             }
         }
 
+        /**
+         * Completes the given event once all pending writes are completed.
+         *
+         * @param event The event to complete once all pending
+         *              writes are completed.
+         */
+        private void waitForPendingWrites(DeferredEvent event) {
+            if (currentBatch != null && currentBatch.builder.numRecords() > 0) {
+                currentBatch.deferredEvents.add(event);
+            } else {
+                if (coordinator.lastCommittedOffset() < coordinator.lastWrittenOffset()) {
+                    deferredEventQueue.add(coordinator.lastWrittenOffset(), event);
+                } else {
+                    event.complete(null);
+                }
+            }
+        }
+
         /**
          * Appends records to the log and replay them to the state machine.
          *
@@ -941,17 +959,8 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
 
             if (records.isEmpty()) {
                 // If the records are empty, it was a read operation after all. In this case,
-                // the response can be returned directly iff there are no pending write operations;
-                // otherwise, the read needs to wait on the last write operation to be completed.
-                if (currentBatch != null && currentBatch.builder.numRecords() > 0) {
-                    currentBatch.deferredEvents.add(event);
-                } else {
-                    if (coordinator.lastCommittedOffset() < coordinator.lastWrittenOffset()) {
-                        deferredEventQueue.add(coordinator.lastWrittenOffset(), event);
-                    } else {
-                        event.complete(null);
-                    }
-                }
+                // the response can be returned once any pending write operations complete.
+                waitForPendingWrites(event);
             } else {
                 // If the records are not empty, first, they are applied to the state machine,
                 // second, they are appended to the opened batch.
@@ -986,22 +995,13 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
 
                 if (isAtomic) {
                     // Compute the estimated size of the records.
-                    int estimatedSize = AbstractRecords.estimateSizeInBytes(
+                    int estimatedSizeUpperBound = AbstractRecords.estimateSizeInBytes(
                         currentBatch.builder.magic(),
-                        compression.type(),
+                        CompressionType.NONE,
                         recordsToAppend
                     );
 
-                    // Check if the current batch has enough space. We check this before
-                    // replaying the records in order to avoid having to revert back
-                    // changes if the records do not fit within a batch.
-                    if (estimatedSize > currentBatch.builder.maxAllowedBytes()) {
-                        throw new RecordTooLargeException("Message batch size is " + estimatedSize +
-                            " bytes in append to partition " + tp + " which exceeds the maximum " +
-                            "configured size of " + currentBatch.maxBatchSize + ".");
-                    }
-
-                    if (!currentBatch.builder.hasRoomFor(estimatedSize)) {
+                    if (!currentBatch.builder.hasRoomFor(estimatedSizeUpperBound)) {
                         // Otherwise, we write the current batch, allocate a new one and re-verify
                         // whether the records fit in it.
                         // If flushing fails, we don't catch the exception in order to let
diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
index 9198e207e4b..26e7e7b5f28 100644
--- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
@@ -69,6 +69,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalInt;
+import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -544,15 +545,31 @@ public class CoordinatorRuntimeTest {
         }
     }
 
-    private static MemoryRecords records(
+    public static MemoryRecords records(
         long timestamp,
+        Compression compression,
         String... records
     ) {
-        return records(timestamp, Arrays.stream(records).collect(Collectors.toList()));
+        return records(timestamp, compression, Arrays.stream(records).toList());
     }
 
-    private static MemoryRecords records(
+    public static MemoryRecords records(
         long timestamp,
+        String... records
+    ) {
+        return records(timestamp, Compression.NONE, Arrays.stream(records).toList());
+    }
+
+    public static MemoryRecords records(
+        long timestamp,
+        List<String> records
+    ) {
+        return records(timestamp, Compression.NONE, records);
+    }
+
+    public static MemoryRecords records(
+        long timestamp,
+        Compression compression,
         List<String> records
     ) {
         if (records.isEmpty())
@@ -560,11 +577,11 @@ public class CoordinatorRuntimeTest {
 
         List<SimpleRecord> simpleRecords = records.stream().map(record ->
             new SimpleRecord(timestamp, record.getBytes(Charset.defaultCharset()))
-        ).collect(Collectors.toList());
+        ).toList();
 
         int sizeEstimate = AbstractRecords.estimateSizeInBytes(
             RecordVersion.current().value,
-            CompressionType.NONE,
+            compression.type(),
             simpleRecords
         );
 
@@ -573,7 +590,7 @@ public class CoordinatorRuntimeTest {
         MemoryRecordsBuilder builder = MemoryRecords.builder(
             buffer,
             RecordVersion.current().value,
-            Compression.NONE,
+            compression,
             TimestampType.CREATE_TIME,
             0L,
             timestamp,
@@ -4945,6 +4962,186 @@ public class CoordinatorRuntimeTest {
         verify(runtimeMetrics, times(1)).recordEventPurgatoryTime(writeTimeout.toMillis() + 1);
     }
 
+    @Test
+    public void testLargeCompressibleRecordTriggersFlushAndSucceeds() throws Exception {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+        Compression compression = Compression.gzip().build();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withCompression(compression)
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(10)
+                .withExecutorService(mock(ExecutorService.class))
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertNull(ctx.currentBatch);
+
+        // Get the max batch size.
+        int maxBatchSize = writer.config(TP).maxMessageSize();
+
+        // Create 2 records with a quarter of the max batch size each.
+        List<String> records = Stream.of('1', '2').map(c -> {
+            char[] payload = new char[maxBatchSize / 4];
+            Arrays.fill(payload, c);
+            return new String(payload);
+        }).collect(Collectors.toList());
+
+        // Write #1 with the small records, batch will be about half full
+        long firstBatchTimestamp = timer.time().milliseconds();
+        CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(50),
+            state -> new CoordinatorResult<>(records, "response1")
+        );
+
+        // A batch has been created.
+        assertNotNull(ctx.currentBatch);
+
+        // Verify the state - batch is not yet flushed
+        assertEquals(List.of(), writer.entries(TP));
+
+        // Create a record of highly compressible data
+        List<String> mediumRecord = List.of("a".repeat((int) (0.75 * maxBatchSize)));
+
+        // Write #2 with the large record. This record is too large to go into the previous batch
+        // uncompressed but fits in a new buffer, so we should flush the previous batch and allocate
+        // a new one.
+        long secondBatchTimestamp = timer.time().milliseconds();
+        CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(50),
+            state -> new CoordinatorResult<>(mediumRecord, "response2")
+        );
+
+        // Create a large record of highly compressible data
+        List<String> largeRecord = List.of("a".repeat(3 * maxBatchSize));
+
+        // Write #2 with the large record. This record is too large to go into the previous batch
+        // uncompressed but will fit in the new buffer once compressed, so we should flush the
+        // previous batch and successfully allocate a new batch for this record. The new batch
+        // will also trigger an immediate flush.
+        long thirdBatchTimestamp = timer.time().milliseconds();
+        CompletableFuture<String> write3 = runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(50),
+            state -> new CoordinatorResult<>(largeRecord, "response3")
+        );
+
+        // Verify the state.
+        assertEquals(4L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+            new MockCoordinatorShard.RecordAndMetadata(2, mediumRecord.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(3, largeRecord.get(0))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(List.of(
+            records(firstBatchTimestamp, compression, records),
+            records(secondBatchTimestamp, compression, mediumRecord),
+            records(thirdBatchTimestamp, compression, largeRecord)
+        ), writer.entries(TP));
+
+        // Commit and verify that writes are completed.
+        writer.commit(TP);
+        assertTrue(write1.isDone());
+        assertTrue(write2.isDone());
+        assertEquals(4L, ctx.coordinator.lastCommittedOffset());
+        assertEquals("response1", write1.get(5, TimeUnit.SECONDS));
+        assertEquals("response2", write2.get(5, TimeUnit.SECONDS));
+        assertEquals("response3", write3.get(5, TimeUnit.SECONDS));
+    }
+
+    @Test
+    public void testLargeUncompressibleRecordTriggersFlushAndFails() throws Exception {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+        Compression compression = Compression.gzip().build();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withCompression(compression)
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(10)
+                .withExecutorService(mock(ExecutorService.class))
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertNull(ctx.currentBatch);
+
+        // Get the max batch size.
+        int maxBatchSize = writer.config(TP).maxMessageSize();
+
+        // Create 2 records with a quarter of the max batch size each.
+        List<String> records = Stream.of('1', '2').map(c -> {
+            char[] payload = new char[maxBatchSize / 4];
+            Arrays.fill(payload, c);
+            return new String(payload);
+        }).collect(Collectors.toList());
+
+        // Write #1 with the small records, batch will be about half full
+        long firstBatchTimestamp = timer.time().milliseconds();
+        CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(50),
+            state -> new CoordinatorResult<>(records, "response1")
+        );
+
+        // A batch has been created.
+        assertNotNull(ctx.currentBatch);
+
+        // Verify the state - batch is not yet flushed
+        assertEquals(List.of(), writer.entries(TP));
+
+        // Create a large record of not very compressible data
+        char[] payload = new char[3 * maxBatchSize];
+        Random offset = new Random();
+        for (int i = 0; i < payload.length; i++) {
+            payload[i] = (char) ('a' + ((char) offset.nextInt() % 26));
+        }
+        List<String> largeRecord = List.of(new String(payload));
+
+        // Write #2 with the large record. This record is too large to go into the previous batch
+        // and is not compressible so it should be flushed. It is also too large to fit in a new batch
+        // so the write should fail with RecordTooLargeException
+        CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(50),
+            state -> new CoordinatorResult<>(largeRecord, "response2")
+        );
+
+        // Verify the state. The first batch was flushed and the largeRecord
+        // write failed.
+        assertEquals(2L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(List.of(
+            records(firstBatchTimestamp, compression, records)
+        ), writer.entries(TP));
+    }
+
     @Test
     public void testWriteEventCompletesOnlyOnce() throws Exception {
         // Completes once via timeout, then again with HWM update.