This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 78fff9c550a KAFKA-19760: RecordTooLargeExceptions in group coordinator when offsets.topic.compression.codec is used (4.0) (#20715)
78fff9c550a is described below

commit 78fff9c550a5827377de862b28fe1e7f25c21c6e
Author: Izzy Harker <[email protected]>
AuthorDate: Fri Oct 17 02:20:29 2025 -0500

    KAFKA-19760: RecordTooLargeExceptions in group coordinator when offsets.topic.compression.codec is used (4.0) (#20715)
    
    This PR backports the change from
    https://github.com/apache/kafka/pull/20653 to 4.0
    
    The group coordinator has been running into unknown errors. The
    working theory is that optimistic compression estimates lead to
    unchecked batch overflows when writing.

    This PR adds a check on the uncompressed record size so that batches
    are flushed more eagerly, avoiding overfilling partially-full batches.
    This should make the group coordinator errors less frequent. (A
    simplified sketch of the idea follows the diffstat below.)
    
    Also added tests to ensure this change does not impact the desired
    behavior for large compressible records.
    
    Reviewers: Sean Quah <[email protected]>, David Jacot <[email protected]>
---
 .../common/runtime/CoordinatorRuntime.java         |  52 ++---
 .../common/runtime/CoordinatorRuntimeTest.java     | 209 ++++++++++++++++++++-
 2 files changed, 229 insertions(+), 32 deletions(-)
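
A minimal standalone sketch of the idea behind this change, assuming simplified
types: the class, enum, and method names below are illustrative and are not the
CoordinatorRuntime APIs shown in the diff that follows.

    import java.util.List;

    /**
     * Simplified model of the new flush decision: size incoming records by an
     * uncompressed upper bound and flush the open batch eagerly, instead of
     * rejecting the append based on an optimistic compressed-size estimate.
     */
    public class EagerFlushSketch {

        enum Decision { APPEND_TO_OPEN_BATCH, FLUSH_THEN_RETRY_IN_NEW_BATCH }

        /**
         * @param records             the serialized records to append
         * @param remainingBatchBytes bytes still free in the currently open batch
         */
        static Decision decide(List<byte[]> records, int remainingBatchBytes) {
            // Upper bound: the raw, uncompressed size of the records. The real code
            // uses AbstractRecords.estimateSizeInBytes(..., CompressionType.NONE, ...).
            int upperBound = records.stream().mapToInt(r -> r.length).sum();

            if (upperBound <= remainingBatchBytes) {
                // Fits even if compression buys nothing: safe to append.
                return Decision.APPEND_TO_OPEN_BATCH;
            }
            // Might not fit uncompressed: flush the open batch and retry in a fresh
            // buffer, where compression may still make the records fit. A record that
            // does not fit even then fails at write time.
            return Decision.FLUSH_THEN_RETRY_IN_NEW_BATCH;
        }

        public static void main(String[] args) {
            System.out.println(decide(List.of(new byte[1024]), 4096));  // APPEND_TO_OPEN_BATCH
            System.out.println(decide(List.of(new byte[8192]), 4096));  // FLUSH_THEN_RETRY_IN_NEW_BATCH
        }
    }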

diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
index e1e80476cf8..e61983658fa 100644
--- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
+++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
@@ -21,10 +21,10 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.compress.Compression;
 import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
 import org.apache.kafka.common.errors.NotCoordinatorException;
-import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.AbstractRecords;
+import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.ControlRecordType;
 import org.apache.kafka.common.record.EndTransactionMarker;
 import org.apache.kafka.common.record.MemoryRecords;
@@ -831,11 +831,11 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
         }
 
         /**
-         * Flushes the current batch if it is transactional or if it has passed the append linger time.
+         * Flushes the current batch if it is transactional, if it has passed the append linger time, or if it is full.
          */
         private void maybeFlushCurrentBatch(long currentTimeMs) {
             if (currentBatch != null) {
-                if (currentBatch.builder.isTransactional() || (currentBatch.appendTimeMs - currentTimeMs) >= appendLingerMs) {
+                if (currentBatch.builder.isTransactional() || (currentBatch.appendTimeMs - currentTimeMs) >= appendLingerMs || !currentBatch.builder.hasRoomFor(0)) {
                     flushCurrentBatch();
                 }
             }
@@ -912,6 +912,24 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
             }
         }
 
+        /**
+         * Completes the given event once all pending writes are completed. 
+         *
+         * @param event             The event to complete once all pending 
+         *                          writes are completed.
+         */
+        private void waitForPendingWrites(DeferredEvent event) {
+            if (currentBatch != null && currentBatch.builder.numRecords() > 0) {
+                currentBatch.deferredEvents.add(event);
+            } else {
+                if (coordinator.lastCommittedOffset() < coordinator.lastWrittenOffset()) {
+                    deferredEventQueue.add(coordinator.lastWrittenOffset(), event);
+                } else {
+                    event.complete(null);
+                }
+            }
+        }
+
         /**
          * Appends records to the log and replay them to the state machine.
          *
@@ -941,17 +959,8 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
 
             if (records.isEmpty()) {
                 // If the records are empty, it was a read operation after all. In this case,
-                // the response can be returned directly iff there are no pending write operations;
-                // otherwise, the read needs to wait on the last write operation to be completed.
-                if (currentBatch != null && currentBatch.builder.numRecords() > 0) {
-                    currentBatch.deferredEvents.add(event);
-                } else {
-                    if (coordinator.lastCommittedOffset() < coordinator.lastWrittenOffset()) {
-                        deferredEventQueue.add(coordinator.lastWrittenOffset(), event);
-                    } else {
-                        event.complete(null);
-                    }
-                }
+                // the response can be returned once any pending write operations complete.
+                waitForPendingWrites(event);
             } else {
                 // If the records are not empty, first, they are applied to 
the state machine,
                 // second, they are appended to the opened batch.
@@ -986,22 +995,13 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
 
                 if (isAtomic) {
                     // Compute the estimated size of the records.
-                    int estimatedSize = AbstractRecords.estimateSizeInBytes(
+                    int estimatedSizeUpperBound = AbstractRecords.estimateSizeInBytes(
                         currentBatch.builder.magic(),
-                        compression.type(),
+                        CompressionType.NONE,
                         recordsToAppend
                     );
 
-                    // Check if the current batch has enough space. We check this before
-                    // replaying the records in order to avoid having to revert back
-                    // changes if the records do not fit within a batch.
-                    if (estimatedSize > currentBatch.builder.maxAllowedBytes()) {
-                        throw new RecordTooLargeException("Message batch size is " + estimatedSize +
-                            " bytes in append to partition " + tp + " which exceeds the maximum " +
-                            "configured size of " + currentBatch.maxBatchSize + ".");
-                    }
-
-                    if (!currentBatch.builder.hasRoomFor(estimatedSize)) {
+                    if (!currentBatch.builder.hasRoomFor(estimatedSizeUpperBound)) {
                         // Otherwise, we write the current batch, allocate a new one and re-verify
                         // whether the records fit in it.
                         // If flushing fails, we don't catch the exception in order to let
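
The waitForPendingWrites helper extracted above defers a response behind any
writes that are still in flight. A minimal standalone model of that logic,
assuming simplified state (the class and field names below are illustrative,
not the actual coordinator classes):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;

    /** Simplified model: complete an event only once pending writes are visible. */
    public class PendingWriteModel {
        int recordsInOpenBatch;       // records buffered but not yet written
        long lastWrittenOffset;       // highest offset handed to the log
        long lastCommittedOffset;     // highest offset acknowledged by the log
        final List<CompletableFuture<Void>> batchWaiters = new ArrayList<>();
        final List<CompletableFuture<Void>> commitWaiters = new ArrayList<>();

        void waitForPendingWrites(CompletableFuture<Void> event) {
            if (recordsInOpenBatch > 0) {
                // An open batch still holds unwritten records: complete with that batch.
                batchWaiters.add(event);
            } else if (lastCommittedOffset < lastWrittenOffset) {
                // Everything is written but not yet committed: wait for the commit.
                commitWaiters.add(event);
            } else {
                // Nothing pending: the event can be completed immediately.
                event.complete(null);
            }
        }

        public static void main(String[] args) {
            PendingWriteModel model = new PendingWriteModel();
            CompletableFuture<Void> event = new CompletableFuture<>();
            model.waitForPendingWrites(event);
            System.out.println(event.isDone());  // true: no writes are pending
        }
    }
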
diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
index 9198e207e4b..26e7e7b5f28 100644
--- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
@@ -69,6 +69,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.OptionalInt;
+import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -544,15 +545,31 @@ public class CoordinatorRuntimeTest {
         }
     }
 
-    private static MemoryRecords records(
+    public static MemoryRecords records(
         long timestamp,
+        Compression compression,
         String... records
     ) {
-        return records(timestamp, Arrays.stream(records).collect(Collectors.toList()));
+        return records(timestamp, compression, Arrays.stream(records).toList());
     }
 
-    private static MemoryRecords records(
+    public static MemoryRecords records(
         long timestamp,
+        String... records
+    ) {
+        return records(timestamp, Compression.NONE, Arrays.stream(records).toList());
+    }
+
+    public static MemoryRecords records(
+        long timestamp,
+        List<String> records
+    ) {
+        return records(timestamp, Compression.NONE, records);
+    }
+
+    public static MemoryRecords records(
+        long timestamp,
+        Compression compression,
         List<String> records
     ) {
         if (records.isEmpty())
@@ -560,11 +577,11 @@ public class CoordinatorRuntimeTest {
 
         List<SimpleRecord> simpleRecords = records.stream().map(record ->
             new SimpleRecord(timestamp, record.getBytes(Charset.defaultCharset()))
-        ).collect(Collectors.toList());
+        ).toList();
 
         int sizeEstimate = AbstractRecords.estimateSizeInBytes(
             RecordVersion.current().value,
-            CompressionType.NONE,
+            compression.type(),
             simpleRecords
         );
 
@@ -573,7 +590,7 @@ public class CoordinatorRuntimeTest {
         MemoryRecordsBuilder builder = MemoryRecords.builder(
             buffer,
             RecordVersion.current().value,
-            Compression.NONE,
+            compression,
             TimestampType.CREATE_TIME,
             0L,
             timestamp,
@@ -4945,6 +4962,186 @@ public class CoordinatorRuntimeTest {
         verify(runtimeMetrics, times(1)).recordEventPurgatoryTime(writeTimeout.toMillis() + 1);
     }
 
+    @Test
+    public void testLargeCompressibleRecordTriggersFlushAndSucceeds() throws Exception {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+        Compression compression = Compression.gzip().build();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withCompression(compression)
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(10)
+                .withExecutorService(mock(ExecutorService.class))
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertNull(ctx.currentBatch);
+
+        // Get the max batch size.
+        int maxBatchSize = writer.config(TP).maxMessageSize();
+
+        // Create 2 records with a quarter of the max batch size each. 
+        List<String> records = Stream.of('1', '2').map(c -> {
+            char[] payload = new char[maxBatchSize / 4];
+            Arrays.fill(payload, c);
+            return new String(payload);
+        }).collect(Collectors.toList());
+
+        // Write #1 with the small records, batch will be about half full
+        long firstBatchTimestamp = timer.time().milliseconds();
+        CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(50),
+            state -> new CoordinatorResult<>(records, "response1")
+        );
+
+        // A batch has been created.
+        assertNotNull(ctx.currentBatch);
+
+        // Verify the state - batch is not yet flushed
+        assertEquals(List.of(), writer.entries(TP));
+
+        // Create a record of highly compressible data
+        List<String> mediumRecord = List.of("a".repeat((int) (0.75 * maxBatchSize)));
+
+        // Write #2 with the large record. This record is too large to go into the previous batch
+        // uncompressed but fits in a new buffer, so we should flush the previous batch and allocate
+        // a new one. 
+        long secondBatchTimestamp = timer.time().milliseconds();
+        CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(50),
+            state -> new CoordinatorResult<>(mediumRecord, "response2")
+        );
+
+        // Create a large record of highly compressible data
+        List<String> largeRecord = List.of("a".repeat(3 * maxBatchSize));
+
+        // Write #3 with the large record. This record is too large to go into the previous batch
+        // uncompressed but will fit in the new buffer once compressed, so we should flush the
+        // previous batch and successfully allocate a new batch for this record. The new batch
+        // will also trigger an immediate flush.
+        long thirdBatchTimestamp = timer.time().milliseconds();
+        CompletableFuture<String> write3 = runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(50),
+            state -> new CoordinatorResult<>(largeRecord, "response3")
+        );
+
+        // Verify the state.
+        assertEquals(4L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+            new MockCoordinatorShard.RecordAndMetadata(2, mediumRecord.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(3, largeRecord.get(0))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(List.of(
+            records(firstBatchTimestamp, compression, records),
+            records(secondBatchTimestamp, compression, mediumRecord),
+            records(thirdBatchTimestamp, compression, largeRecord)
+        ), writer.entries(TP));
+
+        // Commit and verify that writes are completed.
+        writer.commit(TP);
+        assertTrue(write1.isDone());
+        assertTrue(write2.isDone());
+        assertEquals(4L, ctx.coordinator.lastCommittedOffset());
+        assertEquals("response1", write1.get(5, TimeUnit.SECONDS));
+        assertEquals("response2", write2.get(5, TimeUnit.SECONDS));
+        assertEquals("response3", write3.get(5, TimeUnit.SECONDS));
+    }
+
+    @Test
+    public void testLargeUncompressibleRecordTriggersFlushAndFails() throws Exception {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+        Compression compression = Compression.gzip().build();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withCompression(compression)
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(10)
+                .withExecutorService(mock(ExecutorService.class))
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertNull(ctx.currentBatch);
+
+        // Get the max batch size.
+        int maxBatchSize = writer.config(TP).maxMessageSize();
+
+        // Create 2 records with a quarter of the max batch size each. 
+        List<String> records = Stream.of('1', '2').map(c -> {
+            char[] payload = new char[maxBatchSize / 4];
+            Arrays.fill(payload, c);
+            return new String(payload);
+        }).collect(Collectors.toList());
+
+        // Write #1 with the small records, batch will be about half full
+        long firstBatchTimestamp = timer.time().milliseconds();
+        CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(50),
+            state -> new CoordinatorResult<>(records, "response1")
+        );
+
+        // A batch has been created.
+        assertNotNull(ctx.currentBatch);
+
+        // Verify the state - batch is not yet flushed
+        assertEquals(List.of(), writer.entries(TP));
+
+        // Create a large record of not very compressible data
+        char[] payload = new char[3 * maxBatchSize];
+        Random offset = new Random();
+        for (int i = 0; i < payload.length; i++) {
+            payload[i] = (char) ('a' + ((char) offset.nextInt() % 26));
+        }
+        List<String> largeRecord = List.of(new String(payload));
+
+        // Write #2 with the large record. This record is too large to go into the previous batch
+        // and is not compressible so it should be flushed. It is also too large to fit in a new batch
+        // so the write should fail with RecordTooLargeException
+        CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(50),
+            state -> new CoordinatorResult<>(largeRecord, "response2")
+        );
+
+        // Verify the state. The first batch was flushed and the largeRecord
+        // write failed.
+        assertEquals(2L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(List.of(
+            records(firstBatchTimestamp, compression, records)
+        ), writer.entries(TP));
+    }
+
     @Test
     public void testWriteEventCompletesOnlyOnce() throws Exception {
         // Completes once via timeout, then again with HWM update.
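
The two new tests above hinge on how well the payload compresses under gzip: a
run of a repeated character shrinks to a tiny fraction of the max batch size,
while random characters barely compress at all. A small standalone snippet
(illustrative only; it uses plain java.util.zip rather than Kafka's Compression
classes) that shows the difference:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.Random;
    import java.util.zip.GZIPOutputStream;

    public class CompressibilityDemo {

        static int gzipSize(byte[] payload) throws IOException {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
                gzip.write(payload);
            }
            return out.size();
        }

        public static void main(String[] args) throws IOException {
            int size = 3 * 1024 * 1024;               // stand-in for 3 * maxBatchSize

            byte[] compressible = new byte[size];     // like "a".repeat(size) in the test
            Arrays.fill(compressible, (byte) 'a');

            byte[] incompressible = new byte[size];   // like the random payload in the test
            new Random(42).nextBytes(incompressible);

            System.out.println("compressible   -> " + gzipSize(compressible));   // a few kilobytes
            System.out.println("incompressible -> " + gzipSize(incompressible)); // roughly the input size
        }
    }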
