This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b58aae288fb KAFKA-19826: Implement coordinator adaptive batch linger time (#20780)
b58aae288fb is described below

commit b58aae288fb794b01fd2fc6a29748eba1d277bd6
Author: Sean Quah <[email protected]>
AuthorDate: Fri Nov 14 20:16:28 2025 +0000

    KAFKA-19826: Implement coordinator adaptive batch linger time (#20780)
    
    Add support for an adaptive batch linger time in the group and share
    coordinators. When an adaptive batch linger time is enabled, we no
    longer create a timer to flush the current batch. Instead, we append a
    flush operation at the end of the event queue so that any currently
    queued operations are naturally collected into the batch.
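
    As a rough sketch (simplified from the CoordinatorRuntime change below;
    the real code also tags the flush with a batch epoch, see the next
    paragraph):

        // On starting a new batch, enqueue a flush behind any writes
        // already sitting in the event queue.
        enqueueLast(new CoordinatorInternalEvent("FlushBatch", tp, () -> {
            withActiveContextOrThrow(tp, CoordinatorContext::flushCurrentBatch);
        }));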
    
    To avoid double flushing from hitting the maximum batch size or
    transactional writes, we number batches with an epoch to check whether
    the batch has already been flushed.
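
    The guard in the flush event boils down to (mirroring the
    enqueueAdaptiveFlush helper in the diff below):

        // Skip the flush if the batch it was scheduled for is already gone.
        if (context.currentBatch != null && context.batchEpoch == expectedBatchEpoch) {
            context.flushCurrentBatch();
        }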
    
    The group.coordinator.append.linger.ms and
    share.coordinator.append.linger.ms configs are extended to allow -1, to
    specify an adaptive append linger time. The default for these configs is
    also updated to -1.
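
    For example, to keep the previous fixed 5 ms linger rather than the new
    adaptive default, set in the broker properties:

        group.coordinator.append.linger.ms=5
        share.coordinator.append.linger.ms=5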
    
    Reviewers: PoAn Yang <[email protected]>, TaiJuWu <[email protected]>, David Jacot <[email protected]>
---
 checkstyle/suppressions.xml                        |   2 +
 .../common/runtime/CoordinatorRuntime.java         |  85 +++-
 .../common/runtime/CoordinatorRuntimeTest.java     | 490 ++++++++++++++++++++-
 .../common/runtime/DirectEventProcessor.java       |  41 +-
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |   5 +
 .../coordinator/group/GroupCoordinatorConfig.java  |  18 +-
 .../group/GroupCoordinatorConfigTest.java          |  15 +-
 .../coordinator/share/ShareCoordinatorConfig.java  |  20 +-
 .../share/ShareCoordinatorConfigTest.java          |  49 +++
 9 files changed, 663 insertions(+), 62 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 85b16fd65d4..1bf8431f11a 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -340,6 +340,8 @@
               files="(GroupMetadataManager|GroupMetadataManagerTest).java"/>
 
     <!-- coordinator-common -->
+    <suppress checks="JavaNCSS"
+              files="CoordinatorRuntimeTest.java"/>
     <suppress checks="NPathComplexity"
               files="CoordinatorRuntime.java"/>
 
diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
index 7bd8fd99233..54544950aa1 100644
--- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
+++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
@@ -117,7 +117,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
         private CoordinatorMetrics coordinatorMetrics;
         private Serializer<U> serializer;
         private Compression compression;
-        private int appendLingerMs;
+        private OptionalInt appendLingerMs;
         private ExecutorService executorService;
 
         public Builder<S, U> withLogPrefix(String logPrefix) {
@@ -185,7 +185,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
             return this;
         }
 
-        public Builder<S, U> withAppendLingerMs(int appendLingerMs) {
+        public Builder<S, U> withAppendLingerMs(OptionalInt appendLingerMs) {
             this.appendLingerMs = appendLingerMs;
             return this;
         }
@@ -195,6 +195,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
             return this;
         }
 
+        @SuppressWarnings("checkstyle:CyclomaticComplexity")
         public CoordinatorRuntime<S, U> build() {
             if (logPrefix == null)
                 logPrefix = "";
@@ -220,8 +221,10 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
                 throw new IllegalArgumentException("Serializer must be set.");
             if (compression == null)
                 compression = Compression.NONE;
-            if (appendLingerMs < 0)
-                throw new IllegalArgumentException("AppendLinger must be >= 0");
+            if (appendLingerMs == null)
+                appendLingerMs = OptionalInt.empty();
+            if (appendLingerMs.isPresent() && appendLingerMs.getAsInt() < 0)
+                throw new IllegalArgumentException("AppendLinger must be empty or >= 0");
             if (executorService == null)
                 throw new IllegalArgumentException("ExecutorService must be set.");
 
@@ -604,6 +607,12 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
          */
         CoordinatorBatch currentBatch;
 
+        /**
+         * The batch epoch. Incremented every time a new batch is started.
+         * Only valid for the lifetime of the CoordinatorContext. The first batch has an epoch of 1.
+         */
+        int batchEpoch;
+
         /**
          * Constructor.
          *
@@ -774,6 +783,24 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
             currentBatch = null;
         }
 
+        /**
+         * Adds a flush event to the end of the event queue, after any existing writes in the queue.
+         *
+         * @param expectedBatchEpoch The epoch of the batch to flush.
+         */
+        private void enqueueAdaptiveFlush(int expectedBatchEpoch) {
+            enqueueLast(new CoordinatorInternalEvent("FlushBatch", tp, () -> {
+                withActiveContextOrThrow(tp, context -> {
+                    // The batch could have already been flushed because it reached the maximum
+                    // batch size or a transactional write came in. When this happens, we want
+                    // to avoid flushing the next batch early.
+                    if (context.currentBatch != null && context.batchEpoch == expectedBatchEpoch) {
+                        context.flushCurrentBatch();
+                    }
+                });
+            }));
+        }
+
         /**
          * Flushes the current (or pending) batch to the log. When the batch is written
          * locally, a new snapshot is created in the snapshot registry and the events
@@ -839,7 +866,11 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
          */
         private void maybeFlushCurrentBatch(long currentTimeMs) {
             if (currentBatch != null) {
-                if (currentBatch.builder.isTransactional() || (currentTimeMs - currentBatch.appendTimeMs) >= appendLingerMs || !currentBatch.builder.hasRoomFor(0)) {
+                if (currentBatch.builder.isTransactional() ||
+                    // When adaptive linger time is enabled, we avoid flushing here.
+                    // Instead, we rely on the flush event enqueued at the back of the event queue.
+                    (appendLingerMs.isPresent() && (currentTimeMs - currentBatch.appendTimeMs) >= appendLingerMs.getAsInt()) ||
+                    !currentBatch.builder.hasRoomFor(0)) {
                     flushCurrentBatch();
                 }
             }
@@ -888,20 +919,31 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
                     maxBatchSize
                 );
 
+                batchEpoch++;
+
                 Optional<TimerTask> lingerTimeoutTask = Optional.empty();
-                if (appendLingerMs > 0) {
-                    lingerTimeoutTask = Optional.of(new TimerTask(appendLingerMs) {
-                        @Override
-                        public void run() {
-                            // An event to flush the batch is pushed to the front of the queue
-                            // to ensure that the linger time is respected.
-                            enqueueFirst(new CoordinatorInternalEvent("FlushBatch", tp, () -> {
-                                if (this.isCancelled()) return;
-                                withActiveContextOrThrow(tp, CoordinatorContext::flushCurrentBatch);
-                            }));
-                        }
-                    });
-                    CoordinatorRuntime.this.timer.add(lingerTimeoutTask.get());
+                if (appendLingerMs.isPresent()) {
+                    if (appendLingerMs.getAsInt() > 0) {
+                        lingerTimeoutTask = Optional.of(new TimerTask(appendLingerMs.getAsInt()) {
+                            @Override
+                            public void run() {
+                                // An event to flush the batch is pushed to the front of the queue
+                                // to ensure that the linger time is respected.
+                                enqueueFirst(new CoordinatorInternalEvent("FlushBatch", tp, () -> {
+                                    if (this.isCancelled()) return;
+                                    withActiveContextOrThrow(tp, CoordinatorContext::flushCurrentBatch);
+                                }));
+                            }
+                        });
+                        CoordinatorRuntime.this.timer.add(lingerTimeoutTask.get());
+                    }
+                } else {
+                    // Always queue a flush immediately at the end of the queue, unless the batch is
+                    // transactional. Transactional batches are flushed immediately at the end of
+                    // the write, so a flush event is never needed.
+                    if (!builder.isTransactional()) {
+                        enqueueAdaptiveFlush(batchEpoch);
+                    }
                 }
 
                 currentBatch = new CoordinatorBatch(
@@ -1997,9 +2039,10 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
 
     /**
      * The duration in milliseconds that the coordinator will wait for writes to
-     * accumulate before flushing them to disk.
+     * accumulate before flushing them to disk. {@code OptionalInt.empty()} indicates
+     * an adaptive linger time based on the workload.
      */
-    private final int appendLingerMs;
+    private final OptionalInt appendLingerMs;
 
     /**
      * The executor service used by the coordinator runtime to schedule
@@ -2051,7 +2094,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
         CoordinatorMetrics coordinatorMetrics,
         Serializer<U> serializer,
         Compression compression,
-        int appendLingerMs,
+        OptionalInt appendLingerMs,
         ExecutorService executorService
     ) {
         this.logPrefix = logPrefix;
diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
index 80c3db57252..5574771dfc1 100644
--- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
@@ -634,7 +634,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorRuntimeMetrics(metrics)
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
-                .withAppendLingerMs(10)
+                .withAppendLingerMs(OptionalInt.of(10))
                 .withExecutorService(mock(ExecutorService.class))
                 .build();
 
@@ -2666,6 +2666,7 @@ public class CoordinatorRuntimeTest {
                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
+                .withAppendLingerMs(OptionalInt.of(0))
                 .withExecutorService(mock(ExecutorService.class))
                 .build();
 
@@ -2740,7 +2741,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorRuntimeMetrics(metrics)
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
-                .withAppendLingerMs(10)
+                .withAppendLingerMs(OptionalInt.of(10))
                 .withExecutorService(mock(ExecutorService.class))
                 .build();
 
@@ -2817,6 +2818,7 @@ public class CoordinatorRuntimeTest {
                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
+                .withAppendLingerMs(OptionalInt.of(0))
                 .withExecutorService(mock(ExecutorService.class))
                 .build();
 
@@ -3214,7 +3216,7 @@ public class CoordinatorRuntimeTest {
                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
-                .withAppendLingerMs(10)
+                .withAppendLingerMs(OptionalInt.of(10))
                 .withExecutorService(mock(ExecutorService.class))
                 .build();
 
@@ -3349,7 +3351,7 @@ public class CoordinatorRuntimeTest {
                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
-                .withAppendLingerMs(10)
+                .withAppendLingerMs(OptionalInt.of(10))
                 .withExecutorService(mock(ExecutorService.class))
                 .build();
 
@@ -3401,7 +3403,7 @@ public class CoordinatorRuntimeTest {
                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
-                .withAppendLingerMs(10)
+                .withAppendLingerMs(OptionalInt.of(10))
                 .withExecutorService(mock(ExecutorService.class))
                 .build();
 
@@ -3487,7 +3489,7 @@ public class CoordinatorRuntimeTest {
                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
-                .withAppendLingerMs(10)
+                .withAppendLingerMs(OptionalInt.of(10))
                 .withExecutorService(mock(ExecutorService.class))
                 .build();
 
@@ -3585,7 +3587,7 @@ public class CoordinatorRuntimeTest {
                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
-                .withAppendLingerMs(10)
+                .withAppendLingerMs(OptionalInt.of(10))
                 .withExecutorService(mock(ExecutorService.class))
                 .build();
 
@@ -3700,6 +3702,453 @@ public class CoordinatorRuntimeTest {
         assertNull(complete1.get(5, TimeUnit.SECONDS));
     }
 
+    @Test
+    public void testAdaptiveAppendLingerTime() {
+        MockTimer timer = new MockTimer();
+        ManualEventProcessor processor = new ManualEventProcessor();
+        MockPartitionWriter writer = new MockPartitionWriter();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(processor)
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(OptionalInt.empty())
+                .withExecutorService(mock(ExecutorService.class))
+                .build();
+
+        // Loads the coordinator. Poll once to execute the load operation and once
+        // to complete the load.
+        runtime.scheduleLoadOperation(TP, 10);
+        processor.poll();
+        processor.poll();
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertNull(ctx.currentBatch);
+
+        // Write #1.
+        runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(List.of("record1", "record2"), "response1")
+        );
+
+        // Write #2.
+        runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(List.of("record3"), "response2")
+        );
+
+        // Execute write #1.
+        processor.poll();
+
+        // A batch has been created.
+        assertNotNull(ctx.currentBatch);
+
+        // A flush event is queued after write #2.
+        assertEquals(2, processor.size());
+
+        // Verify the state. Records are replayed but no batch written.
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(
+            new MockCoordinatorShard.RecordAndMetadata(0, "record1"),
+            new MockCoordinatorShard.RecordAndMetadata(1, "record2")
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(List.of(), writer.entries(TP));
+
+        // Execute write #2.
+        processor.poll();
+        assertEquals(1, processor.size());
+
+        // The batch has not been flushed.
+        assertNotNull(ctx.currentBatch);
+
+        // Verify the state. Records are replayed but no batch written.
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(
+            new MockCoordinatorShard.RecordAndMetadata(0, "record1"),
+            new MockCoordinatorShard.RecordAndMetadata(1, "record2"),
+            new MockCoordinatorShard.RecordAndMetadata(2, "record3")
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(List.of(), writer.entries(TP));
+
+        // Flush the batch.
+        processor.poll();
+
+        // The batch is flushed.
+        assertNull(ctx.currentBatch);
+
+        // Verify the state.
+        assertEquals(3L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(0L, 3L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(
+            new MockCoordinatorShard.RecordAndMetadata(0, "record1"),
+            new MockCoordinatorShard.RecordAndMetadata(1, "record2"),
+            new MockCoordinatorShard.RecordAndMetadata(2, "record3")
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(List.of(
+            TestUtil.records(timer.time().milliseconds(), "record1", "record2", "record3")
+        ), writer.entries(TP));
+    }
+
+    /**
+     * Tests a flush triggered by the max batch size with an adaptive append linger time.
+     *
+     * The flush for the first batch must not flush the second batch.
+     */
+    @Test
+    @SuppressWarnings("checkstyle:MethodLength")
+    public void testAdaptiveAppendLingerWithMaxBatchSizeFlush() {
+        MockTimer timer = new MockTimer();
+        ManualEventProcessor processor = new ManualEventProcessor();
+        MockPartitionWriter writer = new MockPartitionWriter();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(processor)
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(OptionalInt.empty())
+                .withExecutorService(mock(ExecutorService.class))
+                .build();
+
+        // Loads the coordinator. Poll once to execute the load operation and once
+        // to complete the load.
+        runtime.scheduleLoadOperation(TP, 10);
+        processor.poll();
+        processor.poll();
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertNull(ctx.currentBatch);
+
+        // Get the max batch size.
+        int maxBatchSize = writer.config(TP).maxMessageSize();
+
+        // Create records with a quarter of the max batch size each. Keep in mind that
+        // each batch has a header so it is not possible to have those four records
+        // in one single batch.
+        List<String> records = Stream.of('1', '2', '3', '4', '5').map(c -> {
+            char[] payload = new char[maxBatchSize / 4];
+            Arrays.fill(payload, c);
+            return new String(payload);
+        }).collect(Collectors.toList());
+
+        // Write #1.
+        runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(0, 1), "response1")
+        );
+
+        // Write #2.
+        runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(1, 2), "response2")
+        );
+
+        // Write #3.
+        runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(2, 3), "response3")
+        );
+
+        // Write #4.
+        runtime.scheduleWriteOperation("write#4", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(3, 4), "response4")
+        );
+
+        // Execute write #1, write #2 and write #3.
+        processor.poll();
+        processor.poll();
+        processor.poll();
+
+        // A batch has been created.
+        assertNotNull(ctx.currentBatch);
+
+        // A flush event is queued after write #4.
+        assertEquals(2, processor.size());
+
+        // Verify the state. Records are replayed but no batch written.
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+            new MockCoordinatorShard.RecordAndMetadata(2, records.get(2))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(List.of(), writer.entries(TP));
+
+        // Write #5.
+        runtime.scheduleWriteOperation("write#5", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(4, 5), "response5")
+        );
+
+        // Execute write #4. This one cannot go into the existing batch
+        // so the existing batch should be flushed and a new one should be created.
+        processor.poll();
+
+        // A batch has been created.
+        assertNotNull(ctx.currentBatch);
+
+        // Another flush event is queued after write #5.
+        assertEquals(3, processor.size());
+
+        // Verify the state.
+        assertEquals(3L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(0L, 3L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+            new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
+            new MockCoordinatorShard.RecordAndMetadata(3, records.get(3))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(List.of(
+            TestUtil.records(timer.time().milliseconds(), records.subList(0, 3))
+        ), writer.entries(TP));
+
+        // Execute the first flush.
+        processor.poll();
+        assertEquals(2, processor.size());
+
+        // The flush does not belong to the current batch and is ignored.
+        assertNotNull(ctx.currentBatch);
+
+        // Verify the state.
+        assertEquals(3L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(0L, 3L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+            new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
+            new MockCoordinatorShard.RecordAndMetadata(3, records.get(3))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(List.of(
+            TestUtil.records(timer.time().milliseconds(), records.subList(0, 3))
+        ), writer.entries(TP));
+
+        // Execute write #5.
+        processor.poll();
+        assertEquals(1, processor.size());
+
+        // Verify the state.
+        assertEquals(3L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(0L, 3L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+            new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
+            new MockCoordinatorShard.RecordAndMetadata(3, records.get(3)),
+            new MockCoordinatorShard.RecordAndMetadata(4, records.get(4))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(List.of(
+            TestUtil.records(timer.time().milliseconds(), records.subList(0, 3))
+        ), writer.entries(TP));
+
+        // Execute the second flush.
+        processor.poll();
+        assertEquals(0, processor.size());
+
+        // The batch is flushed.
+        assertNull(ctx.currentBatch);
+
+        // Verify the state.
+        assertEquals(5L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(0L, 3L, 5L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+            new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
+            new MockCoordinatorShard.RecordAndMetadata(3, records.get(3)),
+            new MockCoordinatorShard.RecordAndMetadata(4, records.get(4))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(List.of(
+            TestUtil.records(timer.time().milliseconds(), records.subList(0, 3)),
+            TestUtil.records(timer.time().milliseconds(), records.subList(3, 5))
+        ), writer.entries(TP));
+    }
+
+    /**
+     * Tests a transactional write with an adaptive append linger time.
+     *
+     * The transactional write must not enqueue a flush, since it flushes immediately.
+     * The flush for the batch before the transactional write must not flush the batch after the
+     * transactional write.
+     */
+    @Test
+    public void testAdaptiveAppendLingerWithTransactionalWrite() {
+        MockTimer timer = new MockTimer();
+        ManualEventProcessor processor = new ManualEventProcessor();
+        MockPartitionWriter writer = new MockPartitionWriter();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(processor)
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(OptionalInt.empty())
+                .withExecutorService(mock(ExecutorService.class))
+                .build();
+
+        // Loads the coordinator. Poll once to execute the load operation and once
+        // to complete the load.
+        runtime.scheduleLoadOperation(TP, 10);
+        processor.poll();
+        processor.poll();
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertNull(ctx.currentBatch);
+
+        // Write #1.
+        runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(List.of("record1"), "response1")
+        );
+
+        // Transactional write #2. This will flush the batch.
+        runtime.scheduleTransactionalWriteOperation(
+            "txn-write#1",
+            TP,
+            "transactional-id",
+            100L,
+            (short) 50,
+            Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(List.of("record2"), "response2"),
+            TXN_OFFSET_COMMIT_LATEST_VERSION
+        );
+
+        // Write #3.
+        runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(List.of("record3"), "response3")
+        );
+
+        assertEquals(3, processor.size());
+
+        // Execute write #1.
+        processor.poll();
+
+        // A batch has been created.
+        assertNotNull(ctx.currentBatch);
+
+        // A flush event is queued after write #3.
+        assertEquals(3, processor.size());
+
+        // Verify the state. Records are replayed but no batch written.
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(
+            new MockCoordinatorShard.RecordAndMetadata(0, "record1")
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(List.of(), writer.entries(TP));
+
+        // Execute transactional write #2.
+        processor.poll();
+
+        // The batch is flushed.
+        assertNull(ctx.currentBatch);
+
+        // No flush event is queued.
+        assertEquals(2, processor.size());
+
+        // Verify the state. The current batch and the transactional records are
+        // written to the log.
+        assertEquals(2L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(0L, 1L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(
+            new MockCoordinatorShard.RecordAndMetadata(0, "record1")
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(List.of(
+            TestUtil.records(timer.time().milliseconds(), "record1"),
+            TestUtil.transactionalRecords(100L, (short) 50, timer.time().milliseconds(), "record2")
+        ), writer.entries(TP));
+
+        // Execute write #3.
+        processor.poll();
+
+        // A batch has been created.
+        assertNotNull(ctx.currentBatch);
+
+        // A flush event is queued after the first flush.
+        assertEquals(2, processor.size());
+
+        // Verify the state. Records are replayed but no batch written.
+        assertEquals(2L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(0L, 1L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(
+            new MockCoordinatorShard.RecordAndMetadata(0, "record1"),
+            new MockCoordinatorShard.RecordAndMetadata(2, "record3")
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(List.of(
+            TestUtil.records(timer.time().milliseconds(), "record1"),
+            TestUtil.transactionalRecords(100L, (short) 50, timer.time().milliseconds(), "record2")
+        ), writer.entries(TP));
+
+        // Execute the first flush.
+        processor.poll();
+        assertEquals(1, processor.size());
+
+        // The flush does not belong to the current batch and is ignored.
+        assertNotNull(ctx.currentBatch);
+
+        // Execute the second flush.
+        processor.poll();
+        assertEquals(0, processor.size());
+
+        // The batch is flushed.
+        assertNull(ctx.currentBatch);
+
+        // Verify the state.
+        assertEquals(3L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(0L, 1L, 2L, 3L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(
+            new MockCoordinatorShard.RecordAndMetadata(0, "record1"),
+            new MockCoordinatorShard.RecordAndMetadata(2, "record3")
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(List.of(
+            TestUtil.records(timer.time().milliseconds(), "record1"),
+            TestUtil.transactionalRecords(100L, (short) 50, timer.time().milliseconds(), "record2"),
+            TestUtil.records(timer.time().milliseconds(), "record3")
+        ), writer.entries(TP));
+    }
+
     @Test
     public void testStateMachineIsReloadedWhenOutOfSync() {
         MockTimer timer = new MockTimer();
@@ -3728,7 +4177,7 @@ public class CoordinatorRuntimeTest {
                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
-                .withAppendLingerMs(10)
+                .withAppendLingerMs(OptionalInt.of(10))
                 .withExecutorService(mock(ExecutorService.class))
                 .build();
 
@@ -3843,7 +4292,7 @@ public class CoordinatorRuntimeTest {
                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
-                .withAppendLingerMs(10)
+                .withAppendLingerMs(OptionalInt.of(10))
                 .withExecutorService(mock(ExecutorService.class))
                 .build();
 
@@ -3892,7 +4341,7 @@ public class CoordinatorRuntimeTest {
                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
-                .withAppendLingerMs(10)
+                .withAppendLingerMs(OptionalInt.of(10))
                 .withExecutorService(mock(ExecutorService.class))
                 .build();
 
@@ -4001,7 +4450,7 @@ public class CoordinatorRuntimeTest {
                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
-                .withAppendLingerMs(10)
+                .withAppendLingerMs(OptionalInt.of(10))
                 .withExecutorService(mock(ExecutorService.class))
                 .build();
 
@@ -4097,7 +4546,7 @@ public class CoordinatorRuntimeTest {
                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
-                .withAppendLingerMs(10)
+                .withAppendLingerMs(OptionalInt.of(10))
                 .withExecutorService(mock(ExecutorService.class))
                 .build();
 
@@ -4184,7 +4633,7 @@ public class CoordinatorRuntimeTest {
                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(serializer)
-                .withAppendLingerMs(10)
+                .withAppendLingerMs(OptionalInt.of(10))
                 .withExecutorService(mock(ExecutorService.class))
                 .build();
 
@@ -4253,7 +4702,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorRuntimeMetrics(runtimeMetrics)
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
-                .withAppendLingerMs(10)
+                .withAppendLingerMs(OptionalInt.of(10))
                 .withExecutorService(mock(ExecutorService.class))
                 .build();
 
@@ -4366,7 +4815,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorRuntimeMetrics(runtimeMetrics)
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
-                .withAppendLingerMs(10)
+                .withAppendLingerMs(OptionalInt.of(10))
                 .withExecutorService(mock(ExecutorService.class))
                 .build();
 
@@ -4477,7 +4926,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withCompression(compression)
                 .withSerializer(new StringSerializer())
-                .withAppendLingerMs(10)
+                .withAppendLingerMs(OptionalInt.of(10))
                 .withExecutorService(mock(ExecutorService.class))
                 .build();
 
@@ -4563,7 +5012,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withCompression(compression)
                 .withSerializer(new StringSerializer())
-                .withAppendLingerMs(10)
+                .withAppendLingerMs(OptionalInt.of(10))
                 .withExecutorService(mock(ExecutorService.class))
                 .build();
 
@@ -4649,7 +5098,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withCompression(compression)
                 .withSerializer(new StringSerializer())
-                .withAppendLingerMs(10)
+                .withAppendLingerMs(OptionalInt.of(10))
                 .withExecutorService(mock(ExecutorService.class))
                 .build();
 
@@ -4740,6 +5189,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorRuntimeMetrics(runtimeMetrics)
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
+                .withAppendLingerMs(OptionalInt.of(0))
                 .withExecutorService(mock(ExecutorService.class))
                 .build();
 
@@ -4826,6 +5276,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorRuntimeMetrics(runtimeMetrics)
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
+                .withAppendLingerMs(OptionalInt.of(0))
                 .withExecutorService(mock(ExecutorService.class))
                 .build();
 
@@ -4971,6 +5422,7 @@ public class CoordinatorRuntimeTest {
                 .withCoordinatorRuntimeMetrics(runtimeMetrics)
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
+                .withAppendLingerMs(OptionalInt.of(0))
                 .withExecutorService(executorService)
                 .build();
 
@@ -5052,7 +5504,7 @@ public class CoordinatorRuntimeTest {
                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                 .withSerializer(new StringSerializer())
-                .withAppendLingerMs(10)
+                .withAppendLingerMs(OptionalInt.of(10))
                 .withExecutorService(mock(ExecutorService.class))
                 .build();
 
diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/DirectEventProcessor.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/DirectEventProcessor.java
index 60b74c3a12f..19c79e26714 100644
--- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/DirectEventProcessor.java
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/DirectEventProcessor.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.coordinator.common.runtime;
 
+import java.util.Deque;
+import java.util.LinkedList;
 import java.util.concurrent.RejectedExecutionException;
 
 /**
@@ -23,24 +25,43 @@ import java.util.concurrent.RejectedExecutionException;
  * useful in unit tests where execution in threads is not required.
  */
 public class DirectEventProcessor implements CoordinatorEventProcessor {
+    private final Deque<CoordinatorEvent> queue;
+    private boolean inEvent;
+
+    public DirectEventProcessor() {
+        this.queue = new LinkedList<>();
+        this.inEvent = false;
+    }
+
     @Override
     public void enqueueLast(CoordinatorEvent event) throws RejectedExecutionException {
-        try {
-            event.run();
-        } catch (Throwable ex) {
-            event.complete(ex);
-        }
+        queue.addLast(event);
+        processQueue();
     }
 
     @Override
     public void enqueueFirst(CoordinatorEvent event) throws RejectedExecutionException {
-        try {
-            event.run();
-        } catch (Throwable ex) {
-            event.complete(ex);
-        }
+        queue.addFirst(event);
+        processQueue();
     }
 
     @Override
     public void close() {}
+
+    private void processQueue() {
+        if (inEvent) {
+            return;
+        }
+
+        inEvent = true;
+        while (!queue.isEmpty()) {
+            CoordinatorEvent event = queue.removeFirst();
+            try {
+                event.run();
+            } catch (Throwable ex) {
+                event.complete(ex);
+            }
+        }
+        inEvent = false;
+    }
 }
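
Note on the DirectEventProcessor change above: the processor used to run each
event inline from enqueueLast/enqueueFirst, so an event that enqueues another
event (as the adaptive flush now does) would have executed it recursively in
the middle of the outer event. The inEvent guard instead defers nested events
to the processQueue() loop, preserving queue order. A hypothetical
illustration (eventA/eventB are placeholders, not names from the patch):

    DirectEventProcessor processor = new DirectEventProcessor();
    // Suppose eventA.run() calls processor.enqueueLast(eventB):
    // with the guard, eventB runs only after eventA completes,
    // matching the ordering of the threaded processors.
    processor.enqueueLast(eventA);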
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index ab7f45ae689..c30cc514d44 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -34,6 +34,7 @@ import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy
 import org.apache.kafka.coordinator.group.Group.GroupType
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
+import org.apache.kafka.coordinator.share.ShareCoordinatorConfig
 import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, TransactionStateManagerConfig}
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.raft.{MetadataLogConfig, QuorumConfig}
@@ -1038,6 +1039,7 @@ class KafkaConfigTest {
 
         /** New group coordinator configs */
         case GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
+        case GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", -2, -0.5)
 
         /** Consumer groups configs */
         case GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
@@ -1077,6 +1079,9 @@ class KafkaConfigTest {
         case GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
         case GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
 
+        /** Share coordinator configs */
+        case ShareCoordinatorConfig.APPEND_LINGER_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", -2, -0.5)
+
         case _ => assertPropertyInvalid(baseProperties, name, "not_a_number", "-1")
       }
     }
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
index 9405a32f967..052f89023b3 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -36,6 +36,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.OptionalInt;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -72,8 +73,9 @@ public class GroupCoordinatorConfig {
     public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_DOC = "The duration in milliseconds that the coordinator will " +
         "wait for writes to accumulate before flushing them to disk. Increasing this value improves write efficiency and batch size, " +
         "but also increases the response latency for requests, as the coordinator must wait for batches to be flushed to " +
-        "disk before completing request processing. Transactional writes are not accumulated.";
-    public static final int GROUP_COORDINATOR_APPEND_LINGER_MS_DEFAULT = 5;
+        "disk before completing request processing. Transactional writes are not accumulated. " +
+        "Set to -1 for an adaptive linger time that minimizes latency based on the workload.";
+    public static final int GROUP_COORDINATOR_APPEND_LINGER_MS_DEFAULT = -1;
 
     public static final String GROUP_COORDINATOR_NUM_THREADS_CONFIG = "group.coordinator.threads";
     public static final String GROUP_COORDINATOR_NUM_THREADS_DOC = "The number of threads used by the group coordinator.";
@@ -303,7 +305,7 @@ public class GroupCoordinatorConfig {
         .define(GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, LIST, GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT,
             ConfigDef.ValidList.in(false, Group.GroupType.documentValidValues()), MEDIUM, GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC)
         .define(GROUP_COORDINATOR_NUM_THREADS_CONFIG, INT, GROUP_COORDINATOR_NUM_THREADS_DEFAULT, atLeast(1), HIGH, GROUP_COORDINATOR_NUM_THREADS_DOC)
-        .define(GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, INT, GROUP_COORDINATOR_APPEND_LINGER_MS_DEFAULT, atLeast(0), MEDIUM, GROUP_COORDINATOR_APPEND_LINGER_MS_DOC)
+        .define(GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, INT, GROUP_COORDINATOR_APPEND_LINGER_MS_DEFAULT, atLeast(-1), MEDIUM, GROUP_COORDINATOR_APPEND_LINGER_MS_DOC)
         .define(OFFSET_COMMIT_TIMEOUT_MS_CONFIG, INT, OFFSET_COMMIT_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH, OFFSET_COMMIT_TIMEOUT_MS_DOC)
         .define(OFFSETS_LOAD_BUFFER_SIZE_CONFIG, INT, OFFSETS_LOAD_BUFFER_SIZE_DEFAULT, atLeast(1), HIGH, OFFSETS_LOAD_BUFFER_SIZE_DOC)
         .define(OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, SHORT, OFFSETS_TOPIC_REPLICATION_FACTOR_DEFAULT, atLeast(1), HIGH, OFFSETS_TOPIC_REPLICATION_FACTOR_DOC)
@@ -653,10 +655,14 @@ public class GroupCoordinatorConfig {
 
     /**
      * The duration in milliseconds that the coordinator will wait for writes to
-     * accumulate before flushing them to disk.
+     * accumulate before flushing them to disk. {@code OptionalInt.empty()} indicates
+     * an adaptive linger time based on the workload.
      */
-    public int appendLingerMs() {
-        return appendLingerMs;
+    public OptionalInt appendLingerMs() {
+        if (appendLingerMs == -1) {
+            return OptionalInt.empty();
+        }
+        return OptionalInt.of(appendLingerMs);
     }
 
     /**
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
index c447aec5374..be4db2483cb 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
@@ -37,6 +37,7 @@ import java.time.Duration;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.OptionalInt;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -219,7 +220,7 @@ public class GroupCoordinatorConfigTest {
         assertEquals(Duration.ofMinutes(24 * 60 * 60 * 1000L).toMillis(), config.offsetsRetentionMs());
         assertEquals(5000, config.offsetCommitTimeoutMs());
         assertEquals(CompressionType.GZIP, config.offsetTopicCompressionType());
-        assertEquals(10, config.appendLingerMs());
+        assertEquals(OptionalInt.of(10), config.appendLingerMs());
         assertEquals(555, config.offsetsLoadBufferSize());
         assertEquals(111, config.offsetsTopicPartitions());
         assertEquals(11, config.offsetsTopicReplicationFactor());
@@ -332,6 +333,18 @@ public class GroupCoordinatorConfigTest {
             assertThrows(ConfigException.class, () -> createConfig(configs)).getMessage());
     }
 
+    @Test
+    public void testAppendLingerMs() {
+        GroupCoordinatorConfig config = createConfig(Map.of(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, -1));
+        assertEquals(OptionalInt.empty(), config.appendLingerMs());
+
+        config = createConfig(Map.of(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, 0));
+        assertEquals(OptionalInt.of(0), config.appendLingerMs());
+
+        config = createConfig(Map.of(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, 5));
+        assertEquals(OptionalInt.of(5), config.appendLingerMs());
+    }
+
     public static GroupCoordinatorConfig createGroupCoordinatorConfig(
         int offsetMetadataMaxSize,
         long offsetsRetentionCheckIntervalMs,
diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfig.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfig.java
index b92947af9cd..98e0a2a5f25 100644
--- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfig.java
+++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfig.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.utils.Utils;
 
 import java.util.Optional;
+import java.util.OptionalInt;
 
 import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
 import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
@@ -69,8 +70,9 @@ public class ShareCoordinatorConfig {
     public static final String STATE_TOPIC_COMPRESSION_CODEC_DOC = "Compression codec for the share-group state topic.";
 
     public static final String APPEND_LINGER_MS_CONFIG = "share.coordinator.append.linger.ms";
-    public static final int APPEND_LINGER_MS_DEFAULT = 5;
-    public static final String APPEND_LINGER_MS_DOC = "The duration in milliseconds that the share coordinator will wait for writes to accumulate before flushing them to disk.";
+    public static final int APPEND_LINGER_MS_DEFAULT = -1;
+    public static final String APPEND_LINGER_MS_DOC = "The duration in milliseconds that the share coordinator will wait for writes to accumulate before flushing them to disk. " +
+        "Set to -1 for an adaptive linger time that minimizes latency based on the workload.";
 
     public static final String STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG = "share.coordinator.state.topic.prune.interval.ms";
     public static final int STATE_TOPIC_PRUNE_INTERVAL_MS_DEFAULT = 5 * 60 * 1000; // 5 minutes
@@ -89,7 +91,7 @@ public class ShareCoordinatorConfig {
         .define(SNAPSHOT_UPDATE_RECORDS_PER_SNAPSHOT_CONFIG, INT, SNAPSHOT_UPDATE_RECORDS_PER_SNAPSHOT_DEFAULT, atLeast(0), MEDIUM, SNAPSHOT_UPDATE_RECORDS_PER_SNAPSHOT_DOC)
         .define(LOAD_BUFFER_SIZE_CONFIG, INT, LOAD_BUFFER_SIZE_DEFAULT, atLeast(1), HIGH, LOAD_BUFFER_SIZE_DOC)
         .define(STATE_TOPIC_COMPRESSION_CODEC_CONFIG, INT, (int) STATE_TOPIC_COMPRESSION_CODEC_DEFAULT.id, HIGH, STATE_TOPIC_COMPRESSION_CODEC_DOC)
-        .define(APPEND_LINGER_MS_CONFIG, INT, APPEND_LINGER_MS_DEFAULT, atLeast(0), MEDIUM, APPEND_LINGER_MS_DOC)
+        .define(APPEND_LINGER_MS_CONFIG, INT, APPEND_LINGER_MS_DEFAULT, atLeast(-1), MEDIUM, APPEND_LINGER_MS_DOC)
         .define(WRITE_TIMEOUT_MS_CONFIG, INT, WRITE_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH, WRITE_TIMEOUT_MS_DOC)
         .defineInternal(STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG, INT, STATE_TOPIC_PRUNE_INTERVAL_MS_DEFAULT, atLeast(1), LOW, STATE_TOPIC_PRUNE_INTERVAL_MS_DOC)
         .defineInternal(COLD_PARTITION_SNAPSHOT_INTERVAL_MS_CONFIG, INT, COLD_PARTITION_SNAPSHOT_INTERVAL_MS_DEFAULT, atLeast(1), LOW, COLD_PARTITION_SNAPSHOT_INTERVAL_MS_DOC);
@@ -157,8 +159,16 @@ public class ShareCoordinatorConfig {
         return loadBufferSize;
     }
 
-    public int shareCoordinatorAppendLingerMs() {
-        return appendLingerMs;
+    /**
+     * The duration in milliseconds that the coordinator will wait for writes to
+     * accumulate before flushing them to disk. {@code OptionalInt.empty()} indicates
+     * an adaptive linger time based on the workload.
+     */
+    public OptionalInt shareCoordinatorAppendLingerMs() {
+        if (appendLingerMs == -1) {
+            return OptionalInt.empty();
+        }
+        return OptionalInt.of(appendLingerMs);
     }
 
     public CompressionType shareCoordinatorStateTopicCompressionType() {
diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfigTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfigTest.java
new file mode 100644
index 00000000000..86ae8c1101a
--- /dev/null
+++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfigTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.share;
+
+import org.apache.kafka.common.config.AbstractConfig;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+import java.util.OptionalInt;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class ShareCoordinatorConfigTest {
+
+    @Test
+    public void testAppendLingerMs() {
+        ShareCoordinatorConfig config = createConfig(Map.of(ShareCoordinatorConfig.APPEND_LINGER_MS_CONFIG, -1));
+        assertEquals(OptionalInt.empty(), config.shareCoordinatorAppendLingerMs());
+
+        config = createConfig(Map.of(ShareCoordinatorConfig.APPEND_LINGER_MS_CONFIG, 0));
+        assertEquals(OptionalInt.of(0), config.shareCoordinatorAppendLingerMs());
+
+        config = createConfig(Map.of(ShareCoordinatorConfig.APPEND_LINGER_MS_CONFIG, 5));
+        assertEquals(OptionalInt.of(5), config.shareCoordinatorAppendLingerMs());
+    }
+
+    public static ShareCoordinatorConfig createConfig(Map<String, Object> configs) {
+        return new ShareCoordinatorConfig(new AbstractConfig(
+            ShareCoordinatorConfig.CONFIG_DEF,
+            configs,
+            false
+        ));
+    }
+}
