This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cb9280eceea KAFKA-19519 Introduce group/share.coordinator.cached.buffer.max.bytes config (#20847)
cb9280eceea is described below

commit cb9280eceea37be40fbf46b7b9fab17dda0e1faf
Author: Lan Ding <[email protected]>
AuthorDate: Thu Dec 11 22:13:02 2025 +0800

    KAFKA-19519 Introduce group/share.coordinator.cached.buffer.max.bytes config (#20847)
    
    **Changes**
    
    1. New Dynamic Configurations
    - `group.coordinator.cached.buffer.max.bytes`: Largest cached buffer
      size allowed by GroupCoordinator
    - `share.coordinator.cached.buffer.max.bytes`: Largest cached buffer
      size allowed by ShareCoordinator
    
    Both configurations default to `1 * 1024 * 1024 + Records.LOG_OVERHEAD`
    with a minimum value of `512 * 1024`.
    
    2. Extended CoordinatorRuntime Builder Interface
    
    Added the `withCachedBufferMaxBytesSupplier(Supplier<Integer>
    cachedBufferMaxBytesSupplier)` method to allow different coordinator
    implementations to supply their buffer size configuration.
    
    3. New Monitoring Metrics
    
    - `batch-buffer-cache-size-bytes`: Current total size in bytes of the
      append buffers being held in the coordinator's cache
    - `batch-buffer-cache-discard-count`: Count of oversized append buffers
      that were discarded instead of being cached upon release
    
    Reviewers: Sushant Mahajan <[email protected]>, David Jacot
     <[email protected]>, Sean Quah <[email protected]>, Chia-Ping Tsai
     <[email protected]>
---
 .../common/runtime/CoordinatorRuntime.java         |  58 +++++--
 .../common/runtime/CoordinatorRuntimeMetrics.java  |  12 ++
 .../runtime/CoordinatorRuntimeMetricsImpl.java     |  46 +++++-
 .../runtime/CoordinatorRuntimeMetricsImplTest.java |  30 +++-
 .../common/runtime/CoordinatorRuntimeTest.java     | 174 +++++++++++++--------
 .../scala/kafka/server/DynamicBrokerConfig.scala   |   6 +-
 .../kafka/server/DynamicBrokerConfigTest.scala     |  19 +++
 .../kafka/server/DynamicConfigChangeTest.scala     |  51 +++++-
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |   1 +
 docs/ops.html                                      |  20 +++
 docs/upgrade.html                                  |  13 ++
 .../coordinator/group/GroupCoordinatorConfig.java  |  27 ++++
 .../coordinator/group/GroupCoordinatorService.java |   1 +
 .../group/GroupCoordinatorConfigTest.java          |   3 +
 .../coordinator/share/ShareCoordinatorConfig.java  |  29 +++-
 .../coordinator/share/ShareCoordinatorService.java |   1 +
 .../share/ShareCoordinatorTestConfig.java          |   1 +
 17 files changed, 406 insertions(+), 86 deletions(-)

diff --git 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
index 58252561c39..92afee2cc7e 100644
--- 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
+++ 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
@@ -42,7 +42,6 @@ import org.apache.kafka.deferred.DeferredEventQueue;
 import org.apache.kafka.server.common.TransactionVersion;
 import org.apache.kafka.server.util.timer.Timer;
 import org.apache.kafka.server.util.timer.TimerTask;
-import org.apache.kafka.storage.internals.log.LogConfig;
 import org.apache.kafka.storage.internals.log.VerificationGuard;
 import org.apache.kafka.timeline.SnapshotRegistry;
 
@@ -66,6 +65,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static java.lang.Math.min;
@@ -120,6 +120,7 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
         private Compression compression;
         private OptionalInt appendLingerMs;
         private ExecutorService executorService;
+        private Supplier<Integer> cachedBufferMaxBytesSupplier;
 
         public Builder<S, U> withLogPrefix(String logPrefix) {
             this.logPrefix = logPrefix;
@@ -196,6 +197,11 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
             return this;
         }
 
+        public Builder<S, U> 
withCachedBufferMaxBytesSupplier(Supplier<Integer> 
cachedBufferMaxBytesSupplier) {
+            this.cachedBufferMaxBytesSupplier = cachedBufferMaxBytesSupplier;
+            return this;
+        }
+
         @SuppressWarnings("checkstyle:CyclomaticComplexity")
         public CoordinatorRuntime<S, U> build() {
             if (logPrefix == null)
@@ -228,6 +234,8 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
                 throw new IllegalArgumentException("AppendLinger must be empty 
or >= 0");
             if (executorService == null)
                 throw new IllegalArgumentException("ExecutorService must be 
set.");
+            if (cachedBufferMaxBytesSupplier == null)
+                throw new IllegalArgumentException("Cached buffer max bytes 
supplier must be set.");
 
             return new CoordinatorRuntime<>(
                 logPrefix,
@@ -244,7 +252,8 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
                 serializer,
                 compression,
                 appendLingerMs,
-                executorService
+                executorService,
+                cachedBufferMaxBytesSupplier
             );
         }
     }
@@ -479,11 +488,6 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
          */
         final long appendTimeMs;
 
-        /**
-         * The max batch size.
-         */
-        final int maxBatchSize;
-
         /**
          * The verification guard associated to the batch if it is
          * transactional.
@@ -521,7 +525,6 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
             Logger log,
             long baseOffset,
             long appendTimeMs,
-            int maxBatchSize,
             VerificationGuard verificationGuard,
             ByteBuffer buffer,
             MemoryRecordsBuilder builder,
@@ -530,7 +533,6 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
             this.baseOffset = baseOffset;
             this.nextOffset = baseOffset;
             this.appendTimeMs = appendTimeMs;
-            this.maxBatchSize = maxBatchSize;
             this.verificationGuard = verificationGuard;
             this.buffer = buffer;
             this.builder = builder;
@@ -603,6 +605,11 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
          */
         BufferSupplier bufferSupplier;
 
+        /**
+         * The cached buffer size.
+         */
+        AtomicLong cachedBufferSize;
+
         /**
          * The current (or pending) batch.
          */
@@ -641,6 +648,7 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
                 defaultWriteTimeout
             );
             this.bufferSupplier = new BufferSupplier.GrowableBufferSupplier();
+            this.cachedBufferSize = new AtomicLong(0);
         }
 
         /**
@@ -772,13 +780,20 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
             // Cancel the linger timeout.
             currentBatch.lingerTimeoutTask.ifPresent(TimerTask::cancel);
 
-            // Release the buffer only if it is not larger than the 
maxBatchSize.
-            int maxBatchSize = partitionWriter.config(tp).maxMessageSize();
+            // Release the buffer only if it is not larger than the 
cachedBufferMaxBytes.
+            int cachedBufferMaxBytes = cachedBufferMaxBytesSupplier.get();
 
-            if (currentBatch.builder.buffer().capacity() <= maxBatchSize) {
+            if (currentBatch.builder.buffer().capacity() <= 
cachedBufferMaxBytes) {
                 bufferSupplier.release(currentBatch.builder.buffer());
-            } else if (currentBatch.buffer.capacity() <= maxBatchSize) {
+                cachedBufferSize.set(currentBatch.builder.buffer().capacity());
+            } else if (currentBatch.buffer.capacity() <= cachedBufferMaxBytes) 
{
                 bufferSupplier.release(currentBatch.buffer);
+                cachedBufferSize.set(currentBatch.buffer.capacity());
+                // If the builder expands the buffer beyond the 
cachedBufferMaxBytes, that should also increase the discard counter.
+                runtimeMetrics.recordBufferCacheDiscarded();
+            } else {
+                runtimeMetrics.recordBufferCacheDiscarded();
+                cachedBufferSize.set(0L);
             }
 
             currentBatch = null;
@@ -901,8 +916,7 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
             long currentTimeMs
         ) {
             if (currentBatch == null) {
-                LogConfig logConfig = partitionWriter.config(tp);
-                int maxBatchSize = logConfig.maxMessageSize();
+                int maxBatchSize = partitionWriter.config(tp).maxMessageSize();
                 long prevLastWrittenOffset = coordinator.lastWrittenOffset();
                 ByteBuffer buffer = 
bufferSupplier.get(min(INITIAL_BUFFER_SIZE, maxBatchSize));
 
@@ -953,7 +967,6 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
                     log,
                     prevLastWrittenOffset,
                     currentTimeMs,
-                    maxBatchSize,
                     verificationGuard,
                     buffer,
                     builder,
@@ -2064,6 +2077,11 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
      */
     private final ExecutorService executorService;
 
+    /**
+     * The maximum buffer size that the coordinator can cache.
+     */
+    private final Supplier<Integer> cachedBufferMaxBytesSupplier;
+
     /**
      * Atomic boolean indicating whether the runtime is running.
      */
@@ -2092,6 +2110,7 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
      * @param compression                       The compression codec.
      * @param appendLingerMs                    The append linger time in ms.
      * @param executorService                   The executor service.
+     * @param cachedBufferMaxBytesSupplier      The cached buffer max bytes 
supplier.
      */
     @SuppressWarnings("checkstyle:ParameterNumber")
     private CoordinatorRuntime(
@@ -2109,7 +2128,8 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
         Serializer<U> serializer,
         Compression compression,
         OptionalInt appendLingerMs,
-        ExecutorService executorService
+        ExecutorService executorService,
+        Supplier<Integer> cachedBufferMaxBytesSupplier
     ) {
         this.logPrefix = logPrefix;
         this.log = logContext.logger(CoordinatorRuntime.class);
@@ -2127,6 +2147,10 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
         this.compression = compression;
         this.appendLingerMs = appendLingerMs;
         this.executorService = executorService;
+        this.cachedBufferMaxBytesSupplier = cachedBufferMaxBytesSupplier;
+        this.runtimeMetrics.registerBufferCacheSizeGauge(
+            () -> coordinators.values().stream().mapToLong(c -> 
c.cachedBufferSize.get()).sum()
+        );
     }
 
     /**
diff --git 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetrics.java
 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetrics.java
index 5b9b9254230..c537a2ea413 100644
--- 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetrics.java
+++ 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetrics.java
@@ -86,4 +86,16 @@ public interface CoordinatorRuntimeMetrics extends 
AutoCloseable {
      * @param sizeSupplier The size supplier.
      */
     void registerEventQueueSizeGauge(Supplier<Integer> sizeSupplier);
+
+    /**
+     * Register the cached buffer size gauge.
+     *
+     * @param bufferCacheSizeSupplier The buffer cache size supplier.
+     */
+    void registerBufferCacheSizeGauge(Supplier<Long> bufferCacheSizeSupplier);
+
+    /**
+     * Called when a buffer is discarded upon release instead of being cached.
+     */
+    void recordBufferCacheDiscarded();
 }
diff --git 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImpl.java
 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImpl.java
index 8382090639c..93bc9848201 100644
--- 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImpl.java
+++ 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImpl.java
@@ -69,6 +69,16 @@ public class CoordinatorRuntimeMetricsImpl implements 
CoordinatorRuntimeMetrics
      */
     public static final String BATCH_FLUSH_TIME_METRIC_NAME = 
"batch-flush-time-ms";
 
+    /**
+     * The buffer cache size metric name.
+     */
+    public static final String BATCH_BUFFER_CACHE_SIZE_METRIC_NAME = 
"batch-buffer-cache-size-bytes";
+
+    /**
+     * The buffer cache discard count metric name.
+     */
+    public static final String BATCH_BUFFER_CACHE_DISCARD_COUNT_METRIC_NAME = 
"batch-buffer-cache-discard-count";
+
     /**
      * Metric to count the number of partitions in Loading state.
      */
@@ -92,6 +102,17 @@ public class CoordinatorRuntimeMetricsImpl implements 
CoordinatorRuntimeMetrics
      */
     private final MetricName eventQueueSize;
 
+    /**
+     * Metric to count the size of the cached buffers.
+     */
+    private final MetricName bufferCacheSize;
+
+    /**
+     * Metric to count the number of over-sized append buffers that were 
discarded.
+     */
+    private final MetricName bufferCacheDiscardCount;
+    private final AtomicLong bufferCacheDiscardCounter = new AtomicLong(0);
+
     /**
      * The Kafka metrics registry.
      */
@@ -156,9 +177,20 @@ public class CoordinatorRuntimeMetricsImpl implements 
CoordinatorRuntimeMetrics
 
         this.eventQueueSize = kafkaMetricName("event-queue-size", "The event 
accumulator queue size.");
 
+        this.bufferCacheSize = kafkaMetricName(
+            BATCH_BUFFER_CACHE_SIZE_METRIC_NAME,
+            "The current total size in bytes of the append buffers being held 
in the coordinator's cache."
+        );
+
+        this.bufferCacheDiscardCount = kafkaMetricName(
+            BATCH_BUFFER_CACHE_DISCARD_COUNT_METRIC_NAME,
+            "The count of over-sized append buffers that were discarded 
instead of being cached upon release."
+        );
+        
         metrics.addMetric(numPartitionsLoading, (Gauge<Long>) (config, now) -> 
numPartitionsLoadingCounter.get());
         metrics.addMetric(numPartitionsActive, (Gauge<Long>) (config, now) -> 
numPartitionsActiveCounter.get());
         metrics.addMetric(numPartitionsFailed, (Gauge<Long>) (config, now) -> 
numPartitionsFailedCounter.get());
+        metrics.addMetric(bufferCacheDiscardCount, (Gauge<Long>) (config, now) 
-> bufferCacheDiscardCounter.get());
 
         this.partitionLoadSensor = metrics.sensor(this.metricsGroup + 
"-PartitionLoadTime");
         this.partitionLoadSensor.add(
@@ -252,7 +284,9 @@ public class CoordinatorRuntimeMetricsImpl implements 
CoordinatorRuntimeMetrics
             numPartitionsLoading,
             numPartitionsActive,
             numPartitionsFailed,
-            eventQueueSize
+            eventQueueSize,
+            bufferCacheSize,
+            bufferCacheDiscardCount
         ).forEach(metrics::removeMetric);
 
         metrics.removeSensor(partitionLoadSensor.name());
@@ -340,4 +374,14 @@ public class CoordinatorRuntimeMetricsImpl implements 
CoordinatorRuntimeMetrics
     public void registerEventQueueSizeGauge(Supplier<Integer> sizeSupplier) {
         metrics.addMetric(eventQueueSize, (Gauge<Long>) (config, now) -> 
(long) sizeSupplier.get());
     }
+
+    @Override
+    public void registerBufferCacheSizeGauge(Supplier<Long> 
bufferCacheSizeSupplier) {
+        metrics.addMetric(bufferCacheSize, (Gauge<Long>) (config, now) -> 
bufferCacheSizeSupplier.get());
+    }
+
+    @Override
+    public void recordBufferCacheDiscarded() {
+        bufferCacheDiscardCounter.incrementAndGet();
+    }
 }
diff --git 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java
 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java
index b243f12466b..82601be18ba 100644
--- 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java
+++ 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java
@@ -33,6 +33,8 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.stream.IntStream;
 
+import static 
org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.BATCH_BUFFER_CACHE_DISCARD_COUNT_METRIC_NAME;
+import static 
org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.BATCH_BUFFER_CACHE_SIZE_METRIC_NAME;
 import static 
org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.BATCH_FLUSH_TIME_METRIC_NAME;
 import static 
org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.BATCH_LINGER_TIME_METRIC_NAME;
 import static 
org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.EVENT_PROCESSING_TIME_METRIC_NAME;
@@ -88,7 +90,9 @@ public class CoordinatorRuntimeMetricsImplTest {
             kafkaMetricName(metrics, "batch-flush-time-ms-p95"),
             kafkaMetricName(metrics, "batch-flush-time-ms-p99"),
             kafkaMetricName(metrics, "batch-flush-time-ms-p999"),
-            kafkaMetricName(metrics, "batch-flush-rate")
+            kafkaMetricName(metrics, "batch-flush-rate"),
+            kafkaMetricName(metrics, BATCH_BUFFER_CACHE_SIZE_METRIC_NAME),
+            kafkaMetricName(metrics, 
BATCH_BUFFER_CACHE_DISCARD_COUNT_METRIC_NAME)
         );
     }
 
@@ -100,6 +104,7 @@ public class CoordinatorRuntimeMetricsImplTest {
 
         try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new 
CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP)) {
             runtimeMetrics.registerEventQueueSizeGauge(() -> 0);
+            runtimeMetrics.registerBufferCacheSizeGauge(() -> 0L);
             expectedMetrics.forEach(metricName -> 
assertTrue(metrics.metrics().containsKey(metricName)));
         }
 
@@ -118,6 +123,7 @@ public class CoordinatorRuntimeMetricsImplTest {
         Set<MetricName> metricNames;
         try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new 
CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP)) {
             runtimeMetrics.registerEventQueueSizeGauge(() -> 0);
+            runtimeMetrics.registerBufferCacheSizeGauge(() -> 0L);
 
             ArgumentCaptor<String> sensorCaptor = 
ArgumentCaptor.forClass(String.class);
             verify(metrics, atLeastOnce()).sensor(sensorCaptor.capture());
@@ -138,6 +144,7 @@ public class CoordinatorRuntimeMetricsImplTest {
         Set<MetricName> otherMetricNames;
         try (CoordinatorRuntimeMetricsImpl otherRuntimeMetrics = new 
CoordinatorRuntimeMetricsImpl(metrics, OTHER_METRICS_GROUP)) {
             otherRuntimeMetrics.registerEventQueueSizeGauge(() -> 0);
+            otherRuntimeMetrics.registerBufferCacheSizeGauge(() -> 0L);
 
             ArgumentCaptor<String> sensorCaptor = 
ArgumentCaptor.forClass(String.class);
             verify(metrics, atLeastOnce()).sensor(sensorCaptor.capture());
@@ -185,7 +192,6 @@ public class CoordinatorRuntimeMetricsImplTest {
         }
     }
 
-
     @Test
     public void testPartitionLoadSensorMetrics() {
         Time time = new MockTime();
@@ -233,6 +239,26 @@ public class CoordinatorRuntimeMetricsImplTest {
         }
     }
 
+    @Test
+    public void testBatchBufferCacheSize() {
+        Metrics metrics = new Metrics();
+
+        try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new 
CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP)) {
+            runtimeMetrics.registerBufferCacheSizeGauge(() -> 5L);
+            assertMetricGauge(metrics, kafkaMetricName(metrics, 
BATCH_BUFFER_CACHE_SIZE_METRIC_NAME), 5);
+        }
+    }
+
+    @Test
+    public void testBatchBufferCacheDiscardCount() {
+        Metrics metrics = new Metrics();
+
+        try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new 
CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP)) {
+            runtimeMetrics.recordBufferCacheDiscarded();
+            assertMetricGauge(metrics, kafkaMetricName(metrics, 
BATCH_BUFFER_CACHE_DISCARD_COUNT_METRIC_NAME), 1);
+        }
+    }
+
     @ParameterizedTest
     @ValueSource(strings = {
         EVENT_QUEUE_TIME_METRIC_NAME,
diff --git 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
index 0e78a8e1c3f..00609c94b4f 100644
--- 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
+++ 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
@@ -19,7 +19,6 @@ package org.apache.kafka.coordinator.common.runtime;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.compress.Compression;
-import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.errors.InvalidProducerEpochException;
 import org.apache.kafka.common.errors.NotCoordinatorException;
 import org.apache.kafka.common.errors.NotEnoughReplicasException;
@@ -29,6 +28,7 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.ControlRecordType;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.requests.TransactionResult;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
@@ -63,6 +63,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -102,6 +103,7 @@ public class CoordinatorRuntimeTest {
     private static final Duration DEFAULT_WRITE_TIMEOUT = Duration.ofMillis(5);
 
     private static final short TXN_OFFSET_COMMIT_LATEST_VERSION = 
ApiKeys.TXN_OFFSET_COMMIT.latestVersion();
+    private static final int CACHED_BUFFER_MAX_BYTES = 1024 * 1024 + 
Records.LOG_OVERHEAD;
 
     @Test
     public void testScheduleLoading() {
@@ -125,6 +127,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@@ -197,6 +200,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@@ -249,6 +253,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@@ -305,6 +310,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@@ -378,6 +384,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@@ -434,6 +441,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@@ -490,6 +498,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@@ -534,6 +543,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@@ -584,6 +594,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         doThrow(new KafkaException("error")).when(coordinator).onUnloaded();
@@ -640,6 +651,7 @@ public class CoordinatorRuntimeTest {
                 .withSerializer(new StringSerializer())
                 .withAppendLingerMs(OptionalInt.of(10))
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@@ -728,6 +740,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Schedule the loading.
@@ -850,6 +863,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Scheduling a write fails with a NotCoordinatorException because the 
coordinator
@@ -875,6 +889,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Loads the coordinator.
@@ -904,6 +919,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Loads the coordinator.
@@ -965,6 +981,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Loads the coordinator.
@@ -1018,6 +1035,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Loads the coordinator.
@@ -1056,6 +1074,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         TopicPartition coordinator0 = new TopicPartition("__consumer_offsets", 
0);
@@ -1128,6 +1147,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Schedule the loading.
@@ -1224,6 +1244,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Schedule the loading.
@@ -1287,6 +1308,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Schedule the loading.
@@ -1415,6 +1437,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Loads the coordinator.
@@ -1477,6 +1500,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Loads the coordinator.
@@ -1545,6 +1569,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Loads the coordinator.
@@ -1646,6 +1671,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Loads the coordinator.
@@ -1732,6 +1758,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Loads the coordinator.
@@ -1791,6 +1818,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Schedule a read. It fails because the coordinator does not exist.
@@ -1817,6 +1845,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Loads the coordinator.
@@ -1864,6 +1893,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         TopicPartition coordinator0 = new TopicPartition("__consumer_offsets", 
0);
@@ -1918,6 +1948,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
                 .withExecutorService(executorService)
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Loads the coordinator.
@@ -1991,6 +2022,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         MockCoordinatorShard coordinator0 = mock(MockCoordinatorShard.class);
@@ -2056,6 +2088,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Loads the coordinator.
@@ -2099,6 +2132,7 @@ public class CoordinatorRuntimeTest {
     public void testRescheduleTimer() throws InterruptedException {
         MockTimer timer = new MockTimer();
         ManualEventProcessor processor = new ManualEventProcessor();
+
         CoordinatorRuntime<MockCoordinatorShard, String> runtime =
             new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
                 .withTime(timer.time())
@@ -2112,6 +2146,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Loads the coordinator.
@@ -2188,6 +2223,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Loads the coordinator.
@@ -2261,6 +2297,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Loads the coordinator.
@@ -2322,6 +2359,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Loads the coordinator.
@@ -2397,6 +2435,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Loads the coordinator.
@@ -2441,6 +2480,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Loads the coordinator.
@@ -2493,6 +2533,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         runtime.scheduleLoadOperation(TP, 10);
@@ -2551,6 +2592,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@@ -2631,6 +2673,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@@ -2689,6 +2732,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@@ -2748,6 +2792,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@@ -2794,6 +2839,7 @@ public class CoordinatorRuntimeTest {
                 .withSerializer(new StringSerializer())
                 .withAppendLingerMs(OptionalInt.of(0))
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Loads the coordinator. Poll once to execute the load operation and 
once
@@ -2869,6 +2915,7 @@ public class CoordinatorRuntimeTest {
                 .withSerializer(new StringSerializer())
                 .withAppendLingerMs(OptionalInt.of(10))
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Load the coordinator.
@@ -2948,6 +2995,7 @@ public class CoordinatorRuntimeTest {
                 .withSerializer(new StringSerializer())
                 .withAppendLingerMs(OptionalInt.of(0))
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Loads the coordinator. Poll once to execute the load operation and 
once
@@ -3022,6 +3070,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Loads the coordinator. Poll once to execute the load operation and 
once
@@ -3093,6 +3142,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(serializer)
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Schedule the loading.
@@ -3127,17 +3177,11 @@ public class CoordinatorRuntimeTest {
     }
 
     @Test
-    public void testCoordinatorDoNotRetainBufferLargeThanMaxMessageSize() {
+    public void 
testCoordinatorDoNotRetainBufferLargeThanCachedBufferMaxBytes() {
         MockTimer timer = new MockTimer();
-        InMemoryPartitionWriter mockWriter = new 
InMemoryPartitionWriter(false) {
-            @Override
-            public LogConfig config(TopicPartition tp) {
-                return new LogConfig(Map.of(
-                    TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(1024 
* 1024) // 1MB
-                ));
-            }
-        };
+        InMemoryPartitionWriter mockWriter = new 
InMemoryPartitionWriter(false);
         StringSerializer serializer = new StringSerializer();
+        CoordinatorRuntimeMetrics runtimeMetrics = 
mock(CoordinatorRuntimeMetrics.class);
 
         CoordinatorRuntime<MockCoordinatorShard, String> runtime =
             new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
@@ -3148,10 +3192,11 @@ public class CoordinatorRuntimeTest {
                 .withEventProcessor(new DirectEventProcessor())
                 .withPartitionWriter(mockWriter)
                 .withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
-                
.withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorRuntimeMetrics(runtimeMetrics)
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(serializer)
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Schedule the loading.
@@ -3163,7 +3208,7 @@ public class CoordinatorRuntimeTest {
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
         assertEquals(List.of(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
 
-        // Generate a record larger than the maxBatchSize.
+        // Generate a record larger than the cachedBufferMaxBytes.
         List<String> largeRecords = List.of("A".repeat(100 * 1024 * 1024));
 
         // Write #1.
@@ -3177,20 +3222,15 @@ public class CoordinatorRuntimeTest {
 
         // Verify that the next buffer retrieved from the bufferSupplier is 
the initial small one, not the large buffer.
         assertEquals(INITIAL_BUFFER_SIZE, 
ctx.bufferSupplier.get(1).capacity());
+        verify(runtimeMetrics, times(1)).recordBufferCacheDiscarded();
     }
 
     @Test
-    public void 
testCoordinatorRetainExpandedBufferLessOrEqualToMaxMessageSize() {
+    public void 
testCoordinatorRetainExpandedBufferLessOrEqualToCachedBufferMaxBytes() {
         MockTimer timer = new MockTimer();
-        InMemoryPartitionWriter mockWriter = new 
InMemoryPartitionWriter(false) {
-            @Override
-            public LogConfig config(TopicPartition tp) {
-                return new LogConfig(Map.of(
-                    TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(1024 
* 1024 * 1024) // 1GB
-                ));
-            }
-        };
+        InMemoryPartitionWriter mockWriter = new 
InMemoryPartitionWriter(false);
         StringSerializer serializer = new StringSerializer();
+        int cachedBufferMaxBytes = 1024 * 1024 * 1024; // 1GB
 
         CoordinatorRuntime<MockCoordinatorShard, String> runtime =
             new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
@@ -3205,6 +3245,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(serializer)
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> cachedBufferMaxBytes)
                 .build();
 
         // Schedule the loading.
@@ -3216,11 +3257,8 @@ public class CoordinatorRuntimeTest {
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
         assertEquals(List.of(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
 
-        // Generate enough records to create a batch that has 
INITIAL_BUFFER_SIZE < batchSize < maxBatchSize
-        List<String> records = new ArrayList<>();
-        for (int i = 0; i < 1000000; i++) {
-            records.add("record-" + i);
-        }
+        // Generate a single record large enough to create a batch that has 
INITIAL_BUFFER_SIZE < batchSize <= cachedBufferMaxBytes
+        List<String> records = List.of("A".repeat(INITIAL_BUFFER_SIZE + 1024));
 
         // Write #1.
         CompletableFuture<String> write1 = 
runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
@@ -3232,30 +3270,19 @@ public class CoordinatorRuntimeTest {
         assertFalse(write1.isCompletedExceptionally());
 
         int batchSize = mockWriter.entries(TP).get(0).sizeInBytes();
-        int maxBatchSize = mockWriter.config(TP).maxMessageSize();
-        assertTrue(INITIAL_BUFFER_SIZE < batchSize && batchSize <= 
maxBatchSize);
+        assertTrue(INITIAL_BUFFER_SIZE < batchSize && batchSize <= 
cachedBufferMaxBytes);
 
         // Verify that the next buffer retrieved from the bufferSupplier is 
the expanded buffer.
         assertTrue(ctx.bufferSupplier.get(1).capacity() > INITIAL_BUFFER_SIZE);
     }
 
     @Test
-    public void 
testBufferShrinkWhenMaxMessageSizeReducedBelowInitialBufferSize() {
+    public void 
testBufferShrinkWhenCachedBufferMaxBytesReducedBelowBatchSize() {
         MockTimer timer = new MockTimer();
-        var mockWriter = new InMemoryPartitionWriter(false) {
-            private LogConfig config = new LogConfig(Map.of(
-                TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(1024 * 
1024) // 1MB
-            ));
-
-            @Override
-            public LogConfig config(TopicPartition tp) {
-                return config;
-            }
-
-            public void updateConfig(LogConfig newConfig) {
-                this.config = newConfig;
-            }
-        };
+        InMemoryPartitionWriter mockWriter = new 
InMemoryPartitionWriter(false);
+        Supplier<Integer> maxBufferSizeSupplierMock = mock(Supplier.class);
+        CoordinatorRuntimeMetrics runtimeMetrics = 
mock(CoordinatorRuntimeMetrics.class);
+        
when(maxBufferSizeSupplierMock.get()).thenReturn(CACHED_BUFFER_MAX_BYTES);
         StringSerializer serializer = new StringSerializer();
 
         CoordinatorRuntime<MockCoordinatorShard, String> runtime =
@@ -3267,10 +3294,11 @@ public class CoordinatorRuntimeTest {
                 .withEventProcessor(new DirectEventProcessor())
                 .withPartitionWriter(mockWriter)
                 .withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
-                
.withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorRuntimeMetrics(runtimeMetrics)
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(serializer)
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(maxBufferSizeSupplierMock)
                 .build();
 
         // Schedule the loading.
@@ -3282,10 +3310,7 @@ public class CoordinatorRuntimeTest {
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
         assertEquals(List.of(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
 
-        List<String> records = new ArrayList<>();
-        for (int i = 0; i < 1000; i++) {
-            records.add("record-" + i);
-        }
+        List<String> records = List.of("A".repeat(INITIAL_BUFFER_SIZE + 1024));
 
         // Write #1.
         CompletableFuture<String> write1 = 
runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
@@ -3297,18 +3322,15 @@ public class CoordinatorRuntimeTest {
         assertFalse(write1.isCompletedExceptionally());
 
         int batchSize = mockWriter.entries(TP).get(0).sizeInBytes();
-        int maxBatchSize = mockWriter.config(TP).maxMessageSize();
-        assertTrue(batchSize <= INITIAL_BUFFER_SIZE && INITIAL_BUFFER_SIZE <= 
maxBatchSize);
+        assertTrue(batchSize > INITIAL_BUFFER_SIZE && batchSize <= 
CACHED_BUFFER_MAX_BYTES);
 
         ByteBuffer cachedBuffer = ctx.bufferSupplier.get(1);
-        assertEquals(INITIAL_BUFFER_SIZE, cachedBuffer.capacity());
+        assertTrue(cachedBuffer.capacity() > INITIAL_BUFFER_SIZE && 
cachedBuffer.capacity() < CACHED_BUFFER_MAX_BYTES);
         // ctx.bufferSupplier.get(1); will clear cachedBuffer in 
bufferSupplier. Use release to put it back to bufferSupplier
         ctx.bufferSupplier.release(cachedBuffer);
 
-        // Reduce max message size below initial buffer size.
-        mockWriter.updateConfig(new LogConfig(
-            Map.of(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, 
String.valueOf(INITIAL_BUFFER_SIZE - 66))));
-        assertEquals(INITIAL_BUFFER_SIZE - 66, 
mockWriter.config(TP).maxMessageSize());
+        // Reduce max buffer size below batch size.
+        when(maxBufferSizeSupplierMock.get()).thenReturn(batchSize - 66);
 
         // Write #2.
         CompletableFuture<String> write2 = 
runtime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT,
@@ -3316,8 +3338,9 @@ public class CoordinatorRuntimeTest {
         );
         assertFalse(write2.isCompletedExceptionally());
 
-        // Verify that there is no cached buffer since the cached buffer size 
is greater than new maxMessageSize.
+        // Verify that there is no cached buffer since the cached buffer size 
is greater than the new cached buffer max bytes.
         assertEquals(1, ctx.bufferSupplier.get(1).capacity());
+        verify(runtimeMetrics, times(1)).recordBufferCacheDiscarded();
 
         // Write #3.
         CompletableFuture<String> write3 = 
runtime.scheduleWriteOperation("write#3", TP, DEFAULT_WRITE_TIMEOUT,
@@ -3325,8 +3348,9 @@ public class CoordinatorRuntimeTest {
         );
         assertFalse(write3.isCompletedExceptionally());
 
-        // Verify that the cached buffer size is equals to new maxMessageSize 
that less than INITIAL_BUFFER_SIZE.
-        assertEquals(mockWriter.config(TP).maxMessageSize(), 
ctx.bufferSupplier.get(1).capacity());
+        // Verify that the cached buffer size is equal to the initial buffer size.
+        assertEquals(INITIAL_BUFFER_SIZE, 
ctx.bufferSupplier.get(1).capacity());
+        verify(runtimeMetrics, times(2)).recordBufferCacheDiscarded();
     }
 
     @Test
@@ -3348,6 +3372,7 @@ public class CoordinatorRuntimeTest {
                 .withSerializer(new StringSerializer())
                 .withAppendLingerMs(OptionalInt.of(10))
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Schedule the loading.
@@ -3483,6 +3508,7 @@ public class CoordinatorRuntimeTest {
                 .withSerializer(new StringSerializer())
                 .withAppendLingerMs(OptionalInt.of(10))
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Schedule the loading.
@@ -3535,6 +3561,7 @@ public class CoordinatorRuntimeTest {
                 .withSerializer(new StringSerializer())
                 .withAppendLingerMs(OptionalInt.of(10))
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Schedule the loading.
@@ -3621,6 +3648,7 @@ public class CoordinatorRuntimeTest {
                 .withSerializer(new StringSerializer())
                 .withAppendLingerMs(OptionalInt.of(10))
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Schedule the loading.
@@ -3719,6 +3747,7 @@ public class CoordinatorRuntimeTest {
                 .withSerializer(new StringSerializer())
                 .withAppendLingerMs(OptionalInt.of(10))
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Schedule the loading.
@@ -3854,6 +3883,7 @@ public class CoordinatorRuntimeTest {
                 .withSerializer(new StringSerializer())
                 .withAppendLingerMs(OptionalInt.empty())
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Loads the coordinator. Poll once to execute the load operation and 
once
@@ -3962,6 +3992,7 @@ public class CoordinatorRuntimeTest {
                 .withSerializer(new StringSerializer())
                 .withAppendLingerMs(OptionalInt.empty())
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Loads the coordinator. Poll once to execute the load operation and 
once
@@ -4151,6 +4182,7 @@ public class CoordinatorRuntimeTest {
                 .withSerializer(new StringSerializer())
                 .withAppendLingerMs(OptionalInt.empty())
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Loads the coordinator. Poll once to execute the load operation and 
once
@@ -4312,6 +4344,7 @@ public class CoordinatorRuntimeTest {
                 .withSerializer(new StringSerializer())
                 .withAppendLingerMs(OptionalInt.of(10))
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Schedule the loading.
@@ -4427,6 +4460,7 @@ public class CoordinatorRuntimeTest {
                 .withSerializer(new StringSerializer())
                 .withAppendLingerMs(OptionalInt.of(10))
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Schedule the loading.
@@ -4476,6 +4510,7 @@ public class CoordinatorRuntimeTest {
                 .withSerializer(new StringSerializer())
                 .withAppendLingerMs(OptionalInt.of(10))
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Schedule the loading.
@@ -4585,6 +4620,7 @@ public class CoordinatorRuntimeTest {
                 .withSerializer(new StringSerializer())
                 .withAppendLingerMs(OptionalInt.of(10))
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Schedule the loading.
@@ -4681,6 +4717,7 @@ public class CoordinatorRuntimeTest {
                 .withSerializer(new StringSerializer())
                 .withAppendLingerMs(OptionalInt.of(10))
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Schedule the loading.
@@ -4768,6 +4805,7 @@ public class CoordinatorRuntimeTest {
                 .withSerializer(serializer)
                 .withAppendLingerMs(OptionalInt.of(10))
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Schedule the loading.
@@ -4839,6 +4877,7 @@ public class CoordinatorRuntimeTest {
                 .withSerializer(new StringSerializer())
                 .withAppendLingerMs(OptionalInt.of(10))
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Schedule the loading.
@@ -4952,6 +4991,7 @@ public class CoordinatorRuntimeTest {
                 .withSerializer(new StringSerializer())
                 .withAppendLingerMs(OptionalInt.of(10))
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Schedule the loading.
@@ -5063,6 +5103,7 @@ public class CoordinatorRuntimeTest {
                 .withSerializer(new StringSerializer())
                 .withAppendLingerMs(OptionalInt.of(10))
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Schedule the loading.
@@ -5075,7 +5116,7 @@ public class CoordinatorRuntimeTest {
         // Get the max batch size.
         int maxBatchSize = writer.config(TP).maxMessageSize();
 
-        // Create 2 records with a quarter of the max batch size each. 
+        // Create 2 records with a quarter of the max batch size each.
         List<String> records = Stream.of('1', '2').map(c -> {
             char[] payload = new char[maxBatchSize / 4];
             Arrays.fill(payload, c);
@@ -5099,7 +5140,7 @@ public class CoordinatorRuntimeTest {
 
         // Write #2 with the large record. This record is too large to go into 
the previous batch
         // uncompressed but fits in a new buffer, so we should flush the 
previous batch and allocate
-        // a new one. 
+        // a new one.
         CompletableFuture<String> write2 = 
runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(50),
             state -> new CoordinatorResult<>(largeRecord, "response2")
         );
@@ -5149,6 +5190,7 @@ public class CoordinatorRuntimeTest {
                 .withSerializer(new StringSerializer())
                 .withAppendLingerMs(OptionalInt.of(10))
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Schedule the loading.
@@ -5161,7 +5203,7 @@ public class CoordinatorRuntimeTest {
         // Get the max batch size.
         int maxBatchSize = writer.config(TP).maxMessageSize();
 
-        // Create 2 records with a quarter of the max batch size each. 
+        // Create 2 records with a quarter of the max batch size each.
         List<String> records = Stream.of('1', '2').map(c -> {
             char[] payload = new char[maxBatchSize / 4];
             Arrays.fill(payload, c);
@@ -5235,6 +5277,7 @@ public class CoordinatorRuntimeTest {
                 .withSerializer(new StringSerializer())
                 .withAppendLingerMs(OptionalInt.of(10))
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Schedule the loading.
@@ -5247,7 +5290,7 @@ public class CoordinatorRuntimeTest {
         // Get the max batch size.
         int maxBatchSize = writer.config(TP).maxMessageSize();
 
-        // Create 2 records with a quarter of the max batch size each. 
+        // Create 2 records with a quarter of the max batch size each.
         List<String> records = Stream.of('1', '2').map(c -> {
             char[] payload = new char[maxBatchSize / 4];
             Arrays.fill(payload, c);
@@ -5302,7 +5345,7 @@ public class CoordinatorRuntimeTest {
         assertTrue(write2.isDone());
         assertEquals(2L, ctx.coordinator.lastCommittedOffset());
         assertEquals("response1", write1.get(5, TimeUnit.SECONDS));
-    } 
+    }
 
     @Test
     public void testRecordEventPurgatoryTime() throws Exception {
@@ -5326,6 +5369,7 @@ public class CoordinatorRuntimeTest {
                 .withSerializer(new StringSerializer())
                 .withAppendLingerMs(OptionalInt.of(0))
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Loads the coordinator. Poll once to execute the load operation and 
once
@@ -5413,6 +5457,7 @@ public class CoordinatorRuntimeTest {
                 .withSerializer(new StringSerializer())
                 .withAppendLingerMs(OptionalInt.of(0))
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Loads the coordinator. Poll once to execute the load operation and 
once
@@ -5480,6 +5525,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Loads the coordinator. Poll once to execute the load operation and 
once
@@ -5561,6 +5607,7 @@ public class CoordinatorRuntimeTest {
                 .withSerializer(new StringSerializer())
                 .withAppendLingerMs(OptionalInt.of(0))
                 .withExecutorService(executorService)
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Loads the coordinator. Poll once to execute the load operation and 
once
@@ -5643,6 +5690,7 @@ public class CoordinatorRuntimeTest {
                 .withSerializer(new StringSerializer())
                 .withAppendLingerMs(OptionalInt.of(10))
                 .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
                 .build();
 
         // Schedule the loading.
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala 
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 365fef6eb97..51d7dbe5416 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -37,6 +37,8 @@ import 
org.apache.kafka.common.security.authenticator.LoginManager
 import org.apache.kafka.common.utils.LogContext
 import org.apache.kafka.common.utils.{BufferSupplier, ConfigUtils, Utils}
 import org.apache.kafka.config
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
+import org.apache.kafka.coordinator.share.ShareCoordinatorConfig
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.raft.KafkaRaftClient
@@ -99,7 +101,9 @@ object DynamicBrokerConfig {
     SocketServer.ReconfigurableConfigs ++
     DynamicProducerStateManagerConfig ++
     DynamicRemoteLogConfig.ReconfigurableConfigs ++
-    Set(AbstractConfig.CONFIG_PROVIDERS_CONFIG)
+    Set(AbstractConfig.CONFIG_PROVIDERS_CONFIG) ++
+    GroupCoordinatorConfig.RECONFIGURABLE_CONFIGS.asScala ++
+    ShareCoordinatorConfig.RECONFIGURABLE_CONFIGS.asScala
 
   private val ClusterLevelListenerConfigs = 
Set(SocketServerConfigs.MAX_CONNECTIONS_CONFIG, 
SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG, 
SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG)
   private val PerBrokerConfigs = (DynamicSecurityConfigs ++ 
DynamicListenerConfig.ReconfigurableConfigs).diff(
diff --git 
a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 6d269c23f67..8504f3705ac 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -31,6 +31,8 @@ import org.apache.kafka.common.internals.Plugin
 import org.apache.kafka.common.metrics.{JmxReporter, KafkaMetric, Metrics, 
MetricsReporter}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
+import org.apache.kafka.coordinator.share.ShareCoordinatorConfig
 import org.apache.kafka.raft.QuorumConfig
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.server.DynamicThreadPool
@@ -1105,6 +1107,23 @@ class DynamicBrokerConfigTest {
     updateReporter(classOf[MockMetricsReporter])
     verifyNoMoreInteractions(telemetryPlugin)
   }
+
+  @Test
+  def testCoordinatorCachedBufferMaxBytesUpdates(): Unit = {
+    val origProps = TestUtils.createBrokerConfig(0, port = 8181)
+    origProps.put(GroupCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG, 
"2097152")
+    origProps.put(ShareCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG, 
"3145728")
+    val ctx = new DynamicLogConfigContext(origProps)
+    assertEquals(2 * 1024 * 1024, 
ctx.config.groupCoordinatorConfig.cachedBufferMaxBytes())
+    assertEquals(3 * 1024 * 1024, 
ctx.config.shareCoordinatorConfig.shareCoordinatorCachedBufferMaxBytes())
+
+    val props = new Properties()
+    props.put(GroupCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG, "4194304")
+    props.put(ShareCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG, "5242880")
+    ctx.config.dynamicConfig.updateDefaultConfig(props)
+    assertEquals(4 * 1024 * 1024, 
ctx.config.groupCoordinatorConfig.cachedBufferMaxBytes())
+    assertEquals(5 * 1024 * 1024, 
ctx.config.shareCoordinatorConfig.shareCoordinatorCachedBufferMaxBytes())
+  }
 }
 
 class TestDynamicThreadPool extends BrokerReconfigurable {
diff --git 
a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala 
b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index 519a7d951a3..2f13f0d2dac 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -31,7 +31,8 @@ import 
org.apache.kafka.common.quota.ClientQuotaEntity.{CLIENT_ID, IP, USER}
 import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.{TopicPartition, Uuid}
-import org.apache.kafka.coordinator.group.GroupConfig
+import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
+import org.apache.kafka.coordinator.share.ShareCoordinatorConfig
 import org.apache.kafka.metadata.MetadataCache
 import org.apache.kafka.server.config.{QuotaConfig, ServerLogConfigs}
 import org.apache.kafka.server.log.remote.TopicPartitionLog
@@ -459,6 +460,54 @@ class DynamicConfigChangeTest extends 
KafkaServerTestHarness {
     }
   }
 
+  @Test
+  def testDynamicGroupCoordinatorConfigChange(): Unit = {
+    val newCachedBufferMaxBytes = 2 * 1024 * 1024
+    val brokerId: String = this.brokers.head.config.brokerId.toString
+    val admin = createAdminClient()
+    try {
+      val resource = new ConfigResource(ConfigResource.Type.BROKER, brokerId)
+      val op = new AlterConfigOp(
+        new ConfigEntry(GroupCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG, 
newCachedBufferMaxBytes.toString),
+        OpType.SET
+      )
+      admin.incrementalAlterConfigs(Map(resource -> 
List(op).asJavaCollection).asJava).all.get
+    } finally {
+      admin.close()
+    }
+
+    for (b <- this.brokers) {
+      val value = if (b.config.brokerId.toString == brokerId) 
newCachedBufferMaxBytes else 
GroupCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_DEFAULT
+      TestUtils.retry(10000) {
+        assertEquals(value, 
b.config.groupCoordinatorConfig.cachedBufferMaxBytes())
+      }
+    }
+  }
+
+  @Test
+  def testDynamicShareCoordinatorConfigChange(): Unit = {
+    val newCachedBufferMaxBytes = 2 * 1024 * 1024
+    val brokerId: String = this.brokers.head.config.brokerId.toString
+    val admin = createAdminClient()
+    try {
+      val resource = new ConfigResource(ConfigResource.Type.BROKER, brokerId)
+      val op = new AlterConfigOp(
+        new ConfigEntry(ShareCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG, 
newCachedBufferMaxBytes.toString),
+        OpType.SET
+      )
+      admin.incrementalAlterConfigs(Map(resource -> 
List(op).asJavaCollection).asJava).all.get
+    } finally {
+      admin.close()
+    }
+
+    for (b <- this.brokers) {
+      val value = if (b.config.brokerId.toString == brokerId) 
newCachedBufferMaxBytes else 
ShareCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_DEFAULT
+      TestUtils.retry(10000) {
+        assertEquals(value, 
b.config.shareCoordinatorConfig.shareCoordinatorCachedBufferMaxBytes())
+      }
+    }
+  }
+
   private def createAdminClient(): Admin = {
     val props = new Properties()
     props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index dd80b50f9c2..4ff67c6692d 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1032,6 +1032,7 @@ class KafkaConfigTest {
         /** New group coordinator configs */
         case GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
         case GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG 
=> assertPropertyInvalid(baseProperties, name, "not_a_number", -2, -0.5)
+        case GroupCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1, 512 * 1024 - 
1)
 
         /** Consumer groups configs */
         case GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG 
=> assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
diff --git a/docs/ops.html b/docs/ops.html
index 10eba0aca9f..df0b124c17a 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -1937,6 +1937,16 @@ The following set of metrics are available for 
monitoring the group coordinator:
       
<td>kafka.server:type=group-coordinator-metrics,name=batch-flush-rate</td>
       <td>The number of batches flushed per second</td>
     </tr>
+    <tr>
+      <td>Batch Buffer Cache Size</td>
+      
<td>kafka.server:type=group-coordinator-metrics,name=batch-buffer-cache-size-bytes</td>
+      <td>The total size in bytes of append buffers currently held in the 
coordinator's cache</td>
+    </tr>
+    <tr>
+      <td>Batch Buffer Cache Discard Count</td>
+      
<td>kafka.server:type=group-coordinator-metrics,name=batch-buffer-cache-discard-count</td>
+      <td>The total number of over-sized append buffers that were discarded 
upon release</td>
+    </tr>
     <tr>
       <td>Group Count, per group type</td>
       
<td>kafka.server:type=group-coordinator-metrics,name=group-count,protocol={consumer|classic}</td>
@@ -4180,6 +4190,16 @@ customized state stores; for built-in state stores, 
currently we have:
       
<td>kafka.server:type=share-coordinator-metrics,name=last-pruned-offset,topic=([-.\w]+),partition=([0-9]+)</td>
       <td>The offset at which the share-group state topic was last pruned.</td>
     </tr>
+    <tr>
+      <td>batch-buffer-cache-size-bytes</td>
+      
<td>kafka.server:type=share-coordinator-metrics,name=batch-buffer-cache-size-bytes</td>
+      <td>The total size in bytes of append buffers currently held in the 
share coordinator's cache</td>
+    </tr>
+    <tr>
+      <td>batch-buffer-cache-discard-count</td>
+      
<td>kafka.server:type=share-coordinator-metrics,name=batch-buffer-cache-discard-count</td>
+      <td>The total number of over-sized append buffers that were discarded 
upon release</td>
+    </tr>
     </tbody>
   </table>
 
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 4c807382f0f..1c0aea9e53a 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -19,6 +19,19 @@
 
 <script id="upgrade-template" type="text/x-handlebars-template">
 
+<h4><a id="upgrade_4_3_0" href="#upgrade_4_3_0">Upgrading to 4.3.0</a></h4>
+
+<h5><a id="upgrade_4_3_0_from" href="#upgrade_4_3_0_from">Upgrading Servers to 
4.3.0 from any version 3.3.x through 4.2.x</a></h5>
+
+<h5><a id="upgrade_430_notable" href="#upgrade_430_notable">Notable changes in 
4.3.0</a></h5>
+<ul>
+    <li>
+        Two new configs have been introduced: 
<code>group.coordinator.cached.buffer.max.bytes</code> and 
<code>share.coordinator.cached.buffer.max.bytes</code>.
+        They allow the respective coordinators to set the maximum buffer size 
retained for reuse.
+        For further details, please refer to <a 
href="https://cwiki.apache.org/confluence/x/hA5JFg">KIP-1196</a>.
+    </li>
+</ul>
+
 <h4><a id="upgrade_4_2_0" href="#upgrade_4_2_0">Upgrading to 4.2.0</a></h4>
 
 <h5><a id="upgrade_4_2_0_from" href="#upgrade_4_2_0_from">Upgrading Servers to 
4.2.0 from any version 3.3.x through 4.1.x</a></h5>
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
index 052f89023b3..a6c070795c5 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.utils.Utils;
 import 
org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor;
 import 
org.apache.kafka.coordinator.group.api.assignor.ShareGroupPartitionAssignor;
@@ -37,6 +38,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.OptionalInt;
+import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -109,6 +111,12 @@ public class GroupCoordinatorConfig {
     public static final CompressionType 
OFFSETS_TOPIC_COMPRESSION_CODEC_DEFAULT = CompressionType.NONE;
     public static final String OFFSETS_TOPIC_COMPRESSION_CODEC_DOC = 
"Compression codec for the offsets topic - compression may be used to achieve 
\"atomic\" commits.";
 
+    public static final String CACHED_BUFFER_MAX_BYTES_CONFIG = 
"group.coordinator.cached.buffer.max.bytes";
+    public static final int CACHED_BUFFER_MAX_BYTES_DEFAULT = 1024 * 1024 + 
Records.LOG_OVERHEAD;
+    public static final String CACHED_BUFFER_MAX_BYTES_DOC = "The maximum 
buffer size that the GroupCoordinator will retain for reuse. " +
+        "Note: Setting this larger than the maximum message size is not 
recommended. In this case, every write buffer will be eligible " +
+        "for recycling, which renders this configuration ineffective as a size 
limit.";
+
     ///
     /// Offset configs
     ///
@@ -300,6 +308,10 @@ public class GroupCoordinatorConfig {
     public static final int SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_DEFAULT = 
30_000;
     public static final String SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_DOC = 
"Time elapsed before retrying initialize share group state request. If below 
offsets.commit.timeout.ms, then value of offsets.commit.timeout.ms is used.";
 
+    public static final Set<String> RECONFIGURABLE_CONFIGS = Set.of(
+        CACHED_BUFFER_MAX_BYTES_CONFIG
+    );
+    
     public static final ConfigDef CONFIG_DEF = new ConfigDef()
         // Group coordinator configs
         .define(GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, LIST, 
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT, 
@@ -312,6 +324,9 @@ public class GroupCoordinatorConfig {
         .define(OFFSETS_TOPIC_PARTITIONS_CONFIG, INT, 
OFFSETS_TOPIC_PARTITIONS_DEFAULT, atLeast(1), HIGH, 
OFFSETS_TOPIC_PARTITIONS_DOC)
         .define(OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG, INT, 
OFFSETS_TOPIC_SEGMENT_BYTES_DEFAULT, atLeast(1), HIGH, 
OFFSETS_TOPIC_SEGMENT_BYTES_DOC)
         .define(OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG, INT, (int) 
OFFSETS_TOPIC_COMPRESSION_CODEC_DEFAULT.id, HIGH, 
OFFSETS_TOPIC_COMPRESSION_CODEC_DOC)
+        // The minimum size is set equal to `INITIAL_BUFFER_SIZE` to prevent 
CACHED_BUFFER_MAX_BYTES from being configured too small,
+        // which could otherwise negatively impact performance.
+        .define(CACHED_BUFFER_MAX_BYTES_CONFIG, INT, 
CACHED_BUFFER_MAX_BYTES_DEFAULT, atLeast(512 * 1024), MEDIUM, 
CACHED_BUFFER_MAX_BYTES_DOC)
 
         // Offset configs
         .define(OFFSET_METADATA_MAX_SIZE_CONFIG, INT, 
OFFSET_METADATA_MAX_SIZE_DEFAULT, HIGH, OFFSET_METADATA_MAX_SIZE_DOC)
@@ -413,6 +428,8 @@ public class GroupCoordinatorConfig {
     private final int streamsGroupMaxStandbyReplicas;
     private final int streamsGroupInitialRebalanceDelayMs;
 
+    private final AbstractConfig config;
+
     @SuppressWarnings("this-escape")
     public GroupCoordinatorConfig(AbstractConfig config) {
         this.numThreads = 
config.getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG);
@@ -465,6 +482,7 @@ public class GroupCoordinatorConfig {
         this.streamsGroupNumStandbyReplicas = 
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG);
         this.streamsGroupMaxStandbyReplicas = 
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG);
         this.streamsGroupInitialRebalanceDelayMs = 
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG);
+        this.config = config;
 
         // New group coordinator configs validation.
         require(consumerGroupMaxHeartbeatIntervalMs >= 
consumerGroupMinHeartbeatIntervalMs,
@@ -708,6 +726,15 @@ public class GroupCoordinatorConfig {
         return offsetMetadataMaxSize;
     }
 
+    /**
+     * The maximum buffer size that the coordinator can cache.
+     *
+     * Note: On hot paths, frequent calls to this method may cause performance 
bottlenecks due to synchronization overhead.
+     */
+    public int cachedBufferMaxBytes() {
+        return 
config.getInt(GroupCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG);
+    }
+
     /**
      * The classic group maximum size.
      */
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 9c29183ca38..83f1b0ca5e0 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -275,6 +275,7 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
                     
.withCompression(Compression.of(config.offsetTopicCompressionType()).build())
                     .withAppendLingerMs(config.appendLingerMs())
                     .withExecutorService(Executors.newSingleThreadExecutor())
+                    
.withCachedBufferMaxBytesSupplier(config::cachedBufferMaxBytes)
                     .build();
 
             return new GroupCoordinatorService(
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
index be4db2483cb..1e6dcce6a85 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
@@ -200,6 +200,7 @@ public class GroupCoordinatorConfigTest {
         
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG,
 222);
         
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_CONFIG,
 15 * 60 * 1000);
         
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
 5000);
+        configs.put(GroupCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG, 2 * 
1024 * 1024);
 
         GroupCoordinatorConfig config = createConfig(configs);
 
@@ -230,6 +231,7 @@ public class GroupCoordinatorConfigTest {
         assertEquals(222, config.consumerGroupMaxHeartbeatIntervalMs());
         assertEquals(15 * 60 * 1000, 
config.consumerGroupRegexRefreshIntervalMs());
         assertEquals(5000, config.streamsGroupInitialRebalanceDelayMs());
+        assertEquals(2 * 1024 * 1024, config.cachedBufferMaxBytes());
     }
 
     @Test
@@ -375,6 +377,7 @@ public class GroupCoordinatorConfigTest {
         
configs.put(GroupCoordinatorConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 5);
         
configs.put(GroupCoordinatorConfig.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG,
 5);
         configs.put(GroupCoordinatorConfig.SHARE_GROUP_MAX_SIZE_CONFIG, 1000);
+        configs.put(GroupCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG, 
1024 * 1024);
 
         return createConfig(configs);
     }
diff --git 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfig.java
 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfig.java
index 98e0a2a5f25..7b6bbb1dc5c 100644
--- 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfig.java
+++ 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfig.java
@@ -19,10 +19,12 @@ package org.apache.kafka.coordinator.share;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.utils.Utils;
 
 import java.util.Optional;
 import java.util.OptionalInt;
+import java.util.Set;
 
 import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
 import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
@@ -82,6 +84,16 @@ public class ShareCoordinatorConfig {
     public static final int COLD_PARTITION_SNAPSHOT_INTERVAL_MS_DEFAULT = 5 * 
60 * 1000; // 5 minutes
     public static final String COLD_PARTITION_SNAPSHOT_INTERVAL_MS_DOC = "The 
duration in milliseconds that the share coordinator will wait between force 
snapshotting share partitions which are not being updated.";
 
+    public static final String CACHED_BUFFER_MAX_BYTES_CONFIG = 
"share.coordinator.cached.buffer.max.bytes";
+    public static final int CACHED_BUFFER_MAX_BYTES_DEFAULT = 1024 * 1024 + 
Records.LOG_OVERHEAD;
+    public static final String CACHED_BUFFER_MAX_BYTES_DOC = "The maximum 
buffer size that the ShareCoordinator will retain for reuse. " +
+        "Note: Setting this larger than the maximum message size is not 
recommended. In this case, every write buffer will be eligible " +
+        "for recycling, which renders this configuration ineffective as a size 
limit.";
+
+    public static final Set<String> RECONFIGURABLE_CONFIGS = Set.of(
+        CACHED_BUFFER_MAX_BYTES_CONFIG
+    );
+    
     public static final ConfigDef CONFIG_DEF = new ConfigDef()
         .define(STATE_TOPIC_NUM_PARTITIONS_CONFIG, INT, 
STATE_TOPIC_NUM_PARTITIONS_DEFAULT, atLeast(1), HIGH, 
STATE_TOPIC_NUM_PARTITIONS_DOC)
         .define(STATE_TOPIC_REPLICATION_FACTOR_CONFIG, SHORT, 
STATE_TOPIC_REPLICATION_FACTOR_DEFAULT, atLeast(1), HIGH, 
STATE_TOPIC_REPLICATION_FACTOR_DOC)
@@ -94,7 +106,10 @@ public class ShareCoordinatorConfig {
         .define(APPEND_LINGER_MS_CONFIG, INT, APPEND_LINGER_MS_DEFAULT, 
atLeast(-1), MEDIUM, APPEND_LINGER_MS_DOC)
         .define(WRITE_TIMEOUT_MS_CONFIG, INT, WRITE_TIMEOUT_MS_DEFAULT, 
atLeast(1), HIGH, WRITE_TIMEOUT_MS_DOC)
         .defineInternal(STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG, INT, 
STATE_TOPIC_PRUNE_INTERVAL_MS_DEFAULT, atLeast(1), LOW, 
STATE_TOPIC_PRUNE_INTERVAL_MS_DOC)
-        .defineInternal(COLD_PARTITION_SNAPSHOT_INTERVAL_MS_CONFIG, INT, 
COLD_PARTITION_SNAPSHOT_INTERVAL_MS_DEFAULT, atLeast(1), LOW, 
COLD_PARTITION_SNAPSHOT_INTERVAL_MS_DOC);
+        .defineInternal(COLD_PARTITION_SNAPSHOT_INTERVAL_MS_CONFIG, INT, 
COLD_PARTITION_SNAPSHOT_INTERVAL_MS_DEFAULT, atLeast(1), LOW, 
COLD_PARTITION_SNAPSHOT_INTERVAL_MS_DOC)
+        // The minimum size is set equal to `INITIAL_BUFFER_SIZE` to prevent 
CACHED_BUFFER_MAX_BYTES from being configured too small,
+        // which could otherwise negatively impact performance.
+        .define(CACHED_BUFFER_MAX_BYTES_CONFIG, INT, 
CACHED_BUFFER_MAX_BYTES_DEFAULT, atLeast(512 * 1024), MEDIUM, 
CACHED_BUFFER_MAX_BYTES_DOC);
 
     private final int stateTopicNumPartitions;
     private final short stateTopicReplicationFactor;
@@ -109,6 +124,8 @@ public class ShareCoordinatorConfig {
     private final int pruneIntervalMs;
     private final int coldPartitionSnapshotIntervalMs;
 
+    private final AbstractConfig config;
+
     public ShareCoordinatorConfig(AbstractConfig config) {
         stateTopicNumPartitions = 
config.getInt(STATE_TOPIC_NUM_PARTITIONS_CONFIG);
         stateTopicReplicationFactor = 
config.getShort(STATE_TOPIC_REPLICATION_FACTOR_CONFIG);
@@ -124,6 +141,7 @@ public class ShareCoordinatorConfig {
         appendLingerMs = config.getInt(APPEND_LINGER_MS_CONFIG);
         pruneIntervalMs = config.getInt(STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG);
         coldPartitionSnapshotIntervalMs = 
config.getInt(COLD_PARTITION_SNAPSHOT_INTERVAL_MS_CONFIG);
+        this.config = config;
         validate();
     }
 
@@ -182,6 +200,15 @@ public class ShareCoordinatorConfig {
     public int shareCoordinatorColdPartitionSnapshotIntervalMs() {
         return coldPartitionSnapshotIntervalMs;
     }
+    
+    /**
+     * The maximum buffer size that the share coordinator can cache.
+     *
+     * Note: On hot paths, frequent calls to this method may cause performance 
bottlenecks due to synchronization overhead.
+     */
+    public int shareCoordinatorCachedBufferMaxBytes() {
+        return config.getInt(CACHED_BUFFER_MAX_BYTES_CONFIG);
+    }
 
     private void validate() {
         Utils.require(snapshotUpdateRecordsPerSnapshot >= 0 && 
snapshotUpdateRecordsPerSnapshot <= 500,
diff --git 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
index 19d198b8de4..8a90cf9742f 100644
--- 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
+++ 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
@@ -208,6 +208,7 @@ public class ShareCoordinatorService implements 
ShareCoordinator {
                     
.withCompression(Compression.of(config.shareCoordinatorStateTopicCompressionType()).build())
                     
.withAppendLingerMs(config.shareCoordinatorAppendLingerMs())
                     .withExecutorService(Executors.newSingleThreadExecutor())
+                    
.withCachedBufferMaxBytesSupplier(config::shareCoordinatorCachedBufferMaxBytes)
                     .build();
 
             return new ShareCoordinatorService(
diff --git 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorTestConfig.java
 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorTestConfig.java
index 853bc119432..e3f885c8fb6 100644
--- 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorTestConfig.java
+++ 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorTestConfig.java
@@ -51,6 +51,7 @@ public class ShareCoordinatorTestConfig {
         
configs.put(ShareCoordinatorConfig.STATE_TOPIC_COMPRESSION_CODEC_CONFIG, 
String.valueOf(CompressionType.NONE.id));
         
configs.put(ShareCoordinatorConfig.STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG, 
"30000");  // 30 seconds
         
configs.put(ShareCoordinatorConfig.COLD_PARTITION_SNAPSHOT_INTERVAL_MS_CONFIG, 
"10000");    // 10 seconds
+        configs.put(ShareCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG, 
"1048576");  // 1024 * 1024
         return configs;
     }
 


Reply via email to