This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new adc16d0f310 KAFKA-14538: Implement KRaft metadata transactions in QuorumController
adc16d0f310 is described below

commit adc16d0f310dc1350bca66d5da013d599b990cfa
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Mon Aug 14 16:58:56 2023 -0700

    KAFKA-14538: Implement KRaft metadata transactions in QuorumController
    
    Implement the QuorumController side of KRaft metadata transactions.
    
    As specified in KIP-868, this PR creates a new metadata version, IBP_3_6_IV1, which contains the
    three new records: AbortTransactionRecord, BeginTransactionRecord, EndTransactionRecord.
    
    To make offset management unit-testable, this PR moves it out of QuorumController.java and into
    OffsetControlManager.java. The general approach is to track the "last stable offset," which is
    calculated from the latest committed offset and the in-progress transaction (if any). When a
    transaction is aborted, we revert to this last stable offset. We also revert to it when the
    controller transitions from active to inactive.
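    
    For illustration only (not part of this patch): a minimal sketch of how the last stable offset
    is derived, mirroring OffsetControlManager#maybeAdvanceLastStableOffset in the diff below. The
    class name LastStableOffsetExample and the sample offsets are hypothetical.
    
        // Standalone sketch; only the two offset values correspond to state kept by
        // OffsetControlManager (lastCommittedOffset and transactionStartOffset).
        final class LastStableOffsetExample {
            // With no in-progress transaction (transactionStartOffset == -1), the last stable
            // offset tracks the last committed offset. With an open transaction, stable reads
            // must stop just before the transaction's first record, even if later records in
            // the transaction have already been committed by the Raft layer.
            static long lastStableOffset(long lastCommittedOffset, long transactionStartOffset) {
                if (transactionStartOffset == -1L) {
                    return lastCommittedOffset;
                }
                return Math.min(transactionStartOffset - 1, lastCommittedOffset);
            }
    
            public static void main(String[] args) {
                // Committed through 1550, but a transaction began at 1500: stable reads stop at 1499.
                System.out.println(lastStableOffset(1550L, 1500L)); // prints 1499
                // No open transaction: the stable offset equals the committed offset.
                System.out.println(lastStableOffset(1550L, -1L));   // prints 1550
            }
        }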
    
    In a follow-up PR, we will add support for the transaction records in MetadataLoader. We will also
    add support for automatically aborting pending transactions after a controller failover.
    
    Reviewers: David Arthur <[email protected]>
---
 .../java/kafka/test/ClusterTestExtensionsTest.java |   2 +-
 .../java/kafka/test/annotation/ClusterTest.java    |   2 +-
 .../kafka/controller/OffsetControlManager.java     | 421 +++++++++++++++++++++
 .../apache/kafka/controller/QuorumController.java  | 219 ++++-------
 .../common/metadata/AbortTransactionRecord.json    |  27 ++
 .../common/metadata/BeginTransactionRecord.json    |  27 ++
 .../common/metadata/EndTransactionRecord.json      |  24 ++
 .../kafka/controller/OffsetControlManagerTest.java | 270 +++++++++++++
 .../QuorumControllerIntegrationTestUtils.java      |   2 +-
 .../QuorumControllerMetricsIntegrationTest.java    |   2 +-
 .../kafka/controller/QuorumControllerTest.java     |   6 +-
 .../kafka/server/common/MetadataVersion.java       |   9 +-
 .../apache/kafka/timeline/SnapshotRegistry.java    |   1 +
 .../kafka/timeline/TrackingSnapshotRegistry.java   |  56 +++
 .../org/apache/kafka/tools/FeatureCommandTest.java |   4 +-
 15 files changed, 913 insertions(+), 159 deletions(-)

diff --git a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
index c20e0d889b6..a52e51fe112 100644
--- a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
+++ b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
@@ -117,6 +117,6 @@ public class ClusterTestExtensionsTest {
 
     @ClusterTest
     public void testDefaults(ClusterConfig config) {
-        Assertions.assertEquals(MetadataVersion.IBP_3_6_IV0, config.metadataVersion());
+        Assertions.assertEquals(MetadataVersion.IBP_3_6_IV1, config.metadataVersion());
     }
 }
diff --git a/core/src/test/java/kafka/test/annotation/ClusterTest.java b/core/src/test/java/kafka/test/annotation/ClusterTest.java
index db6671a0f63..cd9161ce923 100644
--- a/core/src/test/java/kafka/test/annotation/ClusterTest.java
+++ b/core/src/test/java/kafka/test/annotation/ClusterTest.java
@@ -41,6 +41,6 @@ public @interface ClusterTest {
     String name() default "";
     SecurityProtocol securityProtocol() default SecurityProtocol.PLAINTEXT;
     String listener() default "";
-    MetadataVersion metadataVersion() default MetadataVersion.IBP_3_6_IV0;
+    MetadataVersion metadataVersion() default MetadataVersion.IBP_3_6_IV1;
     ClusterConfigProperty[] serverProperties() default {};
 }
diff --git a/metadata/src/main/java/org/apache/kafka/controller/OffsetControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/OffsetControlManager.java
new file mode 100644
index 00000000000..76bae6ada07
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/controller/OffsetControlManager.java
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.metadata.AbortTransactionRecord;
+import org.apache.kafka.common.metadata.BeginTransactionRecord;
+import org.apache.kafka.common.metadata.EndTransactionRecord;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.controller.metrics.QuorumControllerMetrics;
+import org.apache.kafka.raft.Batch;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.snapshot.Snapshots;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.slf4j.Logger;
+
+import java.util.Optional;
+
+
+/**
+ * Manages read and write offsets, and in-memory snapshots.
+ *
+ * Also manages the following metrics:
+ *      kafka.controller:type=KafkaController,name=ActiveControllerCount
+ *      kafka.controller:type=KafkaController,name=LastAppliedRecordLagMs
+ *      kafka.controller:type=KafkaController,name=LastAppliedRecordOffset
+ *      kafka.controller:type=KafkaController,name=LastAppliedRecordTimestamp
+ *      kafka.controller:type=KafkaController,name=LastCommittedRecordOffset
+ */
+class OffsetControlManager {
+    public static class Builder {
+        private LogContext logContext = null;
+        private SnapshotRegistry snapshotRegistry = null;
+        private QuorumControllerMetrics metrics = null;
+        private Time time = Time.SYSTEM;
+
+        Builder setLogContext(LogContext logContext) {
+            this.logContext = logContext;
+            return this;
+        }
+
+        Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+            this.snapshotRegistry = snapshotRegistry;
+            return this;
+        }
+
+        Builder setMetrics(QuorumControllerMetrics metrics) {
+            this.metrics = metrics;
+            return this;
+        }
+
+        Builder setTime(Time time) {
+            this.time = time;
+            return this;
+        }
+
+        public OffsetControlManager build() {
+            if (logContext == null) logContext = new LogContext();
+            if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
+            if (metrics == null) {
+                metrics = new QuorumControllerMetrics(Optional.empty(), time, false);
+            }
+            return new OffsetControlManager(logContext,
+                    snapshotRegistry,
+                    metrics,
+                    time);
+        }
+    }
+
+    /**
+     * The slf4j logger.
+     */
+    private final Logger log;
+
+    /**
+     * The snapshot registry.
+     */
+    private final SnapshotRegistry snapshotRegistry;
+
+    /**
+     * The quorum controller metrics.
+     */
+    private final QuorumControllerMetrics metrics;
+
+    /**
+     * The clock.
+     */
+    private final Time time;
+
+    /**
+     * The ID of the snapshot that we're currently replaying, or null if there is none.
+     */
+    private OffsetAndEpoch currentSnapshotId;
+
+    /**
+     * The name of the snapshot that we're currently replaying, or null if there is none.
+     */
+    private String currentSnapshotName;
+
+    /**
+     * The latest committed offset.
+     */
+    private long lastCommittedOffset;
+
+    /**
+     * The latest committed epoch.
+     */
+    private int lastCommittedEpoch;
+
+    /**
+     * The latest offset that it is safe to read from.
+     */
+    private long lastStableOffset;
+
+    /**
+     * The offset of the transaction we're in, or -1 if we are not in one.
+     */
+    private long transactionStartOffset;
+
+    /**
+     * The next offset we should write to, or -1 if the controller is not active. Exclusive offset.
+     */
+    private long nextWriteOffset;
+
+    private OffsetControlManager(
+        LogContext logContext,
+        SnapshotRegistry snapshotRegistry,
+        QuorumControllerMetrics metrics,
+        Time time
+    ) {
+        this.log = logContext.logger(OffsetControlManager.class);
+        this.snapshotRegistry = snapshotRegistry;
+        this.metrics = metrics;
+        this.time = time;
+        this.currentSnapshotId = null;
+        this.currentSnapshotName = null;
+        this.lastCommittedOffset = -1L;
+        this.lastCommittedEpoch = -1;
+        this.lastStableOffset = -1L;
+        this.transactionStartOffset = -1L;
+        this.nextWriteOffset = -1L;
+        snapshotRegistry.getOrCreateSnapshot(-1L);
+        metrics.setActive(false);
+        metrics.setLastCommittedRecordOffset(-1L);
+        metrics.setLastAppliedRecordOffset(-1L);
+        metrics.setLastAppliedRecordTimestamp(-1L);
+    }
+
+    /**
+     *  @return The SnapshotRegistry used by this offset control manager.
+     */
+    SnapshotRegistry snapshotRegistry() {
+        return snapshotRegistry;
+    }
+
+    /**
+     * @return QuorumControllerMetrics managed by this offset control manager.
+     */
+    QuorumControllerMetrics metrics() {
+        return metrics;
+    }
+
+    /**
+     * @return the ID of the current snapshot.
+     */
+    OffsetAndEpoch currentSnapshotId() {
+        return currentSnapshotId;
+    }
+
+    /**
+     * @return the name of the current snapshot.
+     */
+    String currentSnapshotName() {
+        return currentSnapshotName;
+    }
+
+    /**
+     * @return the last committed offset.
+     */
+    long lastCommittedOffset() {
+        return lastCommittedOffset;
+    }
+
+    /**
+     * @return the last committed epoch.
+     */
+    int lastCommittedEpoch() {
+        return lastCommittedEpoch;
+    }
+
+    /**
+     * @return the latest offset that it is safe to read from.
+     */
+    long lastStableOffset() {
+        return lastStableOffset;
+    }
+
+    /**
+     * @return the transaction start offset, or -1 if there is no transaction.
+     */
+    long transactionStartOffset() {
+        return transactionStartOffset;
+    }
+
+    /**
+     * @return the next offset that the active controller should write to.
+     */
+    long nextWriteOffset() {
+        return nextWriteOffset;
+    }
+
+    /**
+     * @return true only if the manager is active.
+     */
+    boolean active() {
+        return nextWriteOffset != -1L;
+    }
+
+    /**
+     * Called when the QuorumController becomes active.
+     *
+     * @param newNextWriteOffset The new next write offset to use. Must be non-negative.
+     */
+    void activate(long newNextWriteOffset) {
+        if (active()) {
+            throw new RuntimeException("Can't activate already active 
OffsetControlManager.");
+        }
+        if (newNextWriteOffset < 0) {
+            throw new RuntimeException("Invalid negative newNextWriteOffset " +
+                    newNextWriteOffset + ".");
+        }
+        // Before switching to active, create an in-memory snapshot at the last committed
+        // offset. This is required because the active controller assumes that there is always
+        // an in-memory snapshot at the last committed offset.
+        snapshotRegistry.getOrCreateSnapshot(lastStableOffset);
+        this.nextWriteOffset = newNextWriteOffset;
+        metrics.setActive(true);
+    }
+
+    /**
+     * Called when the QuorumController becomes inactive.
+     */
+    void deactivate() {
+        if (!active()) {
+            throw new RuntimeException("Can't deactivate inactive 
OffsetControlManager.");
+        }
+        metrics.setActive(false);
+        metrics.setLastAppliedRecordOffset(lastStableOffset);
+        this.nextWriteOffset = -1L;
+        if (!snapshotRegistry.hasSnapshot(lastStableOffset)) {
+            throw new RuntimeException("Unable to reset to last stable offset 
" + lastStableOffset +
+                    ". No in-memory snapshot found for this offset.");
+        }
+        snapshotRegistry.revertToSnapshot(lastStableOffset);
+    }
+
+    /**
+     * Handle the callback from the Raft layer indicating that a batch was committed.
+     *
+     * @param batch The batch that has been committed.
+     */
+    void handleCommitBatch(Batch<ApiMessageAndVersion> batch) {
+        this.lastCommittedOffset = batch.lastOffset();
+        this.lastCommittedEpoch = batch.epoch();
+        maybeAdvanceLastStableOffset();
+        metrics.setLastCommittedRecordOffset(batch.lastOffset());
+        if (!active()) {
+            // On standby controllers, the last applied record offset is equal to the last
+            // committed offset.
+            metrics.setLastAppliedRecordOffset(batch.lastOffset());
+            metrics.setLastAppliedRecordTimestamp(batch.appendTimestamp());
+        }
+    }
+
+    /**
+     * Called by the active controller after it has invoked scheduleAtomicAppend to schedule some
+     * records to be written.
+     *
+     * @param endOffset The offset of the last record that was written.
+     */
+    void handleScheduleAtomicAppend(long endOffset) {
+        this.nextWriteOffset = endOffset + 1;
+
+        snapshotRegistry.getOrCreateSnapshot(endOffset);
+
+        metrics.setLastAppliedRecordOffset(endOffset);
+
+        // This is not truly the append timestamp. The KRaft client doesn't expose the append
+        // time when scheduling a write. This is good enough because this is called right after
+        // the records were given to the KRaft client for appending and the default append linger
+        // for KRaft is 25ms.
+        metrics.setLastAppliedRecordTimestamp(time.milliseconds());
+    }
+
+    /**
+     * Advance the last stable offset if needed.
+     */
+    void maybeAdvanceLastStableOffset() {
+        long newLastStableOffset;
+        if (transactionStartOffset == -1L) {
+            newLastStableOffset = lastCommittedOffset;
+        } else {
+            newLastStableOffset = Math.min(transactionStartOffset - 1, lastCommittedOffset);
+        }
+        if (lastStableOffset < newLastStableOffset) {
+            lastStableOffset = newLastStableOffset;
+            snapshotRegistry.deleteSnapshotsUpTo(lastStableOffset);
+            if (!active()) {
+                snapshotRegistry.getOrCreateSnapshot(lastStableOffset);
+            }
+        }
+    }
+
+    /**
+     * Called before we load a Raft snapshot.
+     *
+     * @param snapshotId The Raft snapshot offset and epoch.
+     */
+    void beginLoadSnapshot(OffsetAndEpoch snapshotId) {
+        if (currentSnapshotId != null) {
+            throw new RuntimeException("Can't begin reading snapshot for " + 
snapshotId +
+                    ", because we are already reading " + currentSnapshotId);
+        }
+        this.currentSnapshotId = snapshotId;
+        this.currentSnapshotName = Snapshots.filenameFromSnapshotId(snapshotId);
+        log.info("Starting to load snapshot {}. Previous lastCommittedOffset was {}. Previous " +
+                "transactionStartOffset was {}.", currentSnapshotName, lastCommittedOffset,
+                transactionStartOffset);
+        this.snapshotRegistry.reset();
+        this.lastCommittedOffset = -1L;
+        this.lastCommittedEpoch = -1;
+        this.lastStableOffset = -1L;
+        this.transactionStartOffset = -1L;
+        this.nextWriteOffset = -1L;
+    }
+
+    /**
+     * Called after we have finished loading a Raft snapshot.
+     *
+     * @param timestamp The timestamp of the snapshot.
+     */
+    void endLoadSnapshot(long timestamp) {
+        if (currentSnapshotId == null) {
+            throw new RuntimeException("Can't end loading snapshot, because 
there is no " +
+                    "current snapshot.");
+        }
+        log.info("Successfully loaded snapshot {}.", currentSnapshotName);
+        this.snapshotRegistry.getOrCreateSnapshot(currentSnapshotId.offset());
+        this.lastCommittedOffset = currentSnapshotId.offset();
+        this.lastCommittedEpoch = currentSnapshotId.epoch();
+        this.lastStableOffset = currentSnapshotId.offset();
+        this.transactionStartOffset = -1L;
+        this.nextWriteOffset = -1L;
+        metrics.setLastCommittedRecordOffset(currentSnapshotId.offset());
+        metrics.setLastAppliedRecordOffset(currentSnapshotId.offset());
+        metrics.setLastAppliedRecordTimestamp(timestamp);
+        this.currentSnapshotId = null;
+        this.currentSnapshotName = null;
+    }
+
+    public void replay(BeginTransactionRecord message, long offset) {
+        if (currentSnapshotId != null) {
+            throw new RuntimeException("BeginTransactionRecord cannot appear 
within a snapshot.");
+        }
+        if (transactionStartOffset != -1L) {
+            throw new RuntimeException("Can't replay a BeginTransactionRecord 
at " + offset +
+                " because the transaction at " + transactionStartOffset + " 
was never closed.");
+        }
+        snapshotRegistry.getOrCreateSnapshot(offset - 1);
+        transactionStartOffset = offset;
+        log.info("Replayed {} at offset {}.", message, offset);
+    }
+
+    public void replay(EndTransactionRecord message, long offset) {
+        if (currentSnapshotId != null) {
+            throw new RuntimeException("EndTransactionRecord cannot appear 
within a snapshot.");
+        }
+        if (transactionStartOffset == -1L) {
+            throw new RuntimeException("Can't replay an EndTransactionRecord 
at " + offset +
+                    " because there is no open transaction.");
+        }
+        transactionStartOffset = -1L;
+        log.info("Replayed {} at offset {}.", message, offset);
+    }
+
+    public void replay(AbortTransactionRecord message, long offset) {
+        if (currentSnapshotId != null) {
+            throw new RuntimeException("AbortTransactionRecord cannot appear 
within a snapshot.");
+        }
+        if (transactionStartOffset == -1L) {
+            throw new RuntimeException("Can't replay an AbortTransactionRecord 
at " + offset +
+                    " because there is no open transaction.");
+        }
+        long preTransactionOffset = transactionStartOffset - 1;
+        snapshotRegistry.revertToSnapshot(preTransactionOffset);
+        transactionStartOffset = -1L;
+        log.info("Replayed {} at offset {}. Reverted to offset {}.",
+                message, offset, preTransactionOffset);
+    }
+
+    // VisibleForTesting
+    void setNextWriteOffset(long newNextWriteOffset) {
+        this.nextWriteOffset = newNextWriteOffset;
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index b658af4c111..2586be0f457 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -49,10 +49,13 @@ import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
 import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
 import org.apache.kafka.common.message.UpdateFeaturesRequestData;
 import org.apache.kafka.common.message.UpdateFeaturesResponseData;
+import org.apache.kafka.common.metadata.AbortTransactionRecord;
 import org.apache.kafka.common.metadata.AccessControlEntryRecord;
+import org.apache.kafka.common.metadata.BeginTransactionRecord;
 import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
 import org.apache.kafka.common.metadata.ClientQuotaRecord;
 import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.common.metadata.EndTransactionRecord;
 import org.apache.kafka.common.metadata.FeatureLevelRecord;
 import org.apache.kafka.common.metadata.FenceBrokerRecord;
 import org.apache.kafka.common.metadata.MetadataRecordType;
@@ -104,6 +107,7 @@ import org.apache.kafka.server.authorizer.AclDeleteResult;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.fault.FaultHandler;
+import org.apache.kafka.server.fault.FaultHandlerException;
 import org.apache.kafka.server.policy.AlterConfigPolicy;
 import org.apache.kafka.server.policy.CreateTopicPolicy;
 import org.apache.kafka.snapshot.SnapshotReader;
@@ -426,22 +430,6 @@ public final class QuorumController implements Controller {
         return raftClient.leaderAndEpoch().leaderId();
     }
 
-    /**
-     * @return          The offset that we should perform read operations at.
-     */
-    private long currentReadOffset() {
-        if (isActiveController()) {
-            // The active controller keeps an in-memory snapshot at the last committed offset,
-            // which we want to read from when performing read operations. This will avoid
-            // reading uncommitted data.
-            return lastCommittedOffset;
-        } else {
-            // Standby controllers never have uncommitted data in memory. Therefore, we always
-            // read the latest from every data structure.
-            return SnapshotRegistry.LATEST_EPOCH;
-        }
-    }
-
     private void handleEventEnd(String name, long startProcessingTimeNs) {
         long endProcessingTime = time.nanoseconds();
         long deltaNs = endProcessingTime - startProcessingTimeNs;
@@ -467,10 +455,10 @@ public final class QuorumController implements Controller {
                 fromInternal(exception, () -> latestController());
         int epoch = curClaimEpoch;
         if (epoch == -1) {
-            epoch = lastCommittedEpoch;
+            epoch = offsetControl.lastCommittedEpoch();
         }
         String failureMessage = info.failureMessage(epoch, deltaUs,
-                isActiveController(), lastCommittedOffset);
+                isActiveController(), offsetControl.lastCommittedOffset());
         if (info.isTimeoutException() && (!deltaUs.isPresent())) {
             controllerMetrics.incrementOperationsTimedOut();
         }
@@ -712,7 +700,7 @@ public final class QuorumController implements Controller {
             }
             ControllerResult<T> result = op.generateRecordsAndResult();
             if (result.records().isEmpty()) {
-                op.processBatchEndOffset(writeOffset);
+                op.processBatchEndOffset(offsetControl.nextWriteOffset() - 1);
                // If the operation did not return any records, then it was actually just
                // a read after all, and not a read + write.  However, this read was done
                // from the latest in-memory state, which might contain uncommitted data.
@@ -736,16 +724,15 @@ public final class QuorumController implements Controller {
                 // them to the log.
                 long offset = appendRecords(log, result, maxRecordsPerBatch,
                     new Function<List<ApiMessageAndVersion>, Long>() {
-                        private long prevEndOffset = writeOffset;
-
                         @Override
                         public Long apply(List<ApiMessageAndVersion> records) {
                            // Start by trying to apply the record to our in-memory state. This should always
                            // succeed; if it does not, that's a fatal error. It is important to do this before
                            // scheduling the record for Raft replication.
                             int recordIndex = 0;
+                            long nextWriteOffset = offsetControl.nextWriteOffset();
                            for (ApiMessageAndVersion message : records) {
-                                long recordOffset = prevEndOffset + 1 + recordIndex;
+                                long recordOffset = nextWriteOffset + recordIndex;
                                try {
                                    replay(message.message(), Optional.empty(), recordOffset);
                                 } catch (Throwable e) {
@@ -753,22 +740,20 @@ public final class QuorumController implements Controller {
                                         "record at offset %d on active 
controller, from the " +
                                         "batch with baseOffset %d",
                                         
message.message().getClass().getSimpleName(),
-                                        recordOffset, prevEndOffset + 1);
+                                        recordOffset, nextWriteOffset);
                                     throw 
fatalFaultHandler.handleFault(failureMessage, e);
                                 }
                                 recordIndex++;
                             }
-                            long nextEndOffset = prevEndOffset + recordIndex;
+                            long nextEndOffset = nextWriteOffset - 1 + recordIndex;
                             raftClient.scheduleAtomicAppend(controllerEpoch,
-                                OptionalLong.of(prevEndOffset + 1),
+                                OptionalLong.of(nextWriteOffset),
                                 records);
-                            snapshotRegistry.getOrCreateSnapshot(nextEndOffset);
-                            prevEndOffset = nextEndOffset;
+                            offsetControl.handleScheduleAtomicAppend(nextEndOffset);
                             return nextEndOffset;
                         }
                     });
                 op.processBatchEndOffset(offset);
-                updateWriteOffset(offset);
                 resultAndOffset = ControllerResultAndOffset.of(offset, result);
 
                 log.debug("Read-write operation {} will be completed when the 
log " +
@@ -997,12 +982,7 @@ public final class QuorumController implements Controller {
                                 recordIndex++;
                             }
                         }
-
-                        updateLastCommittedState(
-                            offset,
-                            epoch,
-                            batch.appendTimestamp()
-                        );
+                        offsetControl.handleCommitBatch(batch);
                     }
                 } finally {
                     reader.close();
@@ -1017,43 +997,39 @@ public final class QuorumController implements Controller {
                    String snapshotName = Snapshots.filenameFromSnapshotId(reader.snapshotId());
                     if (isActiveController()) {
                         throw fatalFaultHandler.handleFault("Asked to load 
snapshot " + snapshotName +
-                            ", but we are the active controller at epoch " + 
curClaimEpoch);
+                                ", but we are the active controller at epoch " 
+ curClaimEpoch);
                     }
-                    log.info("Starting to replay snapshot {}, from last commit 
offset {} and epoch {}",
-                        snapshotName, lastCommittedOffset, lastCommittedEpoch);
-
-                    resetToEmptyState();
-
+                    offsetControl.beginLoadSnapshot(reader.snapshotId());
                     while (reader.hasNext()) {
                         Batch<ApiMessageAndVersion> batch = reader.next();
                         long offset = batch.lastOffset();
                         List<ApiMessageAndVersion> messages = batch.records();
 
                         log.debug("Replaying snapshot {} batch with last 
offset of {}",
-                            snapshotName, offset);
+                                snapshotName, offset);
 
                         int i = 1;
                         for (ApiMessageAndVersion message : messages) {
                             try {
-                                replay(message.message(), Optional.of(reader.snapshotId()), reader.lastContainedLogOffset());
+                                replay(message.message(), Optional.of(reader.snapshotId()),
+                                        reader.lastContainedLogOffset());
                             } catch (Throwable e) {
                                 String failureMessage = String.format("Unable 
to apply %s record " +
-                                        "from snapshot %s on standby 
controller, which was %d of " +
-                                        "%d record(s) in the batch with 
baseOffset %d.",
-                                        
message.message().getClass().getSimpleName(), reader.snapshotId(),
-                                        i, messages.size(), 
batch.baseOffset());
+                                    "from snapshot %s on standby controller, 
which was %d of " +
+                                    "%d record(s) in the batch with baseOffset 
%d.",
+                                    
message.message().getClass().getSimpleName(), reader.snapshotId(),
+                                    i, messages.size(), batch.baseOffset());
                                throw fatalFaultHandler.handleFault(failureMessage, e);
                             }
                             i++;
                         }
                     }
-                    log.info("Finished replaying snapshot {}", snapshotName);
-
-                    updateLastCommittedState(
-                        reader.lastContainedLogOffset(),
-                        reader.lastContainedLogEpoch(),
-                        reader.lastContainedLogTimestamp());
-                    snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);
+                    offsetControl.endLoadSnapshot(reader.lastContainedLogTimestamp());
+                } catch (FaultHandlerException e) {
+                    throw e;
+                } catch (Throwable e) {
+                    throw fatalFaultHandler.handleFault("Error while loading 
snapshot " +
+                            reader.snapshotId(), e);
                 } finally {
                     reader.close();
                 }
@@ -1076,15 +1052,16 @@ public final class QuorumController implements Controller {
                     } else {
                         log.warn("Renouncing the leadership due to a metadata 
log event. " +
                             "We were the leader at epoch {}, but in the new 
epoch {}, " +
-                            "the leader is {}. Reverting to last committed 
offset {}.",
-                            curClaimEpoch, newLeader.epoch(), newLeaderName, 
lastCommittedOffset);
+                            "the leader is {}. Reverting to last stable offset 
{}.",
+                            curClaimEpoch, newLeader.epoch(), newLeaderName,
+                            offsetControl.lastStableOffset());
                         renounce();
                     }
                 } else if (newLeader.isLeader(nodeId)) {
-                    long newLastWriteOffset = raftClient.logEndOffset() - 1;
-                    log.info("Becoming the active controller at epoch {}, last 
write offset {}.",
-                        newLeader.epoch(), newLastWriteOffset);
-                    claim(newLeader.epoch(), newLastWriteOffset);
+                    long newNextWriteOffset = raftClient.logEndOffset();
+                    log.info("Becoming the active controller at epoch {}, next 
write offset {}.",
+                        newLeader.epoch(), newNextWriteOffset);
+                    claim(newLeader.epoch(), newNextWriteOffset);
                 } else {
                     log.info("In the new epoch {}, the leader is {}.",
                         newLeader.epoch(), newLeaderName);
@@ -1116,38 +1093,16 @@ public final class QuorumController implements Controller {
         return claimEpoch != -1;
     }
 
-    private void updateWriteOffset(long offset) {
-        writeOffset = offset;
-        if (isActiveController()) {
-            controllerMetrics.setLastAppliedRecordOffset(writeOffset);
-            // This is not truly the append timestamp. The KRaft client doesn't expose the append time when scheduling a write.
-            // This is good enough because this is called right after the records were given to the KRAft client for appending and
-            // the default append linger for KRaft is 25ms.
-            controllerMetrics.setLastAppliedRecordTimestamp(time.milliseconds());
-        } else {
-            // Change the last applied record metrics back to the last committed state. Inactive controllers report the last committed
-            // state while active controllers report the latest state which may include uncommitted data.
-            controllerMetrics.setLastAppliedRecordOffset(lastCommittedOffset);
-            controllerMetrics.setLastAppliedRecordTimestamp(lastCommittedTimestamp);
-        }
-    }
-
-    private void claim(int epoch, long newLastWriteOffset) {
+    private void claim(int epoch, long newNextWriteOffset) {
         try {
             if (curClaimEpoch != -1) {
                 throw new RuntimeException("Cannot claim leadership because we 
are already the " +
                         "active controller.");
             }
             curClaimEpoch = epoch;
-            controllerMetrics.setActive(true);
-            updateWriteOffset(newLastWriteOffset);
+            offsetControl.activate(newNextWriteOffset);
             clusterControl.activate();
 
-            // Before switching to active, create an in-memory snapshot at the last committed
-            // offset. This is required because the active controller assumes that there is always
-            // an in-memory snapshot at the last committed offset.
-            snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);
-
            // Prepend the activate event. It is important that this event go at the beginning
            // of the queue rather than the end (hence prepend rather than append). It's also
            // important not to use prepend for anything else, to preserve the ordering here.
@@ -1276,22 +1231,6 @@ public final class QuorumController implements Controller {
         }
     }
 
-    private void updateLastCommittedState(
-        long offset,
-        int epoch,
-        long timestamp
-    ) {
-        lastCommittedOffset = offset;
-        lastCommittedEpoch = epoch;
-        lastCommittedTimestamp = timestamp;
-
-        controllerMetrics.setLastCommittedRecordOffset(offset);
-        if (!isActiveController()) {
-            controllerMetrics.setLastAppliedRecordOffset(offset);
-            controllerMetrics.setLastAppliedRecordTimestamp(timestamp);
-        }
-    }
-
     void renounce() {
         try {
             if (curClaimEpoch == -1) {
@@ -1300,16 +1239,9 @@ public final class QuorumController implements Controller {
             }
             raftClient.resign(curClaimEpoch);
             curClaimEpoch = -1;
-            controllerMetrics.setActive(false);
             deferredEventQueue.failAll(ControllerExceptions.
                     newWrongControllerException(OptionalInt.empty()));
-
-            if (!snapshotRegistry.hasSnapshot(lastCommittedOffset)) {
-                throw new RuntimeException("Unable to find last committed 
offset " +
-                        lastCommittedEpoch + " in snapshot registry.");
-            }
-            snapshotRegistry.revertToSnapshot(lastCommittedOffset);
-            updateWriteOffset(-1);
+            offsetControl.deactivate();
             clusterControl.deactivate();
             cancelMaybeFenceReplicas();
             cancelMaybeBalancePartitionLeaders();
@@ -1555,20 +1487,20 @@ public final class QuorumController implements Controller {
             case ZK_MIGRATION_STATE_RECORD:
                 featureControl.replay((ZkMigrationStateRecord) message);
                 break;
+            case BEGIN_TRANSACTION_RECORD:
+                offsetControl.replay((BeginTransactionRecord) message, offset);
+                break;
+            case END_TRANSACTION_RECORD:
+                offsetControl.replay((EndTransactionRecord) message, offset);
+                break;
+            case ABORT_TRANSACTION_RECORD:
+                offsetControl.replay((AbortTransactionRecord) message, offset);
+                break;
             default:
                 throw new RuntimeException("Unhandled record type " + type);
         }
     }
 
-    /**
-     * Clear all data structures and reset all KRaft state.
-     */
-    private void resetToEmptyState() {
-        snapshotRegistry.reset();
-
-        updateLastCommittedState(-1, -1, -1);
-    }
-
     /**
      * Handles faults that cause a controller failover, but which don't abort the process.
      */
@@ -1621,6 +1553,11 @@ public final class QuorumController implements Controller {
      */
     private final DeferredEventQueue deferredEventQueue;
 
+    /**
+     * Manages read and write offsets, and in-memory snapshots.
+     */
+    private final OffsetControlManager offsetControl;
+
     /**
      * A predicate that returns information about whether a ConfigResource exists.
      */
@@ -1699,26 +1636,6 @@ public final class QuorumController implements Controller {
      */
     private volatile int curClaimEpoch;
 
-    /**
-     * The last offset we have committed, or -1 if we have not committed any offsets.
-     */
-    private long lastCommittedOffset = -1;
-
-    /**
-     * The epoch of the last offset we have committed, or -1 if we have not committed any offsets.
-     */
-    private int lastCommittedEpoch = -1;
-
-    /**
-     * The timestamp in milliseconds of the last batch we have committed, or -1 if we have not committed any offset.
-     */
-    private long lastCommittedTimestamp = -1;
-
-    /**
-     * If we have called scheduleWrite, this is the last offset we got back from it.
-     */
-    private long writeOffset;
-
     /**
      * How long to delay partition leader balancing operations.
      */
@@ -1803,6 +1720,12 @@ public final class QuorumController implements Controller {
         this.controllerMetrics = controllerMetrics;
         this.snapshotRegistry = new SnapshotRegistry(logContext);
         this.deferredEventQueue = new DeferredEventQueue(logContext);
+        this.offsetControl = new OffsetControlManager.Builder().
+            setLogContext(logContext).
+            setSnapshotRegistry(snapshotRegistry).
+            setMetrics(controllerMetrics).
+            setTime(time).
+            build();
         this.resourceExists = new ConfigResourceExistenceChecker();
         this.configurationControl = new ConfigurationControlManager.Builder().
             setLogContext(logContext).
@@ -1876,9 +1799,6 @@ public final class QuorumController implements Controller {
         this.zkRecordConsumer = new MigrationRecordConsumer();
         this.zkMigrationEnabled = zkMigrationEnabled;
         this.recordRedactor = new RecordRedactor(configSchema);
-        updateWriteOffset(-1);
-
-        resetToEmptyState();
 
         log.info("Creating new QuorumController with clusterId {}.{}",
                clusterId, zkMigrationEnabled ? " ZK migration mode is enabled." : "");
@@ -1939,7 +1859,7 @@ public final class QuorumController implements Controller {
         if (names.isEmpty())
             return CompletableFuture.completedFuture(Collections.emptyMap());
         return appendReadEvent("findTopicIds", context.deadlineNs(),
-            () -> replicationControl.findTopicIds(currentReadOffset(), names));
+            () -> replicationControl.findTopicIds(offsetControl.lastStableOffset(), names));
     }
 
     @Override
@@ -1947,7 +1867,7 @@ public final class QuorumController implements Controller {
         ControllerRequestContext context
     ) {
         return appendReadEvent("findAllTopicIds", context.deadlineNs(),
-            () -> replicationControl.findAllTopicIds(currentReadOffset()));
+            () -> replicationControl.findAllTopicIds(offsetControl.lastStableOffset()));
     }
 
     @Override
@@ -1958,7 +1878,7 @@ public final class QuorumController implements Controller {
         if (ids.isEmpty())
             return CompletableFuture.completedFuture(Collections.emptyMap());
         return appendReadEvent("findTopicNames", context.deadlineNs(),
-            () -> replicationControl.findTopicNames(currentReadOffset(), ids));
+            () -> replicationControl.findTopicNames(offsetControl.lastStableOffset(), ids));
     }
 
     @Override
@@ -1978,7 +1898,7 @@ public final class QuorumController implements Controller {
         Map<ConfigResource, Collection<String>> resources
     ) {
         return appendReadEvent("describeConfigs", context.deadlineNs(),
-            () -> configurationControl.describeConfigs(currentReadOffset(), resources));
+            () -> configurationControl.describeConfigs(offsetControl.lastStableOffset(), resources));
     }
 
     @Override
@@ -2000,7 +1920,7 @@ public final class QuorumController implements Controller {
         ControllerRequestContext context
     ) {
         return appendReadEvent("getFinalizedFeatures", context.deadlineNs(),
-            () -> featureControl.finalizedFeatures(currentReadOffset()));
+            () -> featureControl.finalizedFeatures(offsetControl.lastStableOffset()));
     }
 
     @Override
@@ -2045,7 +1965,8 @@ public final class QuorumController implements Controller {
                new ListPartitionReassignmentsResponseData().setErrorMessage(null));
        }
        return appendReadEvent("listPartitionReassignments", context.deadlineNs(),
-            () -> replicationControl.listPartitionReassignments(request.topics(), currentReadOffset()));
+            () -> replicationControl.listPartitionReassignments(request.topics(),
+                offsetControl.lastStableOffset()));
     }
 
     @Override
@@ -2120,7 +2041,7 @@ public final class QuorumController implements Controller {
         return appendWriteEvent("registerBroker", context.deadlineNs(),
             () -> {
                 ControllerResult<BrokerRegistrationReply> result = 
clusterControl.
-                    registerBroker(request, writeOffset + 1, featureControl.
+                    registerBroker(request, offsetControl.nextWriteOffset(), 
featureControl.
                         finalizedFeatures(Long.MAX_VALUE));
                 rescheduleMaybeFenceStaleBrokers();
                 return result;
@@ -2272,9 +2193,9 @@ public final class QuorumController implements Controller {
     }
 
     // VisibleForTesting
-    void setWriteOffset(long newWriteOffset) {
-        appendControlEvent("setWriteOffset", () -> {
-            this.writeOffset = newWriteOffset;
+    void setNewNextWriteOffset(long newNextWriteOffset) {
+        appendControlEvent("setNewNextWriteOffset", () -> {
+            offsetControl.setNextWriteOffset(newNextWriteOffset);
         });
     }
 }
diff --git a/metadata/src/main/resources/common/metadata/AbortTransactionRecord.json b/metadata/src/main/resources/common/metadata/AbortTransactionRecord.json
new file mode 100644
index 00000000000..9ce9c2f5272
--- /dev/null
+++ b/metadata/src/main/resources/common/metadata/AbortTransactionRecord.json
@@ -0,0 +1,27 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 25,
+  "type": "metadata",
+  "name": "AbortTransactionRecord",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+    { "name": "Reason", "type": "string", "default": "null", 
+      "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", 
"tag": 0,
+      "about": "An optional textual description of why the transaction was 
aborted." }
+  ]
+}
diff --git a/metadata/src/main/resources/common/metadata/BeginTransactionRecord.json b/metadata/src/main/resources/common/metadata/BeginTransactionRecord.json
new file mode 100644
index 00000000000..de583916b1f
--- /dev/null
+++ b/metadata/src/main/resources/common/metadata/BeginTransactionRecord.json
@@ -0,0 +1,27 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 23,
+  "type": "metadata",
+  "name": "BeginTransactionRecord",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+    { "name": "Name", "type": "string", "default": "null", 
+      "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", 
"tag": 0,
+      "about": "An optional textual description of this transaction." }
+  ]
+}
diff --git a/metadata/src/main/resources/common/metadata/EndTransactionRecord.json b/metadata/src/main/resources/common/metadata/EndTransactionRecord.json
new file mode 100644
index 00000000000..6c1489630d1
--- /dev/null
+++ b/metadata/src/main/resources/common/metadata/EndTransactionRecord.json
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 24,
+  "type": "metadata",
+  "name": "EndTransactionRecord",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+  ]
+}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/OffsetControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/OffsetControlManagerTest.java
new file mode 100644
index 00000000000..bb618f892c8
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/controller/OffsetControlManagerTest.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.metadata.AbortTransactionRecord;
+import org.apache.kafka.common.metadata.BeginTransactionRecord;
+import org.apache.kafka.common.metadata.EndTransactionRecord;
+import org.apache.kafka.common.metadata.NoOpRecord;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.raft.Batch;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.timeline.TrackingSnapshotRegistry;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+@Timeout(value = 40)
+public class OffsetControlManagerTest {
+    @Test
+    public void testInitialValues() {
+        OffsetControlManager offsetControl = new OffsetControlManager.Builder().build();
+        assertNull(offsetControl.currentSnapshotId());
+        assertNull(offsetControl.currentSnapshotName());
+        assertEquals(-1L, offsetControl.lastCommittedOffset());
+        assertEquals(-1, offsetControl.lastCommittedEpoch());
+        assertEquals(-1, offsetControl.lastStableOffset());
+        assertEquals(-1, offsetControl.transactionStartOffset());
+        assertEquals(-1, offsetControl.nextWriteOffset());
+        assertFalse(offsetControl.active());
+        assertEquals(Arrays.asList(-1L), offsetControl.snapshotRegistry().epochsList());
+    }
+
+    @Test
+    public void testActivate() {
+        OffsetControlManager offsetControl = new OffsetControlManager.Builder().build();
+        offsetControl.activate(1000L);
+        assertEquals(1000L, offsetControl.nextWriteOffset());
+        assertTrue(offsetControl.active());
+        assertTrue(offsetControl.metrics().active());
+        assertEquals(Arrays.asList(-1L), offsetControl.snapshotRegistry().epochsList());
+    }
+
+    @Test
+    public void testActivateFailsIfAlreadyActive() {
+        OffsetControlManager offsetControl = new OffsetControlManager.Builder().build();
+        offsetControl.activate(1000L);
+        assertEquals("Can't activate already active OffsetControlManager.",
+            assertThrows(RuntimeException.class,
+                () -> offsetControl.activate(2000L)).
+                    getMessage());
+    }
+
+    @Test
+    public void testActivateFailsIfNewNextWriteOffsetIsNegative() {
+        OffsetControlManager offsetControl = new OffsetControlManager.Builder().build();
+        assertEquals("Invalid negative newNextWriteOffset -2.",
+            assertThrows(RuntimeException.class,
+                () -> offsetControl.activate(-2)).
+                    getMessage());
+    }
+
+    @Test
+    public void testActivateAndDeactivate() {
+        OffsetControlManager offsetControl = new OffsetControlManager.Builder().build();
+        offsetControl.activate(1000L);
+        assertEquals(1000L, offsetControl.nextWriteOffset());
+        offsetControl.deactivate();
+        assertEquals(-1L, offsetControl.nextWriteOffset());
+    }
+
+    @Test
+    public void testDeactivateFailsIfNotActive() {
+        OffsetControlManager offsetControl = new OffsetControlManager.Builder().build();
+        assertEquals("Can't deactivate inactive OffsetControlManager.",
+            assertThrows(RuntimeException.class,
+                () -> offsetControl.deactivate()).
+                    getMessage());
+    }
+
+    private static Batch<ApiMessageAndVersion> newFakeBatch(
+        long lastOffset,
+        int epoch,
+        long appendTimestamp
+    ) {
+        return Batch.data(
+            lastOffset,
+            epoch,
+            appendTimestamp,
+            100,
+            Collections.singletonList(new ApiMessageAndVersion(new NoOpRecord(), (short) 0)));
+    }
+
+    @Test
+    public void testHandleCommitBatch() {
+        OffsetControlManager offsetControl = new OffsetControlManager.Builder().build();
+
+        offsetControl.handleCommitBatch(newFakeBatch(1000L, 200, 3000L));
+        assertEquals(Arrays.asList(1000L), offsetControl.snapshotRegistry().epochsList());
+        assertEquals(1000L, offsetControl.lastCommittedOffset());
+        assertEquals(200, offsetControl.lastCommittedEpoch());
+        assertEquals(1000L, offsetControl.lastStableOffset());
+        assertEquals(-1L, offsetControl.transactionStartOffset());
+        assertEquals(-1L, offsetControl.nextWriteOffset());
+        assertFalse(offsetControl.active());
+        assertFalse(offsetControl.metrics().active());
+        assertEquals(1000L, offsetControl.metrics().lastAppliedRecordOffset());
+        assertEquals(1000L, offsetControl.metrics().lastCommittedRecordOffset());
+        assertEquals(3000L, offsetControl.metrics().lastAppliedRecordTimestamp());
+    }
+
+    @Test
+    public void testHandleScheduleAtomicAppend() {
+        OffsetControlManager offsetControl = new OffsetControlManager.Builder().build();
+
+        offsetControl.handleScheduleAtomicAppend(2000L);
+        assertEquals(2001L, offsetControl.nextWriteOffset());
+        assertEquals(2000L, offsetControl.metrics().lastAppliedRecordOffset());
+        assertEquals(-1L, offsetControl.lastStableOffset());
+        assertEquals(-1L, offsetControl.lastCommittedOffset());
+        assertEquals(Arrays.asList(-1L, 2000L), offsetControl.snapshotRegistry().epochsList());
+
+        offsetControl.handleCommitBatch(newFakeBatch(2000L, 200, 3000L));
+        assertEquals(2000L, offsetControl.lastStableOffset());
+        assertEquals(2000L, offsetControl.lastCommittedOffset());
+        assertEquals(Arrays.asList(2000L), offsetControl.snapshotRegistry().epochsList());
+    }
+
+    @Test
+    public void testHandleLoadSnapshot() {
+        TrackingSnapshotRegistry snapshotRegistry = new TrackingSnapshotRegistry(new LogContext());
+        OffsetControlManager offsetControl = new OffsetControlManager.Builder().
+                setSnapshotRegistry(snapshotRegistry).
+                build();
+
+        offsetControl.beginLoadSnapshot(new OffsetAndEpoch(4000L, 300));
+        assertEquals(Arrays.asList("snapshot[-1]", "reset"), 
snapshotRegistry.operations());
+        assertEquals(new OffsetAndEpoch(4000L, 300), 
offsetControl.currentSnapshotId());
+        assertEquals("00000000000000004000-0000000300", 
offsetControl.currentSnapshotName());
+        assertEquals(Arrays.asList(), 
offsetControl.snapshotRegistry().epochsList());
+
+        offsetControl.endLoadSnapshot(3456L);
+        assertEquals(Arrays.asList("snapshot[-1]", "reset", "snapshot[4000]"),
+            snapshotRegistry.operations());
+        assertNull(offsetControl.currentSnapshotId());
+        assertNull(offsetControl.currentSnapshotName());
+        assertEquals(Arrays.asList(4000L), offsetControl.snapshotRegistry().epochsList());
+        assertEquals(4000L, offsetControl.lastCommittedOffset());
+        assertEquals(300, offsetControl.lastCommittedEpoch());
+        assertEquals(4000L, offsetControl.lastStableOffset());
+        assertEquals(-1L, offsetControl.transactionStartOffset());
+        assertEquals(-1L, offsetControl.nextWriteOffset());
+        assertEquals(4000L, 
offsetControl.metrics().lastCommittedRecordOffset());
+        assertEquals(4000L, offsetControl.metrics().lastAppliedRecordOffset());
+        assertEquals(3456L, 
offsetControl.metrics().lastAppliedRecordTimestamp());
+    }
+
+    @Test
+    public void testBeginTransactionRecordNotAllowedInSnapshot() {
+        OffsetControlManager offsetControl = new OffsetControlManager.Builder().build();
+        offsetControl.beginLoadSnapshot(new OffsetAndEpoch(4000L, 300));
+        assertEquals("BeginTransactionRecord cannot appear within a snapshot.",
+            assertThrows(RuntimeException.class,
+                () -> offsetControl.replay(new BeginTransactionRecord(), 1000L)).
+                    getMessage());
+    }
+
+    @Test
+    public void testEndTransactionRecordNotAllowedInSnapshot() {
+        OffsetControlManager offsetControl = new OffsetControlManager.Builder().build();
+        offsetControl.beginLoadSnapshot(new OffsetAndEpoch(4000L, 300));
+        assertEquals("EndTransactionRecord cannot appear within a snapshot.",
+            assertThrows(RuntimeException.class,
+                () -> offsetControl.replay(new EndTransactionRecord(), 1000L)).
+                    getMessage());
+    }
+
+    @Test
+    public void testAbortTransactionRecordNotAllowedInSnapshot() {
+        OffsetControlManager offsetControl = new OffsetControlManager.Builder().build();
+        offsetControl.beginLoadSnapshot(new OffsetAndEpoch(4000L, 300));
+        assertEquals("AbortTransactionRecord cannot appear within a snapshot.",
+            assertThrows(RuntimeException.class,
+                () -> offsetControl.replay(new AbortTransactionRecord(), 1000L)).
+                    getMessage());
+    }
+
+    @Test
+    public void testEndLoadSnapshotFailsWhenNotInSnapshot() {
+        OffsetControlManager offsetControl = new OffsetControlManager.Builder().build();
+        assertEquals("Can't end loading snapshot, because there is no current snapshot.",
+            assertThrows(RuntimeException.class,
+                () -> offsetControl.endLoadSnapshot(1000L)).
+                    getMessage());
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testReplayTransaction(boolean aborted) {
+        TrackingSnapshotRegistry snapshotRegistry = new TrackingSnapshotRegistry(new LogContext());
+        OffsetControlManager offsetControl = new OffsetControlManager.Builder().
+            setSnapshotRegistry(snapshotRegistry).
+            build();
+
+        offsetControl.replay(new BeginTransactionRecord(), 1500L);
+        assertEquals(1500L, offsetControl.transactionStartOffset());
+        assertEquals(Arrays.asList(-1L, 1499L), offsetControl.snapshotRegistry().epochsList());
+
+        offsetControl.handleCommitBatch(newFakeBatch(1550L, 100, 2000L));
+        assertEquals(1550L, offsetControl.lastCommittedOffset());
+        assertEquals(100, offsetControl.lastCommittedEpoch());
+        assertEquals(1499L, offsetControl.lastStableOffset());
+        assertEquals(Arrays.asList(1499L), offsetControl.snapshotRegistry().epochsList());
+
+        if (aborted) {
+            offsetControl.replay(new AbortTransactionRecord(), 1600L);
+            assertEquals(Arrays.asList("snapshot[-1]", "snapshot[1499]", "revert[1499]"),
+                snapshotRegistry.operations());
+        } else {
+            offsetControl.replay(new EndTransactionRecord(), 1600L);
+            assertEquals(Arrays.asList("snapshot[-1]", "snapshot[1499]"),
+                snapshotRegistry.operations());
+        }
+        assertEquals(-1L, offsetControl.transactionStartOffset());
+        assertEquals(1499L, offsetControl.lastStableOffset());
+
+        offsetControl.handleCommitBatch(newFakeBatch(1650, 100, 2100L));
+        assertEquals(1650, offsetControl.lastStableOffset());
+        assertEquals(Arrays.asList(1650L), offsetControl.snapshotRegistry().epochsList());
+    }
+
+    @Test
+    public void testLoadSnapshotClearsTransactionalState() {
+        TrackingSnapshotRegistry snapshotRegistry = new TrackingSnapshotRegistry(new LogContext());
+        OffsetControlManager offsetControl = new OffsetControlManager.Builder().
+            setSnapshotRegistry(snapshotRegistry).
+            build();
+        offsetControl.replay(new BeginTransactionRecord(), 1500L);
+        offsetControl.beginLoadSnapshot(new OffsetAndEpoch(4000L, 300));
+        assertEquals(-1L, offsetControl.transactionStartOffset());
+        assertEquals(Arrays.asList("snapshot[-1]", "snapshot[1499]", "reset"),
+                snapshotRegistry.operations());
+    }
+}
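For reference, the transaction tests above pin down a simple invariant: the last stable offset never advances past the record immediately before an open transaction. A minimal sketch of that rule (a hypothetical helper, not the OffsetControlManager implementation itself):

    // Sketch only: derive the last stable offset from the last committed offset
    // and the start offset of an in-progress transaction (-1 if none is open).
    static long lastStableOffset(long lastCommittedOffset, long transactionStartOffset) {
        if (transactionStartOffset == -1L) {
            return lastCommittedOffset;          // nothing pending: committed == stable
        }
        return Math.min(lastCommittedOffset, transactionStartOffset - 1);
    }

With a transaction begun at offset 1500 and a batch committed through 1550, this yields 1499, matching the assertions in testReplayTransaction.
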
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java
index ff61e1dfdb2..611c1490942 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java
@@ -92,7 +92,7 @@ public class QuorumControllerIntegrationTestUtils {
                     .setBrokerId(brokerId)
                     .setRack(null)
                     .setClusterId(controller.clusterId())
-                    .setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_6_IV0))
+                    .setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.latest()))
                     .setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + brokerId))
                     .setListeners(new ListenerCollection(
                         Arrays.asList(
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java
index 9dc538f4e2f..52049b5626d 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java
@@ -110,7 +110,7 @@ public class QuorumControllerMetricsIntegrationTest {
                 }
             });
             if (forceFailoverUsingLogLayer) {
-                controlEnv.activeController().setWriteOffset(123L);
+                controlEnv.activeController().setNewNextWriteOffset(123L);
 
                 TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
                     createTopics(controlEnv.activeController(), "test_", 1, 1);
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index bdf5e8f86e5..67e08356ddd 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -158,7 +158,7 @@ public class QuorumControllerTest {
         ) {
             controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT,
                 new BrokerRegistrationRequestData().
-                setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_6_IV0)).
+                setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_6_IV1)).
                 setBrokerId(0).
                 setClusterId(logEnv.clusterId())).get();
             testConfigurationOperations(controlEnv.activeController());
@@ -199,7 +199,7 @@ public class QuorumControllerTest {
         ) {
             controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT,
                 new BrokerRegistrationRequestData().
-                    setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_6_IV0)).
+                    setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_6_IV1)).
                     setBrokerId(0).
                     setClusterId(logEnv.clusterId())).get();
             testDelayedConfigurationOperations(logEnv, controlEnv.activeController());
@@ -536,7 +536,7 @@ public class QuorumControllerTest {
                     setBrokerId(0).
                     setClusterId(active.clusterId()).
                     setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
-                    setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_6_IV0)).
+                    setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_6_IV1)).
                     setListeners(listeners));
             assertEquals(3L, reply.get().epoch());
             CreateTopicsRequestData createTopicsRequestData =
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index 17220e7dbf5..1875a798fa0 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -177,7 +177,10 @@ public enum MetadataVersion {
     IBP_3_5_IV2(11, "3.5", "IV2", true),
 
     // Remove leader epoch bump when KRaft controller shrinks the ISR (KAFKA-15021)
-    IBP_3_6_IV0(12, "3.6", "IV0", false);
+    IBP_3_6_IV0(12, "3.6", "IV0", false),
+
+    // Add metadata transactions
+    IBP_3_6_IV1(13, "3.6", "IV1", true);
 
     // NOTE: update the default version in @ClusterTest annotation to point to the latest version
     public static final String FEATURE_NAME = "metadata.version";
@@ -267,6 +270,10 @@ public enum MetadataVersion {
         return !this.isAtLeast(IBP_3_6_IV0);
     }
 
+    public boolean isMetadataTransactionSupported() {
+        return this.isAtLeast(IBP_3_6_IV1);
+    }
+
     public boolean isKRaftSupported() {
         return this.featureLevel > 0;
     }
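For illustration, the new isMetadataTransactionSupported() predicate is the natural guard for emitting the transaction records introduced by this commit. A rough sketch of a caller (the records list and metadataVersion variable here are hypothetical, not code from this patch):

    // Sketch only: wrap a multi-record operation in a transaction when the
    // cluster's metadata.version is at least IBP_3_6_IV1.
    List<ApiMessageAndVersion> records = new ArrayList<>();
    if (metadataVersion.isMetadataTransactionSupported()) {
        records.add(new ApiMessageAndVersion(new BeginTransactionRecord(), (short) 0));
    }
    // ... append the operation's payload records here ...
    if (metadataVersion.isMetadataTransactionSupported()) {
        records.add(new ApiMessageAndVersion(new EndTransactionRecord(), (short) 0));
    }
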
diff --git a/server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java b/server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
index 25177d45d71..0d10eeaa2c4 100644
--- a/server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
+++ b/server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
@@ -204,6 +204,7 @@ public class SnapshotRegistry {
      * @param targetEpoch       The epoch of the snapshot to revert to.
      */
     public void revertToSnapshot(long targetEpoch) {
+        log.debug("Reverting to in-memory snapshot {}", targetEpoch);
         Snapshot target = getSnapshot(targetEpoch);
         Iterator<Snapshot> iterator = iterator(target);
         iterator.next();
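The debug line added above lands on the revert path that the OffsetControlManagerTest assertions rely on: take an in-memory snapshot just below the transaction's start offset, then revert to it if the transaction aborts. A rough sketch of that pattern (the offsets are illustrative; this is not the controller code itself):

    // Sketch only: snapshot before applying a transaction, revert on abort.
    SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
    long transactionStartOffset = 1500L;
    registry.getOrCreateSnapshot(transactionStartOffset - 1);   // "snapshot[1499]"
    // ... replay the transaction's records into timeline collections ...
    registry.revertToSnapshot(transactionStartOffset - 1);      // on abort: "revert[1499]"
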
diff --git a/server-common/src/test/java/org/apache/kafka/timeline/TrackingSnapshotRegistry.java b/server-common/src/test/java/org/apache/kafka/timeline/TrackingSnapshotRegistry.java
new file mode 100644
index 00000000000..182051a3e88
--- /dev/null
+++ b/server-common/src/test/java/org/apache/kafka/timeline/TrackingSnapshotRegistry.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import org.apache.kafka.common.utils.LogContext;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class TrackingSnapshotRegistry extends SnapshotRegistry {
+    private final List<String> operations = new ArrayList<>();
+
+    public TrackingSnapshotRegistry(LogContext logContext) {
+        super(logContext);
+    }
+
+    public List<String> operations() {
+        return new ArrayList<>(operations);
+    }
+
+    @Override
+    public void revertToSnapshot(long targetEpoch) {
+        operations.add("revert[" + targetEpoch + "]");
+        super.revertToSnapshot(targetEpoch);
+    }
+
+    @Override
+    public void reset() {
+        operations.add("reset");
+        super.reset();
+    }
+
+    @Override
+    public Snapshot getOrCreateSnapshot(long epoch) {
+        if (!hasSnapshot(epoch)) {
+            operations.add("snapshot[" + epoch + "]");
+        }
+        return super.getOrCreateSnapshot(epoch);
+    }
+}
diff --git a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
index d8c0b0df0e3..8d7f56fcae7 100644
--- a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
@@ -67,7 +67,7 @@ public class FeatureCommandTest {
                 assertEquals(0, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), "describe"))
         );
         assertEquals("Feature: metadata.version\tSupportedMinVersion: 3.0-IV1\t" +
-                "SupportedMaxVersion: 3.6-IV0\tFinalizedVersionLevel: 3.3-IV1\t", outputWithoutEpoch(commandOutput));
+                "SupportedMaxVersion: 3.6-IV1\tFinalizedVersionLevel: 3.3-IV1\t", outputWithoutEpoch(commandOutput));
     }
 
     @ClusterTest(clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_3_IV1)
@@ -125,7 +125,7 @@ public class FeatureCommandTest {
                         "disable", "--feature", "metadata.version"))
         );
         assertEquals("Could not disable metadata.version. Invalid update version 0 for feature " +
-                "metadata.version. Local controller 3000 only supports versions 1-12", commandOutput);
+                "metadata.version. Local controller 3000 only supports versions 1-13", commandOutput);
 
         commandOutput = ToolsTestUtils.captureStandardOut(() ->
                 assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
