This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e3276ae029b KAFKA-19427 Allow the coordinator to grow its buffer dynamically (#20040)
e3276ae029b is described below

commit e3276ae029b6475421c67053096f8f86616b67f6
Author: Ming-Yen Chung <mingyen...@gmail.com>
AuthorDate: Wed Jul 16 22:06:33 2025 +0800

    KAFKA-19427 Allow the coordinator to grow its buffer dynamically (#20040)
    
    * Coordinator starts with a smaller buffer, which can grow as needed.
    
    * In freeCurrentBatch, release the appropriate buffer:
      * The Coordinator recycles the expanded buffer
        (`currentBatch.builder.buffer()`), not `currentBatch.buffer`,
        because `MemoryRecordsBuilder` may allocate a new `ByteBuffer` if
        the existing one isn't large enough (see the sketch after this
        list).
    
      * There are two cases in which the buffer may exceed `maxMessageSize`:
        1. If a single record's size exceeds `maxMessageSize` (which, so
           far, is derived from `max.message.bytes`) and the write is in
           `non-atomic` mode, the buffer can still grow beyond
           `maxMessageSize`. In this case, the Coordinator should revert to
           a smaller buffer afterward.
        2. The Coordinator does not recycle buffers larger than
           `maxMessageSize`. If the user dynamically reduces
           `maxMessageSize` to a value even smaller than
           `INITIAL_BUFFER_SIZE`, the Coordinator should avoid recycling
           any buffer larger than `maxMessageSize` so that it can allocate
           the smaller buffer in the next round.
    
    * Add tests to verify the above scenarios.
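
    A simplified sketch of the release decision described above, mirroring
    the new `freeCurrentBatch` logic in the diff below (not a verbatim
    copy of the change):

        // Recycle a buffer only if it does not exceed maxMessageSize.
        int maxBatchSize = partitionWriter.config(tp).maxMessageSize();
        if (currentBatch.builder.buffer().capacity() <= maxBatchSize) {
            // The MemoryRecordsBuilder may have reallocated a larger buffer; recycle that one.
            bufferSupplier.release(currentBatch.builder.buffer());
        } else if (currentBatch.buffer.capacity() <= maxBatchSize) {
            // Otherwise recycle the originally allocated buffer if it still fits.
            bufferSupplier.release(currentBatch.buffer);
        }
        // If neither fits, nothing is recycled; the next write allocates
        // min(INITIAL_BUFFER_SIZE, maxMessageSize) from the bufferSupplier again.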
    
    Reviewers: David Jacot <dja...@confluent.io>, Sean Quah
    <sq...@confluent.io>, Ken Huang <s7133...@gmail.com>, PoAn Yang
    <pay...@apache.org>, TaiJuWu <tjwu1...@gmail.com>, Jhen-Yung Hsu
    <jhenyung...@gmail.com>, Chia-Ping Tsai <chia7...@gmail.com>
---
 .../common/runtime/CoordinatorRuntime.java         |  17 +-
 .../common/runtime/CoordinatorRuntimeTest.java     | 215 ++++++++++++++++++++-
 2 files changed, 222 insertions(+), 10 deletions(-)

diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
index 19567344f2d..8f72b0cac50 100644
--- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
+++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
@@ -69,6 +69,7 @@ import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
+import static java.lang.Math.min;
 import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.CoordinatorWriteEvent.NOT_QUEUED;
 
 /**
@@ -758,8 +759,14 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
             // Cancel the linger timeout.
             currentBatch.lingerTimeoutTask.ifPresent(TimerTask::cancel);
 
-            // Release the buffer.
-            bufferSupplier.release(currentBatch.buffer);
+            // Release the buffer only if it is not larger than the maxBatchSize.
+            int maxBatchSize = partitionWriter.config(tp).maxMessageSize();
+
+            if (currentBatch.builder.buffer().capacity() <= maxBatchSize) {
+                bufferSupplier.release(currentBatch.builder.buffer());
+            } else if (currentBatch.buffer.capacity() <= maxBatchSize) {
+                bufferSupplier.release(currentBatch.buffer);
+            }
 
             currentBatch = null;
         }
@@ -859,7 +866,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
                 LogConfig logConfig = partitionWriter.config(tp);
                 int maxBatchSize = logConfig.maxMessageSize();
                 long prevLastWrittenOffset = coordinator.lastWrittenOffset();
-                ByteBuffer buffer = bufferSupplier.get(maxBatchSize);
+                ByteBuffer buffer = bufferSupplier.get(min(INITIAL_BUFFER_SIZE, maxBatchSize));
 
                 MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
                     buffer,
@@ -1909,9 +1916,9 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
     }
 
     /**
-     * 16KB. Used for initial buffer size for write operations.
+     * 512KB. Used for initial buffer size for write operations.
      */
-    static final int MIN_BUFFER_SIZE = 16384;
+    static final int INITIAL_BUFFER_SIZE = 512 * 1024;
 
     /**
      * The log prefix.
diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
index b82829e1d62..f0d38d749fa 100644
--- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.coordinator.common.runtime;
 
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.errors.NotCoordinatorException;
 import org.apache.kafka.common.errors.NotEnoughReplicasException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
@@ -44,6 +45,7 @@ import org.junit.jupiter.params.provider.EnumSource;
 import org.mockito.ArgumentMatcher;
 
 import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -66,7 +68,7 @@ import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.Coo
 import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.CoordinatorState.INITIAL;
 import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.CoordinatorState.LOADING;
 import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.HighWatermarkListener.NO_OFFSET;
-import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.MIN_BUFFER_SIZE;
+import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.INITIAL_BUFFER_SIZE;
 import static org.apache.kafka.coordinator.common.runtime.TestUtil.endTransactionMarker;
 import static org.apache.kafka.coordinator.common.runtime.TestUtil.records;
 import static org.apache.kafka.coordinator.common.runtime.TestUtil.transactionalRecords;
@@ -2919,11 +2921,11 @@ public class CoordinatorRuntimeTest {
         assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
 
         int maxBatchSize = writer.config(TP).maxMessageSize();
-        assertTrue(maxBatchSize > MIN_BUFFER_SIZE);
+        assertTrue(maxBatchSize > INITIAL_BUFFER_SIZE);
 
-        // Generate enough records to create a batch that has 16KB < batchSize < maxBatchSize
+        // Generate enough records to create a batch that has INITIAL_BUFFER_SIZE < batchSize < maxBatchSize
         List<String> records = new ArrayList<>();
-        for (int i = 0; i < 3000; i++) {
+        for (int i = 0; i < 50000; i++) {
             records.add("record-" + i);
         }
 
@@ -2937,7 +2939,210 @@ public class CoordinatorRuntimeTest {
         assertFalse(write1.isCompletedExceptionally());
 
         int batchSize = writer.entries(TP).get(0).sizeInBytes();
-        assertTrue(batchSize > MIN_BUFFER_SIZE && batchSize < maxBatchSize);
+        assertTrue(batchSize > INITIAL_BUFFER_SIZE && batchSize < maxBatchSize);
+    }
+
+    @Test
+    public void testCoordinatorDoNotRetainBufferLargeThanMaxMessageSize() {
+        MockTimer timer = new MockTimer();
+        InMemoryPartitionWriter mockWriter = new InMemoryPartitionWriter(false) {
+            @Override
+            public LogConfig config(TopicPartition tp) {
+                return new LogConfig(Map.of(
+                    TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(1024 * 1024) // 1MB
+                ));
+            }
+        };
+        StringSerializer serializer = new StringSerializer();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(mockWriter)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withSerializer(serializer)
+                .withExecutorService(mock(ExecutorService.class))
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
+
+        // Generate a record larger than the maxBatchSize.
+        List<String> largeRecords = List.of("A".repeat(100 * 1024 * 1024));
+
+        // Write #1.
+        CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
+            state -> new CoordinatorResult<>(largeRecords, "response1", null, true, false)
+        );
+
+        // Verify that the write has not completed exceptionally.
+        // This will catch any exceptions thrown including RecordTooLargeException.
+        assertFalse(write1.isCompletedExceptionally());
+
+        // Verify that the next buffer retrieved from the bufferSupplier is the initial small one, not the large buffer.
+        assertEquals(INITIAL_BUFFER_SIZE, ctx.bufferSupplier.get(1).capacity());
+    }
+
+    @Test
+    public void testCoordinatorRetainExpandedBufferLessOrEqualToMaxMessageSize() {
+        MockTimer timer = new MockTimer();
+        InMemoryPartitionWriter mockWriter = new InMemoryPartitionWriter(false) {
+            @Override
+            public LogConfig config(TopicPartition tp) {
+                return new LogConfig(Map.of(
+                    TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(1024 * 1024 * 1024) // 1GB
+                ));
+            }
+        };
+        StringSerializer serializer = new StringSerializer();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(mockWriter)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withSerializer(serializer)
+                .withExecutorService(mock(ExecutorService.class))
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
+
+        // Generate enough records to create a batch that has INITIAL_BUFFER_SIZE < batchSize < maxBatchSize
+        List<String> records = new ArrayList<>();
+        for (int i = 0; i < 1000000; i++) {
+            records.add("record-" + i);
+        }
+
+        // Write #1.
+        CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
+            state -> new CoordinatorResult<>(records, "response1")
+        );
+
+        // Verify that the write has not completed exceptionally.
+        // This will catch any exceptions thrown including RecordTooLargeException.
+        assertFalse(write1.isCompletedExceptionally());
+
+        int batchSize = mockWriter.entries(TP).get(0).sizeInBytes();
+        int maxBatchSize = mockWriter.config(TP).maxMessageSize();
+        assertTrue(INITIAL_BUFFER_SIZE < batchSize && batchSize <= maxBatchSize);
+
+        // Verify that the next buffer retrieved from the bufferSupplier is the expanded buffer.
+        assertTrue(ctx.bufferSupplier.get(1).capacity() > INITIAL_BUFFER_SIZE);
+    }
+
+    @Test
+    public void testBufferShrinkWhenMaxMessageSizeReducedBelowInitialBufferSize() {
+        MockTimer timer = new MockTimer();
+        var mockWriter = new InMemoryPartitionWriter(false) {
+            private LogConfig config = new LogConfig(Map.of(
+                TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(1024 * 1024) // 1MB
+            ));
+
+            @Override
+            public LogConfig config(TopicPartition tp) {
+                return config;
+            }
+
+            public void updateConfig(LogConfig newConfig) {
+                this.config = newConfig;
+            }
+        };
+        StringSerializer serializer = new StringSerializer();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(mockWriter)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withSerializer(serializer)
+                .withExecutorService(mock(ExecutorService.class))
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
+
+        List<String> records = new ArrayList<>();
+        for (int i = 0; i < 1000; i++) {
+            records.add("record-" + i);
+        }
+
+        // Write #1.
+        CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
+            state -> new CoordinatorResult<>(records, "response1")
+        );
+
+        // Verify that the write has not completed exceptionally.
+        // This will catch any exceptions thrown including RecordTooLargeException.
+        assertFalse(write1.isCompletedExceptionally());
+
+        int batchSize = mockWriter.entries(TP).get(0).sizeInBytes();
+        int maxBatchSize = mockWriter.config(TP).maxMessageSize();
+        assertTrue(batchSize <= INITIAL_BUFFER_SIZE && INITIAL_BUFFER_SIZE <= maxBatchSize);
+
+        ByteBuffer cachedBuffer = ctx.bufferSupplier.get(1);
+        assertEquals(INITIAL_BUFFER_SIZE, cachedBuffer.capacity());
+        // ctx.bufferSupplier.get(1) removes cachedBuffer from the bufferSupplier, so release it to put it back.
+        ctx.bufferSupplier.release(cachedBuffer);
+
+        // Reduce max message size below initial buffer size.
+        mockWriter.updateConfig(new LogConfig(
+            Map.of(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(INITIAL_BUFFER_SIZE - 66))));
+        assertEquals(INITIAL_BUFFER_SIZE - 66, mockWriter.config(TP).maxMessageSize());
+
+        // Write #2.
+        CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT,
+            state -> new CoordinatorResult<>(records, "response2")
+        );
+        assertFalse(write2.isCompletedExceptionally());
+
+        // Verify that there is no cached buffer since the cached buffer size is greater than the new maxMessageSize.
+        assertEquals(1, ctx.bufferSupplier.get(1).capacity());
+
+        // Write #3.
+        CompletableFuture<String> write3 = runtime.scheduleWriteOperation("write#3", TP, DEFAULT_WRITE_TIMEOUT,
+            state -> new CoordinatorResult<>(records, "response3")
+        );
+        assertFalse(write3.isCompletedExceptionally());
+
+        // Verify that the cached buffer size is equal to the new maxMessageSize, which is less than INITIAL_BUFFER_SIZE.
+        assertEquals(mockWriter.config(TP).maxMessageSize(), ctx.bufferSupplier.get(1).capacity());
     }
 
     @Test
