This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new e3276ae029b KAFKA-19427 Allow the coordinator to grow its buffer dynamically (#20040)
e3276ae029b is described below

commit e3276ae029b6475421c67053096f8f86616b67f6
Author: Ming-Yen Chung <mingyen...@gmail.com>
AuthorDate: Wed Jul 16 22:06:33 2025 +0800

    KAFKA-19427 Allow the coordinator to grow its buffer dynamically (#20040)

    * The Coordinator starts with a smaller buffer, which can grow as needed.
    * In freeCurrentBatch, release the appropriate buffer:
      * The Coordinator recycles the expanded buffer (`currentBatch.builder.buffer()`), not
        `currentBatch.buffer`, because `MemoryRecordsBuilder` may allocate a new `ByteBuffer`
        if the existing one isn't large enough.
    * There are two cases where the buffer may exceed `maxMessageSize`:
      1. If there's a single record whose size exceeds `maxMessageSize` (which, so far, is
         derived from `max.message.bytes`) and the write is in `non-atomic` mode, it's still
         possible for the buffer to grow beyond `maxMessageSize`. In this case, the
         Coordinator should revert to using a smaller buffer afterward.
      2. The Coordinator does not recycle a buffer that is larger than `maxMessageSize`. If
         the user dynamically reduces `maxMessageSize` to a value even smaller than
         `INITIAL_BUFFER_SIZE`, the Coordinator should avoid recycling any buffer larger than
         `maxMessageSize`, so that it can allocate the smaller buffer in the next round.
    * Add tests to verify the above scenarios.

    Reviewers: David Jacot <dja...@confluent.io>, Sean Quah <sq...@confluent.io>, Ken Huang
     <s7133...@gmail.com>, PoAn Yang <pay...@apache.org>, TaiJuWu <tjwu1...@gmail.com>,
     Jhen-Yung Hsu <jhenyung...@gmail.com>, Chia-Ping Tsai <chia7...@gmail.com>
---
 .../common/runtime/CoordinatorRuntime.java         |  17 +-
 .../common/runtime/CoordinatorRuntimeTest.java     | 215 ++++++++++++++++++++-
 2 files changed, 222 insertions(+), 10 deletions(-)

diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
index 19567344f2d..8f72b0cac50 100644
--- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
+++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
@@ -69,6 +69,7 @@ import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
+import static java.lang.Math.min;
 import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.CoordinatorWriteEvent.NOT_QUEUED;
 
 /**
@@ -758,8 +759,14 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
             // Cancel the linger timeout.
             currentBatch.lingerTimeoutTask.ifPresent(TimerTask::cancel);
 
-            // Release the buffer.
-            bufferSupplier.release(currentBatch.buffer);
+            // Release the buffer only if it is not larger than the maxBatchSize.
+            int maxBatchSize = partitionWriter.config(tp).maxMessageSize();
+
+            if (currentBatch.builder.buffer().capacity() <= maxBatchSize) {
+                bufferSupplier.release(currentBatch.builder.buffer());
+            } else if (currentBatch.buffer.capacity() <= maxBatchSize) {
+                bufferSupplier.release(currentBatch.buffer);
+            }
 
             currentBatch = null;
         }
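The branch order above matters: `MemoryRecordsBuilder` writes through a growable output
stream, so `builder.buffer()` may be a different, larger `ByteBuffer` than the
`currentBatch.buffer` originally obtained from the supplier, and the expanded one is the
better candidate to cache. A minimal standalone sketch of the same recycling decision,
assuming the runtime caches buffers with Kafka's `BufferSupplier.GrowableBufferSupplier`
(the sizes and the `originalBuffer`/`builderBuffer` names here are illustrative):

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.utils.BufferSupplier;

    public class BufferRecycleSketch {
        public static void main(String[] args) {
            // Caches a single recycled buffer and hands it back to any request it can satisfy.
            BufferSupplier bufferSupplier = new BufferSupplier.GrowableBufferSupplier();
            int maxBatchSize = 1024 * 1024; // stand-in for max.message.bytes

            // originalBuffer plays the role of currentBatch.buffer; builderBuffer plays
            // the role of currentBatch.builder.buffer() after the builder grew it.
            ByteBuffer originalBuffer = bufferSupplier.get(512 * 1024);
            ByteBuffer builderBuffer = ByteBuffer.allocate(2 * 1024 * 1024);

            // Prefer recycling the (possibly expanded) builder buffer, but never cache
            // anything larger than maxBatchSize, so a smaller future limit still wins.
            if (builderBuffer.capacity() <= maxBatchSize) {
                bufferSupplier.release(builderBuffer);
            } else if (originalBuffer.capacity() <= maxBatchSize) {
                bufferSupplier.release(originalBuffer);
            }
        }
    }

In this sketch the 2MB builder buffer is dropped and the original 512KB buffer is cached
instead, which is exactly the fallback the `else if` branch in the patch provides.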
@@ -859,7 +866,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
                 LogConfig logConfig = partitionWriter.config(tp);
                 int maxBatchSize = logConfig.maxMessageSize();
                 long prevLastWrittenOffset = coordinator.lastWrittenOffset();
-                ByteBuffer buffer = bufferSupplier.get(maxBatchSize);
+                ByteBuffer buffer = bufferSupplier.get(min(INITIAL_BUFFER_SIZE, maxBatchSize));
 
                 MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
                     buffer,
@@ -1909,9 +1916,9 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
     }
 
     /**
-     * 16KB. Used for initial buffer size for write operations.
+     * 512KB. Used for initial buffer size for write operations.
      */
-    static final int MIN_BUFFER_SIZE = 16384;
+    static final int INITIAL_BUFFER_SIZE = 512 * 1024;
 
     /**
      * The log prefix.
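Allocating only `min(INITIAL_BUFFER_SIZE, maxBatchSize)` up front is safe because
`MemoryRecordsBuilder` writes through `ByteBufferOutputStream`, which replaces its backing
buffer with a larger allocation whenever a write would overflow, rather than failing. A
rough sketch of that growth behavior (the sizes are illustrative):

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.utils.ByteBufferOutputStream;

    public class BufferGrowthSketch {
        public static void main(String[] args) throws Exception {
            // Start small, as the coordinator now does.
            ByteBuffer initial = ByteBuffer.allocate(512 * 1024);
            ByteBufferOutputStream out = new ByteBufferOutputStream(initial);

            // Writing past the initial capacity makes the stream allocate a larger
            // backing buffer; the original buffer object is abandoned, not resized.
            out.write(new byte[600 * 1024], 0, 600 * 1024);

            System.out.println(out.buffer() == initial);               // false: replaced
            System.out.println(out.buffer().capacity() >= 600 * 1024); // true
        }
    }

This is also why freeCurrentBatch must look at `builder.buffer()` rather than the original
buffer: after growth they are two distinct objects.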
diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
index b82829e1d62..f0d38d749fa 100644
--- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.coordinator.common.runtime;
 
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.errors.NotCoordinatorException;
 import org.apache.kafka.common.errors.NotEnoughReplicasException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
@@ -44,6 +45,7 @@ import org.junit.jupiter.params.provider.EnumSource;
 import org.mockito.ArgumentMatcher;
 
 import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -66,7 +68,7 @@ import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.Coo
 import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.CoordinatorState.INITIAL;
 import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.CoordinatorState.LOADING;
 import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.HighWatermarkListener.NO_OFFSET;
-import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.MIN_BUFFER_SIZE;
+import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.INITIAL_BUFFER_SIZE;
 import static org.apache.kafka.coordinator.common.runtime.TestUtil.endTransactionMarker;
 import static org.apache.kafka.coordinator.common.runtime.TestUtil.records;
 import static org.apache.kafka.coordinator.common.runtime.TestUtil.transactionalRecords;
@@ -2919,11 +2921,11 @@ public class CoordinatorRuntimeTest {
         assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
 
         int maxBatchSize = writer.config(TP).maxMessageSize();
-        assertTrue(maxBatchSize > MIN_BUFFER_SIZE);
+        assertTrue(maxBatchSize > INITIAL_BUFFER_SIZE);
 
-        // Generate enough records to create a batch that has 16KB < batchSize < maxBatchSize
+        // Generate enough records to create a batch that has INITIAL_BUFFER_SIZE < batchSize < maxBatchSize
         List<String> records = new ArrayList<>();
-        for (int i = 0; i < 3000; i++) {
+        for (int i = 0; i < 50000; i++) {
             records.add("record-" + i);
         }
@@ -2937,7 +2939,210 @@ public class CoordinatorRuntimeTest {
         assertFalse(write1.isCompletedExceptionally());
 
         int batchSize = writer.entries(TP).get(0).sizeInBytes();
-        assertTrue(batchSize > MIN_BUFFER_SIZE && batchSize < maxBatchSize);
+        assertTrue(batchSize > INITIAL_BUFFER_SIZE && batchSize < maxBatchSize);
+    }
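A back-of-envelope check on the new record count: each "record-<i>" entry costs on the
order of 20 bytes in the batch once per-record framing is included, so the old 3,000
records (~60KB) cleared the old 16KB floor but would fall far short of the new 512KB one,
while 50,000 records comes to roughly 1MB, above INITIAL_BUFFER_SIZE yet still below the
maxMessageSize the test writer reports (presumably the ~1MB default).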
+
+    @Test
+    public void testCoordinatorDoNotRetainBufferLargeThanMaxMessageSize() {
+        MockTimer timer = new MockTimer();
+        InMemoryPartitionWriter mockWriter = new InMemoryPartitionWriter(false) {
+            @Override
+            public LogConfig config(TopicPartition tp) {
+                return new LogConfig(Map.of(
+                    TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(1024 * 1024) // 1MB
+                ));
+            }
+        };
+        StringSerializer serializer = new StringSerializer();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(mockWriter)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withSerializer(serializer)
+                .withExecutorService(mock(ExecutorService.class))
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
+
+        // Generate a record larger than the maxBatchSize.
+        List<String> largeRecords = List.of("A".repeat(100 * 1024 * 1024));
+
+        // Write #1.
+        CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
+            state -> new CoordinatorResult<>(largeRecords, "response1", null, true, false)
+        );
+
+        // Verify that the write has not completed exceptionally.
+        // This will catch any exceptions thrown, including RecordTooLargeException.
+        assertFalse(write1.isCompletedExceptionally());
+
+        // Verify that the next buffer retrieved from the bufferSupplier is the initial
+        // small one, not the large buffer.
+        assertEquals(INITIAL_BUFFER_SIZE, ctx.bufferSupplier.get(1).capacity());
+    }
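The `ctx.bufferSupplier.get(1)` probe used in these assertions deserves a note: the
growable supplier hands back its cached buffer for any request it can satisfy, so asking
for a single byte surfaces whatever buffer was recycled, while an empty cache yields a
fresh 1-byte buffer. A minimal sketch, again assuming `BufferSupplier.GrowableBufferSupplier`:

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.utils.BufferSupplier;

    public class BufferProbeSketch {
        public static void main(String[] args) {
            BufferSupplier supplier = new BufferSupplier.GrowableBufferSupplier();

            // Cache a 512KB buffer, as the coordinator does after a successful write.
            supplier.release(ByteBuffer.allocate(512 * 1024));

            // Requesting 1 byte returns the cached buffer, exposing its capacity.
            System.out.println(supplier.get(1).capacity()); // 524288
        }
    }

Note that the probe also removes the buffer from the cache, which is why the shrink test
below releases it back after checking.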
+
+    @Test
+    public void testCoordinatorRetainExpandedBufferLessOrEqualToMaxMessageSize() {
+        MockTimer timer = new MockTimer();
+        InMemoryPartitionWriter mockWriter = new InMemoryPartitionWriter(false) {
+            @Override
+            public LogConfig config(TopicPartition tp) {
+                return new LogConfig(Map.of(
+                    TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(1024 * 1024 * 1024) // 1GB
+                ));
+            }
+        };
+        StringSerializer serializer = new StringSerializer();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(mockWriter)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withSerializer(serializer)
+                .withExecutorService(mock(ExecutorService.class))
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
+
+        // Generate enough records to create a batch that has INITIAL_BUFFER_SIZE < batchSize < maxBatchSize
+        List<String> records = new ArrayList<>();
+        for (int i = 0; i < 1000000; i++) {
+            records.add("record-" + i);
+        }
+
+        // Write #1.
+        CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
+            state -> new CoordinatorResult<>(records, "response1")
+        );
+
+        // Verify that the write has not completed exceptionally.
+        // This will catch any exceptions thrown, including RecordTooLargeException.
+        assertFalse(write1.isCompletedExceptionally());
+
+        int batchSize = mockWriter.entries(TP).get(0).sizeInBytes();
+        int maxBatchSize = mockWriter.config(TP).maxMessageSize();
+        assertTrue(INITIAL_BUFFER_SIZE < batchSize && batchSize <= maxBatchSize);
+
+        // Verify that the next buffer retrieved from the bufferSupplier is the expanded buffer.
+        assertTrue(ctx.bufferSupplier.get(1).capacity() > INITIAL_BUFFER_SIZE);
+    }
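For the shrink scenario, the expected sequence in the final test below reads directly off
the runtime changes above (66 is just an arbitrary offset): write #2 still receives the
cached 512KB buffer, since the growable supplier satisfies any request that fits within
its cached capacity; when that batch is freed, the buffer's capacity exceeds the new
maxMessageSize of `512 * 1024 - 66`, so nothing is recycled and the next probe sees a
fresh 1-byte buffer; write #3 then allocates `min(INITIAL_BUFFER_SIZE, maxBatchSize)`,
i.e. the new smaller limit, which fits under maxMessageSize, is recycled, and is what the
final probe reports.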
+
+    @Test
+    public void testBufferShrinkWhenMaxMessageSizeReducedBelowInitialBufferSize() {
+        MockTimer timer = new MockTimer();
+        var mockWriter = new InMemoryPartitionWriter(false) {
+            private LogConfig config = new LogConfig(Map.of(
+                TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(1024 * 1024) // 1MB
+            ));
+
+            @Override
+            public LogConfig config(TopicPartition tp) {
+                return config;
+            }
+
+            public void updateConfig(LogConfig newConfig) {
+                this.config = newConfig;
+            }
+        };
+        StringSerializer serializer = new StringSerializer();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(mockWriter)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withSerializer(serializer)
+                .withExecutorService(mock(ExecutorService.class))
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
+
+        List<String> records = new ArrayList<>();
+        for (int i = 0; i < 1000; i++) {
+            records.add("record-" + i);
+        }
+
+        // Write #1.
+        CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
+            state -> new CoordinatorResult<>(records, "response1")
+        );
+
+        // Verify that the write has not completed exceptionally.
+        // This will catch any exceptions thrown, including RecordTooLargeException.
+        assertFalse(write1.isCompletedExceptionally());
+
+        int batchSize = mockWriter.entries(TP).get(0).sizeInBytes();
+        int maxBatchSize = mockWriter.config(TP).maxMessageSize();
+        assertTrue(batchSize <= INITIAL_BUFFER_SIZE && INITIAL_BUFFER_SIZE <= maxBatchSize);
+
+        ByteBuffer cachedBuffer = ctx.bufferSupplier.get(1);
+        assertEquals(INITIAL_BUFFER_SIZE, cachedBuffer.capacity());
+        // ctx.bufferSupplier.get(1) removes the cached buffer from the bufferSupplier,
+        // so release it to put it back.
+        ctx.bufferSupplier.release(cachedBuffer);
+
+        // Reduce max message size below initial buffer size.
+        mockWriter.updateConfig(new LogConfig(
+            Map.of(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(INITIAL_BUFFER_SIZE - 66))));
+        assertEquals(INITIAL_BUFFER_SIZE - 66, mockWriter.config(TP).maxMessageSize());
+
+        // Write #2.
+        CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT,
+            state -> new CoordinatorResult<>(records, "response2")
+        );
+        assertFalse(write2.isCompletedExceptionally());
+
+        // Verify that there is no cached buffer, since the buffer used by write #2 was
+        // larger than the new maxMessageSize and therefore was not recycled.
+        assertEquals(1, ctx.bufferSupplier.get(1).capacity());
+
+        // Write #3.
+        CompletableFuture<String> write3 = runtime.scheduleWriteOperation("write#3", TP, DEFAULT_WRITE_TIMEOUT,
+            state -> new CoordinatorResult<>(records, "response3")
+        );
+        assertFalse(write3.isCompletedExceptionally());
+
+        // Verify that the cached buffer size equals the new maxMessageSize, which is
+        // less than INITIAL_BUFFER_SIZE.
+        assertEquals(mockWriter.config(TP).maxMessageSize(), ctx.bufferSupplier.get(1).capacity());
     }
 
     @Test