This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 638844f833b KAFKA-16770; [2/2] Coalesce records into bigger batches 
(#16215)
638844f833b is described below

commit 638844f833b165d6f9ca52c173858d26b7254fac
Author: David Jacot <[email protected]>
AuthorDate: Wed Jun 12 08:29:50 2024 +0200

    KAFKA-16770; [2/2] Coalesce records into bigger batches (#16215)
    
    This patch is the continuation of 
https://github.com/apache/kafka/pull/15964. It introduces the records 
coalescing to the CoordinatorRuntime. It also introduces a new configuration 
`group.coordinator.append.linger.ms` which allows administrators to chose the 
linger time or disable it with zero. The new configuration defaults to 10ms.
    
    Reviewers: Jeff Kim <[email protected]>, Justine Olshan 
<[email protected]>
---
 .../kafka/common/record/MemoryRecordsBuilder.java  |  12 +
 .../src/main/scala/kafka/server/BrokerServer.scala |   1 +
 core/src/main/scala/kafka/server/KafkaConfig.scala |   2 +
 .../coordinator/group/GroupCoordinatorConfig.java  |  12 +
 .../coordinator/group/GroupCoordinatorService.java |   2 +-
 .../group/runtime/CoordinatorRuntime.java          | 672 ++++++++++++++-----
 .../group/GroupCoordinatorConfigTest.java          |   3 +
 .../group/GroupCoordinatorServiceTest.java         |   1 +
 .../group/runtime/CoordinatorRuntimeTest.java      | 736 ++++++++++++++++++++-
 .../apache/kafka/server/util/timer/TimerTask.java  |   4 +
 10 files changed, 1263 insertions(+), 182 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
 
b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 70f279a6c29..b37b1f1ca68 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -870,6 +870,18 @@ public class MemoryRecordsBuilder implements AutoCloseable 
{
         return this.writeLimit >= estimatedBytesWritten() + recordSize;
     }
 
+    /**
+     * Check if we have room for a given number of bytes.
+     */
+    public boolean hasRoomFor(int estimatedRecordsSize) {
+        if (isFull()) return false;
+        return this.writeLimit >= estimatedBytesWritten() + 
estimatedRecordsSize;
+    }
+
+    public int maxAllowedBytes() {
+        return this.writeLimit - this.batchHeaderSizeInBytes;
+    }
+
     public boolean isClosed() {
         return builtRecords != null;
     }
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 31db58c0778..7e225440abf 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -570,6 +570,7 @@ class BrokerServer(
       val serde = new CoordinatorRecordSerde
       val groupCoordinatorConfig = new GroupCoordinatorConfig(
         config.groupCoordinatorNumThreads,
+        config.groupCoordinatorAppendLingerMs,
         config.consumerGroupSessionTimeoutMs,
         config.consumerGroupHeartbeatIntervalMs,
         config.consumerGroupMaxSize,
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index ad8635f7ce7..db96bcb6762 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -278,6 +278,7 @@ object KafkaConfig {
       
.define(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, 
LIST, GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT,
         ValidList.in(Utils.enumOptions(classOf[GroupType]):_*), MEDIUM, 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC)
       .define(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG, 
INT, GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_DEFAULT, atLeast(1), 
MEDIUM, GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_DOC)
+      
.define(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, INT, 
GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_DEFAULT, atLeast(0), 
MEDIUM, GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_DOC)
       // Internal configuration used by integration and system tests.
       
.defineInternal(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, 
BOOLEAN, GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_DEFAULT, null, 
MEDIUM, GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_DOC)
 
@@ -965,6 +966,7 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
   val isNewGroupCoordinatorEnabled = 
getBoolean(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG) ||
     groupCoordinatorRebalanceProtocols.contains(GroupType.CONSUMER)
   val groupCoordinatorNumThreads = 
getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG)
+  val groupCoordinatorAppendLingerMs = 
getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG)
 
   /** Consumer group configs */
   val consumerGroupSessionTimeoutMs = 
getInt(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG)
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
index 84219aa46ff..cc86b6cf818 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -57,6 +57,10 @@ public class GroupCoordinatorConfig {
             
Arrays.stream(Group.GroupType.values()).map(Group.GroupType::toString).collect(Collectors.joining(","))
 + ". " +
             "The " + Group.GroupType.CONSUMER + " rebalance protocol is in 
early access and therefore must not be used in production.";
     public static final List<String> 
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT = 
Collections.singletonList(Group.GroupType.CLASSIC.toString());
+    public final static String GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG = 
"group.coordinator.append.linger.ms";
+    public final static String GROUP_COORDINATOR_APPEND_LINGER_MS_DOC = "The 
duration in milliseconds that the coordinator will " +
+        "wait for writes to accumulate before flushing them to disk. 
Transactional writes are not accumulated.";
+    public final static int GROUP_COORDINATOR_APPEND_LINGER_MS_DEFAULT = 10;
 
     public final static String GROUP_COORDINATOR_NUM_THREADS_CONFIG = 
"group.coordinator.threads";
     public final static String GROUP_COORDINATOR_NUM_THREADS_DOC = "The number 
of threads used by the group coordinator.";
@@ -164,6 +168,12 @@ public class GroupCoordinatorConfig {
      */
     public final int numThreads;
 
+    /**
+     * The duration in milliseconds that the coordinator will wait for writes 
to
+     * accumulate before flushing them to disk.
+     */
+    public final int appendLingerMs;
+
     /**
      * The consumer group session timeout in milliseconds.
      */
@@ -259,6 +269,7 @@ public class GroupCoordinatorConfig {
 
     public GroupCoordinatorConfig(
         int numThreads,
+        int appendLingerMs,
         int consumerGroupSessionTimeoutMs,
         int consumerGroupHeartbeatIntervalMs,
         int consumerGroupMaxSize,
@@ -277,6 +288,7 @@ public class GroupCoordinatorConfig {
         CompressionType compressionType
     ) {
         this.numThreads = numThreads;
+        this.appendLingerMs = appendLingerMs;
         this.consumerGroupSessionTimeoutMs = consumerGroupSessionTimeoutMs;
         this.consumerGroupHeartbeatIntervalMs = 
consumerGroupHeartbeatIntervalMs;
         this.consumerGroupMaxSize = consumerGroupMaxSize;
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 5e4e899faa6..f92594f09d0 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -182,12 +182,12 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
                     .withPartitionWriter(writer)
                     .withLoader(loader)
                     .withCoordinatorShardBuilderSupplier(supplier)
-                    .withTime(time)
                     
.withDefaultWriteTimeOut(Duration.ofMillis(config.offsetCommitTimeoutMs))
                     .withCoordinatorRuntimeMetrics(coordinatorRuntimeMetrics)
                     .withCoordinatorMetrics(groupCoordinatorMetrics)
                     .withSerializer(new CoordinatorRecordSerde())
                     
.withCompression(Compression.of(config.compressionType).build())
+                    .withAppendLingerMs(config.appendLingerMs)
                     .build();
 
             return new GroupCoordinatorService(
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
index 4207c94770b..57927867c89 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
@@ -24,11 +24,13 @@ import 
org.apache.kafka.common.errors.NotCoordinatorException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.AbstractRecords;
 import org.apache.kafka.common.record.ControlRecordType;
 import org.apache.kafka.common.record.EndTransactionMarker;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.MemoryRecordsBuilder;
 import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.SimpleRecord;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.requests.TransactionResult;
 import org.apache.kafka.common.utils.BufferSupplier;
@@ -50,10 +52,12 @@ import org.slf4j.Logger;
 
 import java.nio.ByteBuffer;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.OptionalLong;
 import java.util.concurrent.CompletableFuture;
@@ -66,8 +70,6 @@ import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
-import static org.apache.kafka.common.record.Record.EMPTY_HEADERS;
-
 /**
  * The CoordinatorRuntime provides a framework to implement coordinators such 
as the group coordinator
  * or the transaction coordinator.
@@ -115,6 +117,7 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
         private CoordinatorMetrics coordinatorMetrics;
         private Serializer<U> serializer;
         private Compression compression;
+        private int appendLingerMs;
 
         public Builder<S, U> withLogPrefix(String logPrefix) {
             this.logPrefix = logPrefix;
@@ -181,6 +184,11 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
             return this;
         }
 
+        public Builder<S, U> withAppendLingerMs(int appendLingerMs) {
+            this.appendLingerMs = appendLingerMs;
+            return this;
+        }
+
         public CoordinatorRuntime<S, U> build() {
             if (logPrefix == null)
                 logPrefix = "";
@@ -206,6 +214,8 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
                 throw new IllegalArgumentException("Serializer must be set.");
             if (compression == null)
                 compression = Compression.NONE;
+            if (appendLingerMs < 0)
+                throw new IllegalArgumentException("AppendLinger must be >= 
0");
 
             return new CoordinatorRuntime<>(
                 logPrefix,
@@ -220,7 +230,8 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
                 runtimeMetrics,
                 coordinatorMetrics,
                 serializer,
-                compression
+                compression,
+                appendLingerMs
             );
         }
     }
@@ -275,7 +286,7 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
         FAILED {
             @Override
             boolean canTransitionFrom(CoordinatorState state) {
-                return state == LOADING;
+                return state == LOADING || state == ACTIVE;
             }
         };
 
@@ -434,6 +445,81 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
         }
     }
 
+    /**
+     * A simple container class to hold all the attributes
+     * related to a pending batch.
+     */
+    private static class CoordinatorBatch {
+        /**
+         * The base (or first) offset of the batch. If the batch fails
+         * for any reason, the state machines is rolled back to it.
+         */
+        final long baseOffset;
+
+        /**
+         * The time at which the batch was created.
+         */
+        final long appendTimeMs;
+
+        /**
+         * The max batch size.
+         */
+        final int maxBatchSize;
+
+        /**
+         * The verification guard associated to the batch if it is
+         * transactional.
+         */
+        final VerificationGuard verificationGuard;
+
+        /**
+         * The byte buffer backing the records builder.
+         */
+        final ByteBuffer buffer;
+
+        /**
+         * The records builder.
+         */
+        final MemoryRecordsBuilder builder;
+
+        /**
+         * The timer used to enfore the append linger time if
+         * it is non-zero.
+         */
+        final Optional<TimerTask> lingerTimeoutTask;
+
+        /**
+         * The list of deferred events associated with the batch.
+         */
+        final List<DeferredEvent> deferredEvents;
+
+        /**
+         * The next offset. This is updated when records
+         * are added to the batch.
+         */
+        long nextOffset;
+
+        CoordinatorBatch(
+            long baseOffset,
+            long appendTimeMs,
+            int maxBatchSize,
+            VerificationGuard verificationGuard,
+            ByteBuffer buffer,
+            MemoryRecordsBuilder builder,
+            Optional<TimerTask> lingerTimeoutTask
+        ) {
+            this.baseOffset = baseOffset;
+            this.nextOffset = baseOffset;
+            this.appendTimeMs = appendTimeMs;
+            this.maxBatchSize = maxBatchSize;
+            this.verificationGuard = verificationGuard;
+            this.buffer = buffer;
+            this.builder = builder;
+            this.lingerTimeoutTask = lingerTimeoutTask;
+            this.deferredEvents = new ArrayList<>();
+        }
+    }
+
     /**
      * CoordinatorContext holds all the metadata around a coordinator state 
machine.
      */
@@ -493,6 +579,11 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
          */
         BufferSupplier bufferSupplier;
 
+        /**
+         * The current (or pending) batch.
+         */
+        CoordinatorBatch currentBatch;
+
         /**
          * Constructor.
          *
@@ -547,6 +638,7 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
                             .build(),
                         tp
                     );
+                    load();
                     break;
 
                 case ACTIVE:
@@ -573,6 +665,46 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
             runtimeMetrics.recordPartitionStateChange(oldState, state);
         }
 
+        /**
+         * Loads the coordinator.
+         */
+        private void load() {
+            if (state != CoordinatorState.LOADING) {
+                throw new IllegalStateException("Coordinator must be in 
loading state");
+            }
+
+            loader.load(tp, coordinator).whenComplete((summary, exception) -> {
+                scheduleInternalOperation("CompleteLoad(tp=" + tp + ", epoch=" 
+ epoch + ")", tp, () -> {
+                    CoordinatorContext context = coordinators.get(tp);
+                    if (context != null)  {
+                        if (context.state != CoordinatorState.LOADING) {
+                            log.info("Ignored load completion from {} because 
context is in {} state.",
+                                context.tp, context.state);
+                            return;
+                        }
+                        try {
+                            if (exception != null) throw exception;
+                            context.transitionTo(CoordinatorState.ACTIVE);
+                            if (summary != null) {
+                                
runtimeMetrics.recordPartitionLoadSensor(summary.startTimeMs(), 
summary.endTimeMs());
+                                log.info("Finished loading of metadata from {} 
with epoch {} in {}ms where {}ms " +
+                                        "was spent in the scheduler. Loaded {} 
records which total to {} bytes.",
+                                    tp, epoch, summary.endTimeMs() - 
summary.startTimeMs(),
+                                    summary.schedulerQueueTimeMs(), 
summary.numRecords(), summary.numBytes());
+                            }
+                        } catch (Throwable ex) {
+                            log.error("Failed to load metadata from {} with 
epoch {} due to {}.",
+                                tp, epoch, ex.toString());
+                            context.transitionTo(CoordinatorState.FAILED);
+                        }
+                    } else {
+                        log.debug("Failed to complete the loading of metadata 
for {} in epoch {} since the coordinator does not exist.",
+                            tp, epoch);
+                    }
+                });
+            });
+        }
+
         /**
          * Unloads the coordinator.
          */
@@ -583,11 +715,352 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
             }
             timer.cancelAll();
             deferredEventQueue.failAll(Errors.NOT_COORDINATOR.exception());
+            failCurrentBatch(Errors.NOT_COORDINATOR.exception());
             if (coordinator != null) {
                 coordinator.onUnloaded();
             }
             coordinator = null;
         }
+
+        /**
+         * Frees the current batch.
+         */
+        private void freeCurrentBatch() {
+            // Cancel the linger timeout.
+            currentBatch.lingerTimeoutTask.ifPresent(TimerTask::cancel);
+
+            // Release the buffer.
+            bufferSupplier.release(currentBatch.buffer);
+
+            currentBatch = null;
+        }
+
+        /**
+         * Flushes the current (or pending) batch to the log. When the batch 
is written
+         * locally, a new snapshot is created in the snapshot registry and the 
events
+         * associated with the batch are added to the deferred event queue.
+         */
+        private void flushCurrentBatch() {
+            if (currentBatch != null) {
+                try {
+                    // Write the records to the log and update the last 
written offset.
+                    long offset = partitionWriter.append(
+                        tp,
+                        currentBatch.verificationGuard,
+                        currentBatch.builder.build()
+                    );
+                    coordinator.updateLastWrittenOffset(offset);
+
+                    if (offset != currentBatch.nextOffset) {
+                        log.error("The state machine of the coordinator {} is 
out of sync with the underlying log. " +
+                            "The last written offset returned is {} while the 
coordinator expected {}. The coordinator " +
+                            "will be reloaded in order to re-synchronize the 
state machine.",
+                            tp, offset, currentBatch.nextOffset);
+                        // Transition to FAILED state to unload the state 
machine and complete
+                        // exceptionally all the pending operations.
+                        transitionTo(CoordinatorState.FAILED);
+                        // Transition to LOADING to trigger the restoration of 
the state.
+                        transitionTo(CoordinatorState.LOADING);
+                        // Thrown NotCoordinatorException to fail the 
operation that
+                        // triggered the write. We use NotCoordinatorException 
to be
+                        // consistent with the transition to FAILED.
+                        throw Errors.NOT_COORDINATOR.exception();
+                    }
+
+                    // Add all the pending deferred events to the deferred 
event queue.
+                    for (DeferredEvent event : currentBatch.deferredEvents) {
+                        deferredEventQueue.add(offset, event);
+                    }
+
+                    // Free up the current batch.
+                    freeCurrentBatch();
+                } catch (Throwable t) {
+                    log.error("Writing records to {} failed due to: {}.", tp, 
t.getMessage());
+                    failCurrentBatch(t);
+                    // We rethrow the exception for the caller to handle it 
too.
+                    throw t;
+                }
+            }
+        }
+
+        /**
+         * Flushes the current batch if it is transactional or if it has 
passed the append linger time.
+         */
+        private void maybeFlushCurrentBatch(long currentTimeMs) {
+            if (currentBatch != null) {
+                if (currentBatch.builder.isTransactional() || 
(currentBatch.appendTimeMs - currentTimeMs) >= appendLingerMs) {
+                    flushCurrentBatch();
+                }
+            }
+        }
+
+        /**
+         * Fails the current batch, reverts to the snapshot to the base/start 
offset of the
+         * batch, fails all the associated events.
+         */
+        private void failCurrentBatch(Throwable t) {
+            if (currentBatch != null) {
+                coordinator.revertLastWrittenOffset(currentBatch.baseOffset);
+                for (DeferredEvent event : currentBatch.deferredEvents) {
+                    event.complete(t);
+                }
+                freeCurrentBatch();
+            }
+        }
+
+        /**
+         * Allocates a new batch if none already exists.
+         */
+        private void maybeAllocateNewBatch(
+            long producerId,
+            short producerEpoch,
+            VerificationGuard verificationGuard,
+            long currentTimeMs
+        ) {
+            if (currentBatch == null) {
+                LogConfig logConfig = partitionWriter.config(tp);
+                byte magic = logConfig.recordVersion().value;
+                int maxBatchSize = logConfig.maxMessageSize();
+                long prevLastWrittenOffset = coordinator.lastWrittenOffset();
+                ByteBuffer buffer = bufferSupplier.get(maxBatchSize);
+
+                MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
+                    buffer,
+                    magic,
+                    compression,
+                    TimestampType.CREATE_TIME,
+                    0L,
+                    currentTimeMs,
+                    producerId,
+                    producerEpoch,
+                    0,
+                    producerId != RecordBatch.NO_PRODUCER_ID,
+                    false,
+                    RecordBatch.NO_PARTITION_LEADER_EPOCH,
+                    maxBatchSize
+                );
+
+                Optional<TimerTask> lingerTimeoutTask = Optional.empty();
+                if (appendLingerMs > 0) {
+                    lingerTimeoutTask = Optional.of(new 
TimerTask(appendLingerMs) {
+                        @Override
+                        public void run() {
+                            // An event to flush the batch is pushed to the 
front of the queue
+                            // to ensure that the linger time is respected.
+                            enqueueFirst(new 
CoordinatorInternalEvent("FlushBatch", tp, () -> {
+                                if (this.isCancelled()) return;
+                                withActiveContextOrThrow(tp, 
CoordinatorContext::flushCurrentBatch);
+                            }));
+                        }
+                    });
+                    CoordinatorRuntime.this.timer.add(lingerTimeoutTask.get());
+                }
+
+                currentBatch = new CoordinatorBatch(
+                    prevLastWrittenOffset,
+                    currentTimeMs,
+                    maxBatchSize,
+                    verificationGuard,
+                    buffer,
+                    builder,
+                    lingerTimeoutTask
+                );
+            }
+        }
+
+        /**
+         * Appends records to the log and replay them to the state machine.
+         *
+         * @param producerId        The producer id.
+         * @param producerEpoch     The producer epoch.
+         * @param verificationGuard The verification guard.
+         * @param records           The records to append.
+         * @param replay            A boolean indicating whether the records
+         *                          must be replayed or not.
+         * @param event             The event that must be completed when the
+         *                          records are written.
+         */
+        private void append(
+            long producerId,
+            short producerEpoch,
+            VerificationGuard verificationGuard,
+            List<U> records,
+            boolean replay,
+            DeferredEvent event
+        ) {
+            if (state != CoordinatorState.ACTIVE) {
+                throw new IllegalStateException("Coordinator must be active to 
append records");
+            }
+
+            if (records.isEmpty()) {
+                // If the records are empty, it was a read operation after 
all. In this case,
+                // the response can be returned directly iff there are no 
pending write operations;
+                // otherwise, the read needs to wait on the last write 
operation to be completed.
+                if (currentBatch != null) {
+                    currentBatch.deferredEvents.add(event);
+                } else {
+                    OptionalLong pendingOffset = 
deferredEventQueue.highestPendingOffset();
+                    if (pendingOffset.isPresent()) {
+                        deferredEventQueue.add(pendingOffset.getAsLong(), 
event);
+                    } else {
+                        event.complete(null);
+                    }
+                }
+            } else {
+                // If the records are not empty, first, they are applied to 
the state machine,
+                // second, they are appended to the opened batch.
+                long currentTimeMs = time.milliseconds();
+
+                // If the current write operation is transactional, the 
current batch
+                // is written before proceeding with it.
+                if (producerId != RecordBatch.NO_PRODUCER_ID) {
+                    // If flushing fails, we don't catch the exception in 
order to let
+                    // the caller fail the current operation.
+                    flushCurrentBatch();
+                }
+
+                // Allocate a new batch if none exists.
+                maybeAllocateNewBatch(
+                    producerId,
+                    producerEpoch,
+                    verificationGuard,
+                    currentTimeMs
+                );
+
+                // Prepare the records.
+                List<SimpleRecord> recordsToAppend = new 
ArrayList<>(records.size());
+                for (U record : records) {
+                    recordsToAppend.add(new SimpleRecord(
+                        currentTimeMs,
+                        serializer.serializeKey(record),
+                        serializer.serializeValue(record)
+                    ));
+                }
+
+                // Compute the estimated size of the records.
+                int estimatedSize = AbstractRecords.estimateSizeInBytes(
+                    currentBatch.builder.magic(),
+                    compression.type(),
+                    recordsToAppend
+                );
+
+                // Check if the current batch has enough space. We check is 
before
+                // replaying the records in order to avoid having to revert 
back
+                // changes if the records do not fit within a batch.
+                if (estimatedSize > currentBatch.builder.maxAllowedBytes()) {
+                    throw new RecordTooLargeException("Message batch size is " 
+ estimatedSize +
+                        " bytes in append to partition " + tp + " which 
exceeds the maximum " +
+                        "configured size of " + currentBatch.maxBatchSize + 
".");
+                }
+
+                if (!currentBatch.builder.hasRoomFor(estimatedSize)) {
+                    // Otherwise, we write the current batch, allocate a new 
one and re-verify
+                    // whether the records fit in it.
+                    // If flushing fails, we don't catch the exception in 
order to let
+                    // the caller fail the current operation.
+                    flushCurrentBatch();
+                    maybeAllocateNewBatch(
+                        producerId,
+                        producerEpoch,
+                        verificationGuard,
+                        currentTimeMs
+                    );
+                }
+
+                // Add the event to the list of pending events associated with 
the batch.
+                currentBatch.deferredEvents.add(event);
+
+                try {
+                    // Apply record to the state machine.
+                    if (replay) {
+                        for (int i = 0; i < records.size(); i++) {
+                            // We compute the offset of the record based on 
the last written offset. The
+                            // coordinator is the single writer to the 
underlying partition so we can
+                            // deduce it like this.
+                            coordinator.replay(
+                                currentBatch.nextOffset + i,
+                                producerId,
+                                producerEpoch,
+                                records.get(i)
+                            );
+                        }
+                    }
+
+                    // Append to the batch.
+                    for (SimpleRecord record : recordsToAppend) {
+                        currentBatch.builder.append(record);
+                        currentBatch.nextOffset++;
+                    }
+                } catch (Throwable t) {
+                    log.error("Replaying records to {} failed due to: {}.", 
tp, t.getMessage());
+                    // If an exception is thrown, we fail the entire batch. 
Exceptions should be
+                    // really exceptional in this code path and they would 
usually be the results
+                    // of bugs preventing records to be replayed.
+                    failCurrentBatch(t);
+                }
+
+                // Write the current batch if it is transactional or if the 
linger timeout
+                // has expired.
+                // If flushing fails, we don't catch the exception in order to 
let
+                // the caller fail the current operation.
+                maybeFlushCurrentBatch(currentTimeMs);
+            }
+        }
+
+        /**
+         * Completes a transaction.
+         *
+         * @param producerId        The producer id.
+         * @param producerEpoch     The producer epoch.
+         * @param coordinatorEpoch  The coordinator epoch of the transaction 
coordinator.
+         * @param result            The transaction result.
+         * @param event             The event that must be completed when the
+         *                          control record is written.
+         */
+        private void completeTransaction(
+            long producerId,
+            short producerEpoch,
+            int coordinatorEpoch,
+            TransactionResult result,
+            DeferredEvent event
+        ) {
+            if (state != CoordinatorState.ACTIVE) {
+                throw new IllegalStateException("Coordinator must be active to 
complete a transaction");
+            }
+
+            // The current batch must be written before the transaction marker 
is written
+            // in order to respect the order.
+            flushCurrentBatch();
+
+            long prevLastWrittenOffset = coordinator.lastWrittenOffset();
+            try {
+                coordinator.replayEndTransactionMarker(
+                    producerId,
+                    producerEpoch,
+                    result
+                );
+
+                long offset = partitionWriter.append(
+                    tp,
+                    VerificationGuard.SENTINEL,
+                    MemoryRecords.withEndTransactionMarker(
+                        time.milliseconds(),
+                        producerId,
+                        producerEpoch,
+                        new EndTransactionMarker(
+                            result == TransactionResult.COMMIT ? 
ControlRecordType.COMMIT : ControlRecordType.ABORT,
+                            coordinatorEpoch
+                        )
+                    )
+                );
+                coordinator.updateLastWrittenOffset(offset);
+
+                deferredEventQueue.add(offset, event);
+            } catch (Throwable t) {
+                coordinator.revertLastWrittenOffset(prevLastWrittenOffset);
+                event.complete(t);
+            }
+        }
     }
 
     class OperationTimeout extends TimerTask {
@@ -781,100 +1254,20 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
                     // Execute the operation.
                     result = 
op.generateRecordsAndResult(context.coordinator.coordinator());
 
-                    if (result.records().isEmpty()) {
-                        // If the records are empty, it was a read operation 
after all. In this case,
-                        // the response can be returned directly iff there are 
no pending write operations;
-                        // otherwise, the read needs to wait on the last write 
operation to be completed.
-                        OptionalLong pendingOffset = 
context.deferredEventQueue.highestPendingOffset();
-                        if (pendingOffset.isPresent()) {
-                            
context.deferredEventQueue.add(pendingOffset.getAsLong(), this);
-                        } else {
-                            complete(null);
-                        }
-                    } else {
-                        // If the records are not empty, first, they are 
applied to the state machine,
-                        // second, then are written to the partition/log, and 
finally, the response
-                        // is put into the deferred event queue.
-                        long prevLastWrittenOffset = 
context.coordinator.lastWrittenOffset();
-                        LogConfig logConfig = partitionWriter.config(tp);
-                        byte magic = logConfig.recordVersion().value;
-                        int maxBatchSize = logConfig.maxMessageSize();
-                        long currentTimeMs = time.milliseconds();
-                        ByteBuffer buffer = 
context.bufferSupplier.get(Math.min(MIN_BUFFER_SIZE, maxBatchSize));
-
-                        try {
-                            MemoryRecordsBuilder builder = new 
MemoryRecordsBuilder(
-                                buffer,
-                                magic,
-                                compression,
-                                TimestampType.CREATE_TIME,
-                                0L,
-                                currentTimeMs,
-                                producerId,
-                                producerEpoch,
-                                0,
-                                producerId != RecordBatch.NO_PRODUCER_ID,
-                                false,
-                                RecordBatch.NO_PARTITION_LEADER_EPOCH,
-                                maxBatchSize
-                            );
-
-                            // Apply the records to the state machine and add 
them to the batch.
-                            for (int i = 0; i < result.records().size(); i++) {
-                                U record = result.records().get(i);
-
-                                if (result.replayRecords()) {
-                                    // We compute the offset of the record 
based on the last written offset. The
-                                    // coordinator is the single writer to the 
underlying partition so we can
-                                    // deduce it like this.
-                                    context.coordinator.replay(
-                                        prevLastWrittenOffset + i,
-                                        producerId,
-                                        producerEpoch,
-                                        record
-                                    );
-                                }
-
-                                byte[] keyBytes = 
serializer.serializeKey(record);
-                                byte[] valBytes = 
serializer.serializeValue(record);
-
-                                if (builder.hasRoomFor(currentTimeMs, 
keyBytes, valBytes, EMPTY_HEADERS)) {
-                                    builder.append(
-                                        currentTimeMs,
-                                        keyBytes,
-                                        valBytes,
-                                        EMPTY_HEADERS
-                                    );
-                                } else {
-                                    throw new RecordTooLargeException("Message 
batch size is " + builder.estimatedSizeInBytes() +
-                                        " bytes in append to partition " + tp 
+ " which exceeds the maximum " +
-                                        "configured size of " + maxBatchSize + 
".");
-                                }
-                            }
-
-                            // Write the records to the log and update the 
last written
-                            // offset.
-                            long offset = partitionWriter.append(
-                                tp,
-                                verificationGuard,
-                                builder.build()
-                            );
-                            
context.coordinator.updateLastWrittenOffset(offset);
+                    // Append the records and replay them to the state machine.
+                    context.append(
+                        producerId,
+                        producerEpoch,
+                        verificationGuard,
+                        result.records(),
+                        result.replayRecords(),
+                        this
+                    );
 
-                            // Add the response to the deferred queue.
-                            if (!future.isDone()) {
-                                context.deferredEventQueue.add(offset, this);
-                                operationTimeout = new OperationTimeout(tp, 
this, writeTimeout.toMillis());
-                                timer.add(operationTimeout);
-                            } else {
-                                complete(null);
-                            }
-                        } catch (Throwable t) {
-                            
context.coordinator.revertLastWrittenOffset(prevLastWrittenOffset);
-                            complete(t);
-                        } finally {
-                            context.bufferSupplier.release(buffer);
-                        }
+                    // If the operation is not done, create an operation 
timeout.
+                    if (!future.isDone()) {
+                        operationTimeout = new OperationTimeout(tp, this, 
writeTimeout.toMillis());
+                        timer.add(operationTimeout);
                     }
                 });
             } catch (Throwable t) {
@@ -1142,40 +1535,17 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
         public void run() {
             try {
                 withActiveContextOrThrow(tp, context -> {
-                    long prevLastWrittenOffset = 
context.coordinator.lastWrittenOffset();
+                    context.completeTransaction(
+                        producerId,
+                        producerEpoch,
+                        coordinatorEpoch,
+                        result,
+                        this
+                    );
 
-                    try {
-                        context.coordinator.replayEndTransactionMarker(
-                            producerId,
-                            producerEpoch,
-                            result
-                        );
-
-                        long offset = partitionWriter.append(
-                            tp,
-                            VerificationGuard.SENTINEL,
-                            MemoryRecords.withEndTransactionMarker(
-                                time.milliseconds(),
-                                producerId,
-                                producerEpoch,
-                                new EndTransactionMarker(
-                                    result == TransactionResult.COMMIT ? 
ControlRecordType.COMMIT : ControlRecordType.ABORT,
-                                    coordinatorEpoch
-                                )
-                            )
-                        );
-                        context.coordinator.updateLastWrittenOffset(offset);
-
-                        if (!future.isDone()) {
-                            context.deferredEventQueue.add(offset, this);
-                            operationTimeout = new OperationTimeout(tp, this, 
writeTimeout.toMillis());
-                            timer.add(operationTimeout);
-                        } else {
-                            complete(null);
-                        }
-                    } catch (Throwable t) {
-                        
context.coordinator.revertLastWrittenOffset(prevLastWrittenOffset);
-                        complete(t);
+                    if (!future.isDone()) {
+                        operationTimeout = new OperationTimeout(tp, this, 
writeTimeout.toMillis());
+                        timer.add(operationTimeout);
                     }
                 });
             } catch (Throwable t) {
@@ -1449,6 +1819,12 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
      */
     private final Compression compression;
 
+    /**
+     * The duration in milliseconds that the coordinator will wait for writes 
to
+     * accumulate before flushing them to disk.
+     */
+    private final int appendLingerMs;
+
     /**
      * Atomic boolean indicating whether the runtime is running.
      */
@@ -1475,7 +1851,9 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
      * @param coordinatorMetrics                The coordinator metrics.
      * @param serializer                        The serializer.
      * @param compression                       The compression codec.
+     * @param appendLingerMs                    The append linger time in ms.
      */
+    @SuppressWarnings("checkstyle:ParameterNumber")
     private CoordinatorRuntime(
         String logPrefix,
         LogContext logContext,
@@ -1489,7 +1867,8 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
         CoordinatorRuntimeMetrics runtimeMetrics,
         CoordinatorMetrics coordinatorMetrics,
         Serializer<U> serializer,
-        Compression compression
+        Compression compression,
+        int appendLingerMs
     ) {
         this.logPrefix = logPrefix;
         this.logContext = logContext;
@@ -1506,6 +1885,7 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
         this.coordinatorMetrics = coordinatorMetrics;
         this.serializer = serializer;
         this.compression = compression;
+        this.appendLingerMs = appendLingerMs;
     }
 
     /**
@@ -1836,36 +2216,6 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
                         case FAILED:
                         case INITIAL:
                             context.transitionTo(CoordinatorState.LOADING);
-                            loader.load(tp, 
context.coordinator).whenComplete((summary, exception) -> {
-                                scheduleInternalOperation("CompleteLoad(tp=" + 
tp + ", epoch=" + partitionEpoch + ")", tp, () -> {
-                                    CoordinatorContext ctx = 
coordinators.get(tp);
-                                    if (ctx != null)  {
-                                        if (ctx.state != 
CoordinatorState.LOADING) {
-                                            log.info("Ignored load completion 
from {} because context is in {} state.",
-                                                ctx.tp, ctx.state);
-                                            return;
-                                        }
-                                        try {
-                                            if (exception != null) throw 
exception;
-                                            
ctx.transitionTo(CoordinatorState.ACTIVE);
-                                            if (summary != null) {
-                                                
runtimeMetrics.recordPartitionLoadSensor(summary.startTimeMs(), 
summary.endTimeMs());
-                                                log.info("Finished loading of 
metadata from {} with epoch {} in {}ms where {}ms " +
-                                                         "was spent in the 
scheduler. Loaded {} records which total to {} bytes.",
-                                                    tp, partitionEpoch, 
summary.endTimeMs() - summary.startTimeMs(),
-                                                    
summary.schedulerQueueTimeMs(), summary.numRecords(), summary.numBytes());
-                                            }
-                                        } catch (Throwable ex) {
-                                            log.error("Failed to load metadata 
from {} with epoch {} due to {}.",
-                                                tp, partitionEpoch, 
ex.toString());
-                                            
ctx.transitionTo(CoordinatorState.FAILED);
-                                        }
-                                    } else {
-                                        log.debug("Failed to complete the 
loading of metadata for {} in epoch {} since the coordinator does not exist.",
-                                            tp, partitionEpoch);
-                                    }
-                                });
-                            });
                             break;
 
                         case LOADING:
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
index 03306c90407..b65ceda74ed 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
@@ -30,6 +30,7 @@ public class GroupCoordinatorConfigTest {
     public void testConfigs() {
         ConsumerGroupPartitionAssignor assignor = new RangeAssignor();
         GroupCoordinatorConfig config = new GroupCoordinatorConfig(
+            10,
             10,
             30,
             10,
@@ -65,6 +66,7 @@ public class GroupCoordinatorConfigTest {
         assertEquals(24 * 60 * 60 * 1000L, config.offsetsRetentionMs);
         assertEquals(5000, config.offsetCommitTimeoutMs);
         assertEquals(CompressionType.GZIP, config.compressionType);
+        assertEquals(10, config.appendLingerMs);
     }
 
     public static GroupCoordinatorConfig createGroupCoordinatorConfig(
@@ -74,6 +76,7 @@ public class GroupCoordinatorConfigTest {
     ) {
         return new GroupCoordinatorConfig(
             1,
+            10,
             45,
             5,
             Integer.MAX_VALUE,
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index 6cd96458c64..7ddb04a6d8b 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -118,6 +118,7 @@ public class GroupCoordinatorServiceTest {
     private GroupCoordinatorConfig createConfig() {
         return new GroupCoordinatorConfig(
             1,
+            10,
             45,
             5,
             Integer.MAX_VALUE,
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
index ae1d4047924..5052881d3c0 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.compress.Compression;
 import org.apache.kafka.common.errors.NotCoordinatorException;
 import org.apache.kafka.common.errors.NotEnoughReplicasException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.AbstractRecords;
@@ -60,8 +61,8 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.Deque;
-import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Objects;
@@ -74,6 +75,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.kafka.common.utils.Utils.mkSet;
 import static 
org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.ACTIVE;
@@ -85,6 +87,8 @@ import static 
org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.MIN_
 import static org.apache.kafka.test.TestUtils.assertFutureThrows;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -147,7 +151,7 @@ public class CoordinatorRuntimeTest {
      * when poll() is called.
      */
     private static class ManualEventProcessor implements 
CoordinatorEventProcessor {
-        private Deque<CoordinatorEvent> queue = new LinkedList<>();
+        private final Deque<CoordinatorEvent> queue = new LinkedList<>();
 
         @Override
         public void enqueueLast(CoordinatorEvent event) throws 
RejectedExecutionException {
@@ -274,9 +278,72 @@ public class CoordinatorRuntimeTest {
      * A simple Coordinator implementation that stores the records into a set.
      */
     static class MockCoordinatorShard implements CoordinatorShard<String> {
+        static class RecordAndMetadata {
+            public final long offset;
+            public final long producerId;
+            public final short producerEpoch;
+            public final String record;
+
+            public RecordAndMetadata(
+                long offset,
+                String record
+            ) {
+                this(
+                    offset,
+                    RecordBatch.NO_PRODUCER_ID,
+                    RecordBatch.NO_PRODUCER_EPOCH,
+                    record
+                );
+            }
+
+            public RecordAndMetadata(
+                long offset,
+                long producerId,
+                short producerEpoch,
+                String record
+            ) {
+                this.offset = offset;
+                this.producerId = producerId;
+                this.producerEpoch = producerEpoch;
+                this.record = record;
+            }
+
+            @Override
+            public boolean equals(Object o) {
+                if (this == o) return true;
+                if (o == null || getClass() != o.getClass()) return false;
+
+                RecordAndMetadata that = (RecordAndMetadata) o;
+
+                if (offset != that.offset) return false;
+                if (producerId != that.producerId) return false;
+                if (producerEpoch != that.producerEpoch) return false;
+                return Objects.equals(record, that.record);
+            }
+
+            @Override
+            public int hashCode() {
+                int result = (int) (offset ^ (offset >>> 32));
+                result = 31 * result + (int) (producerId ^ (producerId >>> 
32));
+                result = 31 * result + (int) producerEpoch;
+                result = 31 * result + (record != null ? record.hashCode() : 
0);
+                return result;
+            }
+
+            @Override
+            public String toString() {
+                return "RecordAndMetadata(" +
+                    "offset=" + offset +
+                    ", producerId=" + producerId +
+                    ", producerEpoch=" + producerEpoch +
+                    ", record='" + record.substring(0, 10) + '\'' +
+                    ')';
+            }
+        }
+
         private final SnapshotRegistry snapshotRegistry;
-        private final TimelineHashSet<String> records;
-        private final TimelineHashMap<Long, TimelineHashSet<String>> 
pendingRecords;
+        private final TimelineHashSet<RecordAndMetadata> records;
+        private final TimelineHashMap<Long, 
TimelineHashSet<RecordAndMetadata>> pendingRecords;
         private final CoordinatorTimer<Void, String> timer;
 
         MockCoordinatorShard(
@@ -296,12 +363,19 @@ public class CoordinatorRuntimeTest {
             short producerEpoch,
             String record
         ) throws RuntimeException {
+            RecordAndMetadata recordAndMetadata = new RecordAndMetadata(
+                offset,
+                producerId,
+                producerEpoch,
+                record
+            );
+
             if (producerId == RecordBatch.NO_PRODUCER_ID) {
-                records.add(record);
+                records.add(recordAndMetadata);
             } else {
                 pendingRecords
                     .computeIfAbsent(producerId, __ -> new 
TimelineHashSet<>(snapshotRegistry, 0))
-                    .add(record);
+                    .add(recordAndMetadata);
             }
         }
 
@@ -312,7 +386,7 @@ public class CoordinatorRuntimeTest {
             TransactionResult result
         ) throws RuntimeException {
             if (result == TransactionResult.COMMIT) {
-                TimelineHashSet<String> pending = 
pendingRecords.remove(producerId);
+                TimelineHashSet<RecordAndMetadata> pending = 
pendingRecords.remove(producerId);
                 if (pending == null) return;
                 records.addAll(pending);
             } else {
@@ -321,13 +395,26 @@ public class CoordinatorRuntimeTest {
         }
 
         Set<String> pendingRecords(long producerId) {
-            TimelineHashSet<String> pending = pendingRecords.get(producerId);
+            TimelineHashSet<RecordAndMetadata> pending = 
pendingRecords.get(producerId);
             if (pending == null) return Collections.emptySet();
-            return Collections.unmodifiableSet(new HashSet<>(pending));
+            return Collections.unmodifiableSet(
+                pending.stream().map(record -> 
record.record).collect(Collectors.toSet())
+            );
         }
 
         Set<String> records() {
-            return Collections.unmodifiableSet(new HashSet<>(records));
+            return Collections.unmodifiableSet(
+                records.stream().map(record -> 
record.record).collect(Collectors.toSet())
+            );
+        }
+
+        List<RecordAndMetadata> fullRecords() {
+            return Collections.unmodifiableList(
+                records
+                    .stream()
+                    .sorted(Comparator.comparingLong(record -> record.offset))
+                    .collect(Collectors.toList())
+            );
         }
 
         CoordinatorTimer<Void, String> timer() {
@@ -407,10 +494,17 @@ public class CoordinatorRuntimeTest {
         long timestamp,
         String... records
     ) {
-        if (records.length == 0)
+        return records(timestamp, 
Arrays.stream(records).collect(Collectors.toList()));
+    }
+
+    private static MemoryRecords records(
+        long timestamp,
+        List<String> records
+    ) {
+        if (records.isEmpty())
             return MemoryRecords.EMPTY;
 
-        List<SimpleRecord> simpleRecords = Arrays.stream(records).map(record ->
+        List<SimpleRecord> simpleRecords = records.stream().map(record ->
             new SimpleRecord(timestamp, 
record.getBytes(Charset.defaultCharset()))
         ).collect(Collectors.toList());
 
@@ -447,10 +541,24 @@ public class CoordinatorRuntimeTest {
         long timestamp,
         String... records
     ) {
-        if (records.length == 0)
+        return transactionalRecords(
+            producerId,
+            producerEpoch,
+            timestamp,
+            Arrays.stream(records).collect(Collectors.toList())
+        );
+    }
+
+    private static MemoryRecords transactionalRecords(
+        long producerId,
+        short producerEpoch,
+        long timestamp,
+        List<String> records
+    ) {
+        if (records.isEmpty())
             return MemoryRecords.EMPTY;
 
-        List<SimpleRecord> simpleRecords = Arrays.stream(records).map(record ->
+        List<SimpleRecord> simpleRecords = records.stream().map(record ->
             new SimpleRecord(timestamp, 
record.getBytes(Charset.defaultCharset()))
         ).collect(Collectors.toList());
 
@@ -986,13 +1094,13 @@ public class CoordinatorRuntimeTest {
         // Records have been replayed to the coordinator.
         assertEquals(mkSet("record1", "record2"), 
ctx.coordinator.coordinator().records());
         // Records have been written to the log.
-        assertEquals(Arrays.asList(
+        assertEquals(Collections.singletonList(
             records(timer.time().milliseconds(), "record1", "record2")
         ), writer.entries(TP));
 
         // Write #2.
         CompletableFuture<String> write2 = 
runtime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT,
-            state -> new CoordinatorResult<>(Arrays.asList("record3"), 
"response2"));
+            state -> new 
CoordinatorResult<>(Collections.singletonList("record3"), "response2"));
 
         // Verify that the write is not committed yet.
         assertFalse(write2.isDone());
@@ -1540,7 +1648,7 @@ public class CoordinatorRuntimeTest {
             100L
         ));
         // Records have been written to the log.
-        assertEquals(Arrays.asList(
+        assertEquals(Collections.singletonList(
             transactionalRecords(100L, (short) 5, timer.time().milliseconds(), 
"record1", "record2")
         ), writer.entries(TP));
 
@@ -1785,7 +1893,7 @@ public class CoordinatorRuntimeTest {
         assertEquals(Arrays.asList(0L, 2L), 
ctx.coordinator.snapshotRegistry().epochsList());
         assertEquals(mkSet("record1", "record2"), 
ctx.coordinator.coordinator().pendingRecords(100L));
         assertEquals(Collections.emptySet(), 
ctx.coordinator.coordinator().records());
-        assertEquals(Arrays.asList(
+        assertEquals(Collections.singletonList(
             transactionalRecords(100L, (short) 5, timer.time().milliseconds(), 
"record1", "record2")
         ), writer.entries(TP));
 
@@ -1807,7 +1915,7 @@ public class CoordinatorRuntimeTest {
         assertEquals(Arrays.asList(0L, 2L), 
ctx.coordinator.snapshotRegistry().epochsList());
         assertEquals(mkSet("record1", "record2"), 
ctx.coordinator.coordinator().pendingRecords(100L));
         assertEquals(Collections.emptySet(), 
ctx.coordinator.coordinator().records());
-        assertEquals(Arrays.asList(
+        assertEquals(Collections.singletonList(
             transactionalRecords(100L, (short) 5, timer.time().milliseconds(), 
"record1", "record2")
         ), writer.entries(TP));
     }
@@ -1985,7 +2093,7 @@ public class CoordinatorRuntimeTest {
         // Read.
         List<CompletableFuture<List<String>>> responses = 
runtime.scheduleReadAllOperation(
             "read",
-            (state, offset) -> new ArrayList<>(state.records)
+            (state, offset) -> new ArrayList<>(state.records())
         );
 
         assertEquals(
@@ -3059,6 +3167,594 @@ public class CoordinatorRuntimeTest {
         assertTrue(batchSize > MIN_BUFFER_SIZE && batchSize < maxBatchSize);
     }
 
+    @Test
+    public void testScheduleWriteOperationWithBatching() throws 
ExecutionException, InterruptedException, TimeoutException {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+                
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(10)
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertNull(ctx.currentBatch);
+
+        // Get the max batch size.
+        int maxBatchSize = writer.config(TP).maxMessageSize();
+
+        // Create records with a quarter of the max batch size each. Keep in 
mind that
+        // each batch has a header so it is not possible to have those four 
records
+        // in one single batch.
+        List<String> records = Stream.of('1', '2', '3', '4').map(c -> {
+            char[] payload = new char[maxBatchSize / 4];
+            Arrays.fill(payload, c);
+            return new String(payload);
+        }).collect(Collectors.toList());
+
+        // Write #1 with two records.
+        CompletableFuture<String> write1 = 
runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(0, 2), 
"response1")
+        );
+
+        // Verify that the write is not committed yet.
+        assertFalse(write1.isDone());
+
+        // A batch has been created.
+        assertNotNull(ctx.currentBatch);
+
+        // Verify the state. Records are replayed but no batch written.
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Arrays.asList(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(Collections.emptyList(), writer.entries(TP));
+
+        // Write #2 with one record.
+        CompletableFuture<String> write2 = 
runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(2, 3), 
"response2")
+        );
+
+        // Verify that the write is not committed yet.
+        assertFalse(write2.isDone());
+
+        // Verify the state. Records are replayed but no batch written.
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Arrays.asList(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+            new MockCoordinatorShard.RecordAndMetadata(2, records.get(2))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(Collections.emptyList(), writer.entries(TP));
+
+        // Write #3 with one record. This one cannot go into the existing batch
+        // so the existing batch should be flushed and a new one should be 
created.
+        CompletableFuture<String> write3 = 
runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(3, 4), 
"response3")
+        );
+
+        // Verify that the write is not committed yet.
+        assertFalse(write3.isDone());
+
+        // Verify the state. Records are replayed. The previous batch
+        // got flushed with all the records but the new one from #3.
+        assertEquals(3L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Arrays.asList(0L, 3L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Arrays.asList(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+            new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
+            new MockCoordinatorShard.RecordAndMetadata(3, records.get(3))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(Collections.singletonList(
+            records(timer.time().milliseconds(), records.subList(0, 3))
+        ), writer.entries(TP));
+
+        // Advance past the linger time.
+        timer.advanceClock(11);
+
+        // Verify the state. The pending batch is flushed.
+        assertEquals(4L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Arrays.asList(0L, 3L, 4L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Arrays.asList(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+            new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
+            new MockCoordinatorShard.RecordAndMetadata(3, records.get(3))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(Arrays.asList(
+            records(timer.time().milliseconds() - 11, records.subList(0, 3)),
+            records(timer.time().milliseconds() - 11, records.subList(3, 4))
+        ), writer.entries(TP));
+
+        // Commit and verify that writes are completed.
+        writer.commit(TP);
+        assertTrue(write1.isDone());
+        assertTrue(write2.isDone());
+        assertTrue(write3.isDone());
+        assertEquals("response1", write1.get(5, TimeUnit.SECONDS));
+        assertEquals("response2", write2.get(5, TimeUnit.SECONDS));
+        assertEquals("response3", write3.get(5, TimeUnit.SECONDS));
+    }
+
+    @Test
+    public void testScheduleWriteOperationWithBatchingWhenRecordsTooLarge() {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+                
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(10)
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertNull(ctx.currentBatch);
+
+        // Get the max batch size.
+        int maxBatchSize = writer.config(TP).maxMessageSize();
+
+        // Create records with a quarter of the max batch size each. Keep in 
mind that
+        // each batch has a header so it is not possible to have those four 
records
+        // in one single batch.
+        List<String> records = Stream.of('1', '2', '3', '4').map(c -> {
+            char[] payload = new char[maxBatchSize / 4];
+            Arrays.fill(payload, c);
+            return new String(payload);
+        }).collect(Collectors.toList());
+
+        // Write all the records.
+        CompletableFuture<String> write = 
runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records, "response1")
+        );
+
+        assertFutureThrows(write, RecordTooLargeException.class);
+    }
+
+    @Test
+    public void testScheduleWriteOperationWithBatchingWhenWriteFails() {
+        MockTimer timer = new MockTimer();
+        // The partition writer only accept no writes.
+        MockPartitionWriter writer = new MockPartitionWriter(0);
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+                
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(10)
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertNull(ctx.currentBatch);
+
+        // Get the max batch size.
+        int maxBatchSize = writer.config(TP).maxMessageSize();
+
+        // Create records with a quarter of the max batch size each. Keep in 
mind that
+        // each batch has a header so it is not possible to have those four 
records
+        // in one single batch.
+        List<String> records = Stream.of('1', '2', '3', '4').map(c -> {
+            char[] payload = new char[maxBatchSize / 4];
+            Arrays.fill(payload, c);
+            return new String(payload);
+        }).collect(Collectors.toList());
+
+        // Write #1.
+        CompletableFuture<String> write1 = 
runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(0, 1), 
"response1"));
+
+        // Write #2.
+        CompletableFuture<String> write2 = 
runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(1, 2), 
"response2"));
+
+        // Write #3.
+        CompletableFuture<String> write3 = 
runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(2, 3), 
"response3"));
+
+        // Verify the state.
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Arrays.asList(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+            new MockCoordinatorShard.RecordAndMetadata(2, records.get(2))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(Collections.emptyList(), writer.entries(TP));
+
+        // Write #4. This write cannot make it in the current batch. So the 
current batch
+        // is flushed. It will fail. So we expect all writes to fail.
+        CompletableFuture<String> write4 = 
runtime.scheduleWriteOperation("write#4", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(3, 4), 
"response4"));
+
+        // Verify the futures.
+        assertFutureThrows(write1, KafkaException.class);
+        assertFutureThrows(write2, KafkaException.class);
+        assertFutureThrows(write3, KafkaException.class);
+        // Write #4 is also expected to fail.
+        assertFutureThrows(write4, KafkaException.class);
+
+        // Verify the state. The state should be reverted to the initial state.
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Collections.emptyList(), 
ctx.coordinator.coordinator().fullRecords());
+        assertEquals(Collections.emptyList(), writer.entries(TP));
+    }
+
+    @Test
+    public void testScheduleWriteOperationWithBatchingWhenReplayFails() {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+                
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(10)
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertNull(ctx.currentBatch);
+
+        // Override the coordinator with a coordinator that throws
+        // an exception when replay is called.
+        SnapshotRegistry snapshotRegistry = ctx.coordinator.snapshotRegistry();
+        ctx.coordinator = new SnapshottableCoordinator<>(
+            new LogContext(),
+            snapshotRegistry,
+            new MockCoordinatorShard(snapshotRegistry, ctx.timer) {
+                @Override
+                public void replay(
+                    long offset,
+                    long producerId,
+                    short producerEpoch,
+                    String record
+                ) throws RuntimeException {
+                    if (offset >= 1) {
+                        throw new IllegalArgumentException("error");
+                    }
+                    super.replay(
+                        offset,
+                        producerId,
+                        producerEpoch,
+                        record
+                    );
+                }
+            },
+            TP
+        );
+
+        // Get the max batch size.
+        int maxBatchSize = writer.config(TP).maxMessageSize();
+
+        // Create records with a quarter of the max batch size each.
+        List<String> records = Stream.of('1', '2').map(c -> {
+            char[] payload = new char[maxBatchSize / 4];
+            Arrays.fill(payload, c);
+            return new String(payload);
+        }).collect(Collectors.toList());
+
+        // Write #1.
+        CompletableFuture<String> write1 = 
runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(0, 1), 
"response1"));
+
+        // Verify the state.
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Collections.singletonList(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(Collections.emptyList(), writer.entries(TP));
+
+        // Write #2. It should fail.
+        CompletableFuture<String> write2 = 
runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(1, 2), 
"response2"));
+
+        // Verify the futures.
+        assertFutureThrows(write1, IllegalArgumentException.class);
+        assertFutureThrows(write2, IllegalArgumentException.class);
+
+        // Verify the state.
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Collections.emptyList(), 
ctx.coordinator.coordinator().fullRecords());
+        assertEquals(Collections.emptyList(), writer.entries(TP));
+    }
+
+    @Test
+    public void testScheduleTransactionalWriteOperationWithBatching() throws 
ExecutionException, InterruptedException, TimeoutException {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+                
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(10)
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertNull(ctx.currentBatch);
+
+        // Write #1 with one record.
+        CompletableFuture<String> write1 = 
runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+            state -> new 
CoordinatorResult<>(Collections.singletonList("record#1"), "response1")
+        );
+
+        // Verify that the write is not committed yet.
+        assertFalse(write1.isDone());
+
+        // Verify the state. Records are replayed but no batch written.
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Collections.emptySet(), 
ctx.coordinator.coordinator().pendingRecords(100L));
+        assertEquals(mkSet("record#1"), 
ctx.coordinator.coordinator().records());
+        assertEquals(Collections.emptyList(), writer.entries(TP));
+
+        // Transactional write #2 with one record. This will flush the current 
batch.
+        CompletableFuture<String> write2 = 
runtime.scheduleTransactionalWriteOperation(
+            "txn-write#1",
+            TP,
+            "transactional-id",
+            100L,
+            (short) 50,
+            Duration.ofMillis(20),
+            state -> new 
CoordinatorResult<>(Collections.singletonList("record#2"), "response2"),
+            TXN_OFFSET_COMMIT_LATEST_VERSION
+        );
+
+        // Verify that the write is not committed yet.
+        assertFalse(write2.isDone());
+
+        // Verify the state. The current batch and the transactional records 
are
+        // written to the log.
+        assertEquals(2L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Arrays.asList(0L, 1L, 2L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(mkSet("record#2"), 
ctx.coordinator.coordinator().pendingRecords(100L));
+        assertEquals(mkSet("record#1"), 
ctx.coordinator.coordinator().records());
+        assertEquals(Arrays.asList(
+            records(timer.time().milliseconds(), "record#1"),
+            transactionalRecords(100L, (short) 50, 
timer.time().milliseconds(), "record#2")
+        ), writer.entries(TP));
+
+        // Write #3 with one record.
+        CompletableFuture<String> write3 = 
runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20),
+            state -> new 
CoordinatorResult<>(Collections.singletonList("record#3"), "response3")
+        );
+
+        // Verify that the write is not committed yet.
+        assertFalse(write3.isDone());
+
+        // Verify the state.
+        assertEquals(2L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Arrays.asList(0L, 1L, 2L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(mkSet("record#2"), 
ctx.coordinator.coordinator().pendingRecords(100L));
+        assertEquals(mkSet("record#1", "record#3"), 
ctx.coordinator.coordinator().records());
+        assertEquals(Arrays.asList(
+            records(timer.time().milliseconds(), "record#1"),
+            transactionalRecords(100L, (short) 50, 
timer.time().milliseconds(), "record#2")
+        ), writer.entries(TP));
+
+        // Complete transaction #1. It will flush the current batch if any.
+        CompletableFuture<Void> complete1 = 
runtime.scheduleTransactionCompletion(
+            "complete#1",
+            TP,
+            100L,
+            (short) 50,
+            10,
+            TransactionResult.COMMIT,
+            DEFAULT_WRITE_TIMEOUT
+        );
+
+        // Verify that the completion is not committed yet.
+        assertFalse(complete1.isDone());
+
+        // Verify the state.
+        assertEquals(4L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Arrays.asList(0L, 1L, 2L, 3L, 4L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Collections.emptySet(), 
ctx.coordinator.coordinator().pendingRecords(100L));
+        assertEquals(mkSet("record#1", "record#2", "record#3"), 
ctx.coordinator.coordinator().records());
+        assertEquals(Arrays.asList(
+            records(timer.time().milliseconds(), "record#1"),
+            transactionalRecords(100L, (short) 50, 
timer.time().milliseconds(), "record#2"),
+            records(timer.time().milliseconds(), "record#3"),
+            endTransactionMarker(100L, (short) 50, 
timer.time().milliseconds(), 10, ControlRecordType.COMMIT)
+        ), writer.entries(TP));
+
+        // Commit and verify that writes are completed.
+        writer.commit(TP);
+        assertTrue(write1.isDone());
+        assertTrue(write2.isDone());
+        assertTrue(write3.isDone());
+        assertTrue(complete1.isDone());
+        assertEquals("response1", write1.get(5, TimeUnit.SECONDS));
+        assertEquals("response2", write2.get(5, TimeUnit.SECONDS));
+        assertEquals("response3", write3.get(5, TimeUnit.SECONDS));
+        assertNull(complete1.get(5, TimeUnit.SECONDS));
+    }
+
+    @Test
+    public void testStateMachineIsReloadedWhenOutOfSync() {
+        MockTimer timer = new MockTimer();
+        MockCoordinatorLoader loader = spy(new MockCoordinatorLoader());
+        MockPartitionWriter writer = new MockPartitionWriter() {
+            @Override
+            public long append(
+                TopicPartition tp,
+                VerificationGuard verificationGuard,
+                MemoryRecords batch
+            ) {
+                // Add 1 to the returned offsets.
+                return super.append(tp, verificationGuard, batch) + 1;
+            }
+        };
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(loader)
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+                
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(10)
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+        assertEquals(ACTIVE, ctx.state);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertNull(ctx.currentBatch);
+
+        // Keep a reference to the current coordinator.
+        SnapshottableCoordinator<MockCoordinatorShard, String> coordinator = 
ctx.coordinator;
+
+        // Get the max batch size.
+        int maxBatchSize = writer.config(TP).maxMessageSize();
+
+        // Create records with a quarter of the max batch size each. Keep in 
mind that
+        // each batch has a header so it is not possible to have those four 
records
+        // in one single batch.
+        List<String> records = Stream.of('1', '2', '3', '4').map(c -> {
+            char[] payload = new char[maxBatchSize / 4];
+            Arrays.fill(payload, c);
+            return new String(payload);
+        }).collect(Collectors.toList());
+
+        // Write #1.
+        CompletableFuture<String> write1 = 
runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(0, 1), 
"response1"));
+
+        // Write #2.
+        CompletableFuture<String> write2 = 
runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(1, 2), 
"response2"));
+
+        // Write #3.
+        CompletableFuture<String> write3 = 
runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(2, 3), 
"response3"));
+
+        // Write #4. This write cannot make it in the current batch. So the 
current batch
+        // is flushed. It will fail. So we expect all writes to fail.
+        CompletableFuture<String> write4 = 
runtime.scheduleWriteOperation("write#4", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(3, 4), 
"response4"));
+
+        // Verify the futures.
+        assertFutureThrows(write1, NotCoordinatorException.class);
+        assertFutureThrows(write2, NotCoordinatorException.class);
+        assertFutureThrows(write3, NotCoordinatorException.class);
+        // Write #4 is also expected to fail.
+        assertFutureThrows(write4, NotCoordinatorException.class);
+
+        // Verify that the state machine was loaded twice.
+        verify(loader, times(2)).load(eq(TP), any());
+
+        // Verify that the state is active and that the state machine
+        // is actually a new one.
+        assertEquals(ACTIVE, ctx.state);
+        assertNotEquals(coordinator, ctx.coordinator);
+    }
+
     private static <S extends CoordinatorShard<U>, U> 
ArgumentMatcher<CoordinatorPlayback<U>> coordinatorMatcher(
         CoordinatorRuntime<S, U> runtime,
         TopicPartition tp
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTask.java 
b/server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTask.java
index ec6e8b3d783..ac58b68ec5e 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTask.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTask.java
@@ -32,6 +32,10 @@ public abstract class TimerTask implements Runnable {
         }
     }
 
+    public boolean isCancelled() {
+        return timerTaskEntry == null;
+    }
+
     final void setTimerTaskEntry(TimerTaskEntry entry) {
         synchronized (this) {
             // if this timerTask is already held by an existing timer task 
entry,


Reply via email to