This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2dec39d6e49 KAFKA-14558: Move/Rewrite LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module (#13043)
2dec39d6e49 is described below

commit 2dec39d6e49da4cfb502da3e84d4f9c50508e809
Author: Satish Duggana <sati...@apache.org>
AuthorDate: Sun Jan 8 09:43:38 2023 +0530

    KAFKA-14558: Move/Rewrite LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module (#13043)
    
    For broader context on this change, see:
    * KAFKA-14470: Move log layer to storage module.
    
    Reviewers: Ismael Juma <ism...@juma.me.uk>
---
 core/src/main/scala/kafka/log/LogCleaner.scala     |   9 +-
 core/src/main/scala/kafka/log/LogSegment.scala     |   3 +-
 .../scala/kafka/log/ProducerStateManager.scala     | 346 +--------------------
 core/src/main/scala/kafka/log/UnifiedLog.scala     |  19 +-
 .../main/scala/kafka/tools/DumpLogSegments.scala   |   2 +-
 .../test/scala/unit/kafka/log/LogSegmentTest.scala |   8 +-
 .../unit/kafka/log/ProducerStateManagerTest.scala  |  77 +++--
 .../kafka/server/log/internals/BatchMetadata.java  |  79 +++++
 .../kafka/server/log/internals/LastRecord.java     |  59 ++++
 .../server/log/internals/ProducerAppendInfo.java   | 239 ++++++++++++++
 .../server/log/internals/ProducerStateEntry.java   | 150 +++++++++
 .../kafka/server/log/internals/TxnMetadata.java    |  51 +++
 12 files changed, 665 insertions(+), 377 deletions(-)
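
The mechanical core of this port is replacing Scala's Option[Long]/Option[T] with Java's OptionalLong/Optional<T> in the moved classes, with the remaining Scala call sites bridging via scala-java8-compat's asScala/asJava (imported in the diff below). A minimal sketch of the Optional idioms the port relies on; the class name and values here are illustrative only, not part of the commit:

    import java.util.Optional;
    import java.util.OptionalLong;

    public class OptionalIdiomsSketch {
        public static void main(String[] args) {
            // Option[Long] becomes the unboxed OptionalLong in the ported classes.
            OptionalLong currentTxnFirstOffset = OptionalLong.of(16L);

            // Scala's getOrElse(-1L) becomes orElse(-1L), e.g. for the snapshot sentinel.
            long snapshotField = currentTxnFirstOffset.orElse(-1L);

            // isEmpty/isDefined checks become !isPresent()/isPresent().
            boolean inTransaction = currentTxnFirstOffset.isPresent();

            // Option[BatchMetadata] becomes a boxed Optional (element type illustrative).
            Optional<String> batchMetadata = Optional.empty();
            System.out.println(snapshotField + " " + inTransaction + " " + batchMetadata);
        }
    }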

diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 83b1b0e81b6..5a098790a31 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -32,7 +32,7 @@ import org.apache.kafka.common.record.MemoryRecords.RecordFilter
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.{BufferSupplier, Time}
-import org.apache.kafka.server.log.internals.{AbortedTxn, CleanerConfig, LogDirFailureChannel, OffsetMap, SkimpyOffsetMap, TransactionIndex}
+import org.apache.kafka.server.log.internals.{AbortedTxn, CleanerConfig, LastRecord, LogDirFailureChannel, OffsetMap, SkimpyOffsetMap, TransactionIndex}
 
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable.ListBuffer
@@ -680,9 +680,10 @@ private[log] class Cleaner(val id: Int,
           // 3) The last entry in the log is a transaction marker. We retain this marker since it has the
           //    last producer epoch, which is needed to ensure fencing.
           lastRecordsOfActiveProducers.get(batch.producerId).exists { lastRecord =>
-            lastRecord.lastDataOffset match {
-              case Some(offset) => batch.lastOffset == offset
-              case None => batch.isControlBatch && batch.producerEpoch == lastRecord.producerEpoch
+            if (lastRecord.lastDataOffset.isPresent) {
+              batch.lastOffset == lastRecord.lastDataOffset.getAsLong
+            } else {
+              batch.isControlBatch && batch.producerEpoch == lastRecord.producerEpoch
             }
           }
         }
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index d289df2ec47..53b51cb16ac 100644
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -35,6 +35,7 @@ import org.apache.kafka.common.utils.{BufferSupplier, Time, Utils}
 import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, CompletedTxn, LazyIndex, LogConfig, LogOffsetMetadata, OffsetIndex, OffsetPosition, TimeIndex, TimestampOffset, TransactionIndex, TxnIndexSearchResult}
 
 import java.util.Optional
+import scala.compat.java8.OptionConverters._
 import scala.jdk.CollectionConverters._
 import scala.math._
 
@@ -249,7 +250,7 @@ class LogSegment private[log] (val log: FileRecords,
     if (batch.hasProducerId) {
       val producerId = batch.producerId
       val appendInfo = producerStateManager.prepareUpdate(producerId, origin = AppendOrigin.REPLICATION)
-      val maybeCompletedTxn = appendInfo.append(batch, firstOffsetMetadataOpt = None)
+      val maybeCompletedTxn = appendInfo.append(batch, Optional.empty()).asScala
       producerStateManager.update(appendInfo)
       maybeCompletedTxn.foreach { completedTxn =>
         val lastStableOffset = producerStateManager.lastStableOffset(completedTxn)
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index da9f17c2c22..2dc7748152d 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -20,9 +20,8 @@ import kafka.server.{BrokerReconfigurable, KafkaConfig}
 import kafka.utils.{Logging, nonthreadsafe, threadsafe}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.config.ConfigException
-import org.apache.kafka.common.errors._
 import org.apache.kafka.common.protocol.types._
-import org.apache.kafka.common.record.{ControlRecordType, DefaultRecordBatch, EndTransactionMarker, RecordBatch}
+import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.utils.{ByteUtils, Crc32C, Time}
 import org.apache.kafka.server.log.internals._
 
@@ -30,320 +29,11 @@ import java.io.File
 import java.nio.ByteBuffer
 import java.nio.channels.FileChannel
 import java.nio.file.{Files, NoSuchFileException, StandardOpenOption}
+import java.util.{Optional, OptionalLong}
 import java.util.concurrent.ConcurrentSkipListMap
-import scala.collection.mutable.ListBuffer
 import scala.collection.{immutable, mutable}
 import scala.jdk.CollectionConverters._
 
-/**
- * The last written record for a given producer. The last data offset may be undefined
- * if the only log entry for a producer is a transaction marker.
- */
-case class LastRecord(lastDataOffset: Option[Long], producerEpoch: Short)
-
-
-private[log] case class TxnMetadata(
-  producerId: Long,
-  firstOffset: LogOffsetMetadata,
-  var lastOffset: Option[Long] = None
-) {
-  def this(producerId: Long, firstOffset: Long) = this(producerId, new LogOffsetMetadata(firstOffset))
-
-  override def toString: String = {
-    "TxnMetadata(" +
-      s"producerId=$producerId, " +
-      s"firstOffset=$firstOffset, " +
-      s"lastOffset=$lastOffset)"
-  }
-}
-
-private[log] object ProducerStateEntry {
-  private[log] val NumBatchesToRetain = 5
-
-  def empty(producerId: Long) = new ProducerStateEntry(producerId,
-    batchMetadata = mutable.Queue[BatchMetadata](),
-    producerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
-    coordinatorEpoch = -1,
-    lastTimestamp = RecordBatch.NO_TIMESTAMP,
-    currentTxnFirstOffset = None)
-}
-
-private[log] case class BatchMetadata(lastSeq: Int, lastOffset: Long, offsetDelta: Int, timestamp: Long) {
-  def firstSeq: Int =  DefaultRecordBatch.decrementSequence(lastSeq, offsetDelta)
-  def firstOffset: Long = lastOffset - offsetDelta
-
-  override def toString: String = {
-    "BatchMetadata(" +
-      s"firstSeq=$firstSeq, " +
-      s"lastSeq=$lastSeq, " +
-      s"firstOffset=$firstOffset, " +
-      s"lastOffset=$lastOffset, " +
-      s"timestamp=$timestamp)"
-  }
-}
-
-// the batchMetadata is ordered such that the batch with the lowest sequence is at the head of the queue while the
-// batch with the highest sequence is at the tail of the queue. We will retain at most ProducerStateEntry.NumBatchesToRetain
-// elements in the queue. When the queue is at capacity, we remove the first element to make space for the incoming batch.
-private[log] class ProducerStateEntry(val producerId: Long,
-                                      val batchMetadata: mutable.Queue[BatchMetadata],
-                                      var producerEpoch: Short,
-                                      var coordinatorEpoch: Int,
-                                      var lastTimestamp: Long,
-                                      var currentTxnFirstOffset: Option[Long]) {
-
-  def firstSeq: Int = if (isEmpty) RecordBatch.NO_SEQUENCE else batchMetadata.front.firstSeq
-
-  def firstDataOffset: Long = if (isEmpty) -1L else batchMetadata.front.firstOffset
-
-  def lastSeq: Int = if (isEmpty) RecordBatch.NO_SEQUENCE else batchMetadata.last.lastSeq
-
-  def lastDataOffset: Long = if (isEmpty) -1L else batchMetadata.last.lastOffset
-
-  def lastOffsetDelta : Int = if (isEmpty) 0 else batchMetadata.last.offsetDelta
-
-  def isEmpty: Boolean = batchMetadata.isEmpty
-
-  def addBatch(producerEpoch: Short, lastSeq: Int, lastOffset: Long, offsetDelta: Int, timestamp: Long): Unit = {
-    maybeUpdateProducerEpoch(producerEpoch)
-    addBatchMetadata(BatchMetadata(lastSeq, lastOffset, offsetDelta, timestamp))
-    this.lastTimestamp = timestamp
-  }
-
-  def maybeUpdateProducerEpoch(producerEpoch: Short): Boolean = {
-    if (this.producerEpoch != producerEpoch) {
-      batchMetadata.clear()
-      this.producerEpoch = producerEpoch
-      true
-    } else {
-      false
-    }
-  }
-
-  private def addBatchMetadata(batch: BatchMetadata): Unit = {
-    if (batchMetadata.size == ProducerStateEntry.NumBatchesToRetain)
-      batchMetadata.dequeue()
-    batchMetadata.enqueue(batch)
-  }
-
-  def update(nextEntry: ProducerStateEntry): Unit = {
-    maybeUpdateProducerEpoch(nextEntry.producerEpoch)
-    while (nextEntry.batchMetadata.nonEmpty)
-      addBatchMetadata(nextEntry.batchMetadata.dequeue())
-    this.coordinatorEpoch = nextEntry.coordinatorEpoch
-    this.currentTxnFirstOffset = nextEntry.currentTxnFirstOffset
-    this.lastTimestamp = nextEntry.lastTimestamp
-  }
-
-  def findDuplicateBatch(batch: RecordBatch): Option[BatchMetadata] = {
-    if (batch.producerEpoch != producerEpoch)
-       None
-    else
-      batchWithSequenceRange(batch.baseSequence, batch.lastSequence)
-  }
-
-  // Return the batch metadata of the cached batch having the exact sequence range, if any.
-  def batchWithSequenceRange(firstSeq: Int, lastSeq: Int): Option[BatchMetadata] = {
-    val duplicate = batchMetadata.filter { metadata =>
-      firstSeq == metadata.firstSeq && lastSeq == metadata.lastSeq
-    }
-    duplicate.headOption
-  }
-
-  override def toString: String = {
-    "ProducerStateEntry(" +
-      s"producerId=$producerId, " +
-      s"producerEpoch=$producerEpoch, " +
-      s"currentTxnFirstOffset=$currentTxnFirstOffset, " +
-      s"coordinatorEpoch=$coordinatorEpoch, " +
-      s"lastTimestamp=$lastTimestamp, " +
-      s"batchMetadata=$batchMetadata"
-  }
-}
-
-/**
- * This class is used to validate the records appended by a given producer before they are written to the log.
- * It is initialized with the producer's state after the last successful append, and transitively validates the
- * sequence numbers and epochs of each new record. Additionally, this class accumulates transaction metadata
- * as the incoming records are validated.
- *
- * @param producerId The id of the producer appending to the log
- * @param currentEntry  The current entry associated with the producer id which contains metadata for a fixed number of
- *                      the most recent appends made by the producer. Validation of the first incoming append will
- *                      be made against the latest append in the current entry. New appends will replace older appends
- *                      in the current entry so that the space overhead is constant.
- * @param origin Indicates the origin of the append which implies the extent of validation. For example, offset
- *               commits, which originate from the group coordinator, do not have sequence numbers and therefore
- *               only producer epoch validation is done. Appends which come through replication are not validated
- *               (we assume the validation has already been done) and appends from clients require full validation.
- */
-private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
-                                      val producerId: Long,
-                                      val currentEntry: ProducerStateEntry,
-                                      val origin: AppendOrigin) extends Logging {
-
-  private val transactions = ListBuffer.empty[TxnMetadata]
-  private val updatedEntry = ProducerStateEntry.empty(producerId)
-
-  updatedEntry.producerEpoch = currentEntry.producerEpoch
-  updatedEntry.coordinatorEpoch = currentEntry.coordinatorEpoch
-  updatedEntry.lastTimestamp = currentEntry.lastTimestamp
-  updatedEntry.currentTxnFirstOffset = currentEntry.currentTxnFirstOffset
-
-  private def maybeValidateDataBatch(producerEpoch: Short, firstSeq: Int, offset: Long): Unit = {
-    checkProducerEpoch(producerEpoch, offset)
-    if (origin == AppendOrigin.CLIENT) {
-      checkSequence(producerEpoch, firstSeq, offset)
-    }
-  }
-
-  private def checkProducerEpoch(producerEpoch: Short, offset: Long): Unit = {
-    if (producerEpoch < updatedEntry.producerEpoch) {
-      val message = s"Epoch of producer $producerId at offset $offset in $topicPartition is $producerEpoch, " +
-        s"which is smaller than the last seen epoch ${updatedEntry.producerEpoch}"
-
-      if (origin == AppendOrigin.REPLICATION) {
-        warn(message)
-      } else {
-        // Starting from 2.7, we replaced ProducerFenced error with InvalidProducerEpoch in the
-        // producer send response callback to differentiate from the former fatal exception,
-        // letting client abort the ongoing transaction and retry.
-        throw new InvalidProducerEpochException(message)
-      }
-    }
-  }
-
-  private def checkSequence(producerEpoch: Short, appendFirstSeq: Int, offset: Long): Unit = {
-    if (producerEpoch != updatedEntry.producerEpoch) {
-      if (appendFirstSeq != 0) {
-        if (updatedEntry.producerEpoch != RecordBatch.NO_PRODUCER_EPOCH) {
-          throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch of producer $producerId " +
-            s"at offset $offset in partition $topicPartition: $producerEpoch (request epoch), $appendFirstSeq (seq. number), " +
-            s"${updatedEntry.producerEpoch} (current producer epoch)")
-        }
-      }
-    } else {
-      val currentLastSeq = if (!updatedEntry.isEmpty)
-        updatedEntry.lastSeq
-      else if (producerEpoch == currentEntry.producerEpoch)
-        currentEntry.lastSeq
-      else
-        RecordBatch.NO_SEQUENCE
-
-      // If there is no current producer epoch (possibly because all producer records have been deleted due to
-      // retention or the DeleteRecords API) accept writes with any sequence number
-      if (!(currentEntry.producerEpoch == RecordBatch.NO_PRODUCER_EPOCH || inSequence(currentLastSeq, appendFirstSeq))) {
-        throw new OutOfOrderSequenceException(s"Out of order sequence number for producer $producerId at " +
-          s"offset $offset in partition $topicPartition: $appendFirstSeq (incoming seq. number), " +
-          s"$currentLastSeq (current end sequence number)")
-      }
-    }
-  }
-
-  private def inSequence(lastSeq: Int, nextSeq: Int): Boolean = {
-    nextSeq == lastSeq + 1L || (nextSeq == 0 && lastSeq == Int.MaxValue)
-  }
-
-  def append(batch: RecordBatch, firstOffsetMetadataOpt: Option[LogOffsetMetadata]): Option[CompletedTxn] = {
-    if (batch.isControlBatch) {
-      val recordIterator = batch.iterator
-      if (recordIterator.hasNext) {
-        val record = recordIterator.next()
-        val endTxnMarker = EndTransactionMarker.deserialize(record)
-        appendEndTxnMarker(endTxnMarker, batch.producerEpoch, batch.baseOffset, record.timestamp)
-      } else {
-        // An empty control batch means the entire transaction has been cleaned from the log, so no need to append
-        None
-      }
-    } else {
-      val firstOffsetMetadata = firstOffsetMetadataOpt.getOrElse(new LogOffsetMetadata(batch.baseOffset))
-      appendDataBatch(batch.producerEpoch, batch.baseSequence, batch.lastSequence, batch.maxTimestamp,
-        firstOffsetMetadata, batch.lastOffset, batch.isTransactional)
-      None
-    }
-  }
-
-  def appendDataBatch(epoch: Short,
-                      firstSeq: Int,
-                      lastSeq: Int,
-                      lastTimestamp: Long,
-                      firstOffsetMetadata: LogOffsetMetadata,
-                      lastOffset: Long,
-                      isTransactional: Boolean): Unit = {
-    val firstOffset = firstOffsetMetadata.messageOffset
-    maybeValidateDataBatch(epoch, firstSeq, firstOffset)
-    updatedEntry.addBatch(epoch, lastSeq, lastOffset, (lastOffset - firstOffset).toInt, lastTimestamp)
-
-    updatedEntry.currentTxnFirstOffset match {
-      case Some(_) if !isTransactional =>
-        // Received a non-transactional message while a transaction is active
-        throw new InvalidTxnStateException(s"Expected transactional write from producer $producerId at " +
-          s"offset $firstOffsetMetadata in partition $topicPartition")
-
-      case None if isTransactional =>
-        // Began a new transaction
-        updatedEntry.currentTxnFirstOffset = Some(firstOffset)
-        transactions += TxnMetadata(producerId, firstOffsetMetadata)
-
-      case _ => // nothing to do
-    }
-  }
-
-  private def checkCoordinatorEpoch(endTxnMarker: EndTransactionMarker, offset: Long): Unit = {
-    if (updatedEntry.coordinatorEpoch > endTxnMarker.coordinatorEpoch) {
-      if (origin == AppendOrigin.REPLICATION) {
-        info(s"Detected invalid coordinator epoch for producerId $producerId at " +
-          s"offset $offset in partition $topicPartition: ${endTxnMarker.coordinatorEpoch} " +
-          s"is older than previously known coordinator epoch ${updatedEntry.coordinatorEpoch}")
-      } else {
-        throw new TransactionCoordinatorFencedException(s"Invalid coordinator epoch for producerId $producerId at " +
-          s"offset $offset in partition $topicPartition: ${endTxnMarker.coordinatorEpoch} " +
-          s"(zombie), ${updatedEntry.coordinatorEpoch} (current)")
-      }
-    }
-  }
-
-  def appendEndTxnMarker(
-    endTxnMarker: EndTransactionMarker,
-    producerEpoch: Short,
-    offset: Long,
-    timestamp: Long
-  ): Option[CompletedTxn] = {
-    checkProducerEpoch(producerEpoch, offset)
-    checkCoordinatorEpoch(endTxnMarker, offset)
-
-    // Only emit the `CompletedTxn` for non-empty transactions. A transaction marker
-    // without any associated data will not have any impact on the last stable offset
-    // and would not need to be reflected in the transaction index.
-    val completedTxn = updatedEntry.currentTxnFirstOffset.map { firstOffset =>
-      new CompletedTxn(producerId, firstOffset, offset, endTxnMarker.controlType == ControlRecordType.ABORT)
-    }
-
-    updatedEntry.maybeUpdateProducerEpoch(producerEpoch)
-    updatedEntry.currentTxnFirstOffset = None
-    updatedEntry.coordinatorEpoch = endTxnMarker.coordinatorEpoch
-    updatedEntry.lastTimestamp = timestamp
-
-    completedTxn
-  }
-
-  def toEntry: ProducerStateEntry = updatedEntry
-
-  def startedTransactions: List[TxnMetadata] = transactions.toList
-
-  override def toString: String = {
-    "ProducerAppendInfo(" +
-      s"producerId=$producerId, " +
-      s"producerEpoch=${updatedEntry.producerEpoch}, " +
-      s"firstSequence=${updatedEntry.firstSeq}, " +
-      s"lastSequence=${updatedEntry.lastSeq}, " +
-      s"currentTxnFirstOffset=${updatedEntry.currentTxnFirstOffset}, " +
-      s"coordinatorEpoch=${updatedEntry.coordinatorEpoch}, " +
-      s"lastTimestamp=${updatedEntry.lastTimestamp}, " +
-      s"startedTransactions=$transactions)"
-  }
-}
-
 object ProducerStateManager {
   val LateTransactionBufferMs = 5 * 60 * 1000
 
@@ -403,13 +93,11 @@ object ProducerStateManager {
         val offsetDelta = producerEntryStruct.getInt(OffsetDeltaField)
         val coordinatorEpoch = producerEntryStruct.getInt(CoordinatorEpochField)
         val currentTxnFirstOffset = producerEntryStruct.getLong(CurrentTxnFirstOffsetField)
-        val lastAppendedDataBatches = mutable.Queue.empty[BatchMetadata]
-        if (offset >= 0)
-          lastAppendedDataBatches += BatchMetadata(seq, offset, offsetDelta, timestamp)
-
-        val newEntry = new ProducerStateEntry(producerId, lastAppendedDataBatches, producerEpoch,
-          coordinatorEpoch, timestamp, if (currentTxnFirstOffset >= 0) Some(currentTxnFirstOffset) else None)
-        newEntry
+        val batchMetadata =
+          if (offset >= 0) Optional.of(new BatchMetadata(seq, offset, offsetDelta, timestamp))
+          else Optional.empty[BatchMetadata]()
+        val currentTxnFirstOffsetValue = if (currentTxnFirstOffset >= 0) OptionalLong.of(currentTxnFirstOffset) else OptionalLong.empty()
+        new ProducerStateEntry(producerId, producerEpoch, coordinatorEpoch, timestamp, currentTxnFirstOffsetValue, batchMetadata)
       }
     } catch {
       case e: SchemaException =>
@@ -431,7 +119,7 @@ object ProducerStateManager {
           .set(OffsetDeltaField, entry.lastOffsetDelta)
           .set(TimestampField, entry.lastTimestamp)
           .set(CoordinatorEpochField, entry.coordinatorEpoch)
-          .set(CurrentTxnFirstOffsetField, entry.currentTxnFirstOffset.getOrElse(-1L))
+          .set(CurrentTxnFirstOffsetField, entry.currentTxnFirstOffset.orElse(-1L))
         producerEntryStruct
     }.toArray
     struct.set(ProducerEntriesField, entriesArray)
@@ -518,7 +206,7 @@ class ProducerStateManager(
     val lastTimestamp = oldestTxnLastTimestamp
     lastTimestamp > 0 && (currentTimeMs - lastTimestamp) > maxTransactionTimeoutMs + ProducerStateManager.LateTransactionBufferMs
   }
-  
+
   def truncateFullyAndReloadSnapshots(): Unit = {
     info("Reloading the producer state snapshots")
     truncateFullyAndStartAt(0L)
@@ -652,13 +340,11 @@ class ProducerStateManager(
   private[log] def loadProducerEntry(entry: ProducerStateEntry): Unit = {
     val producerId = entry.producerId
     producers.put(producerId, entry)
-    entry.currentTxnFirstOffset.foreach { offset =>
-      ongoingTxns.put(offset, new TxnMetadata(producerId, offset))
-    }
+    entry.currentTxnFirstOffset.ifPresent((offset: Long) => ongoingTxns.put(offset, new TxnMetadata(producerId, offset)))
   }
 
   private def isProducerExpired(currentTimeMs: Long, producerState: ProducerStateEntry): Boolean =
-    producerState.currentTxnFirstOffset.isEmpty && currentTimeMs - producerState.lastTimestamp >= producerStateManagerConfig.producerIdExpirationMs
+    !producerState.currentTxnFirstOffset.isPresent && currentTimeMs - producerState.lastTimestamp >= producerStateManagerConfig.producerIdExpirationMs
 
   /**
    * Expire any producer ids which have been idle longer than the configured maximum expiration timeout.
@@ -706,8 +392,8 @@ class ProducerStateManager(
    * Update the mapping with the given append information
    */
   def update(appendInfo: ProducerAppendInfo): Unit = {
-    if (appendInfo.producerId == RecordBatch.NO_PRODUCER_ID)
-      throw new IllegalArgumentException(s"Invalid producer id ${appendInfo.producerId} passed to update " +
+    if (appendInfo.producerId() == RecordBatch.NO_PRODUCER_ID)
+      throw new IllegalArgumentException(s"Invalid producer id ${appendInfo.producerId()} passed to update " +
         s"for partition $topicPartition")
 
     trace(s"Updated producer ${appendInfo.producerId} state to $appendInfo")
@@ -720,7 +406,7 @@ class ProducerStateManager(
         producers.put(appendInfo.producerId, updatedEntry)
     }
 
-    appendInfo.startedTransactions.foreach { txn =>
+    appendInfo.startedTransactions.asScala.foreach { txn =>
       ongoingTxns.put(txn.firstOffset.messageOffset, txn)
     }
 
@@ -809,7 +495,7 @@ class ProducerStateManager(
     while (iterator.hasNext) {
       val txnEntry = iterator.next()
       val lastOffset = txnEntry.getValue.lastOffset
-      if (lastOffset.exists(_ < offset))
+      if (lastOffset.isPresent && lastOffset.getAsLong < offset)
         iterator.remove()
     }
   }
@@ -849,7 +535,7 @@ class ProducerStateManager(
       throw new IllegalArgumentException(s"Attempted to complete transaction $completedTxn on partition $topicPartition " +
         s"which was not started")
 
-    txnMetadata.lastOffset = Some(completedTxn.lastOffset)
+    txnMetadata.lastOffset = OptionalLong.of(completedTxn.lastOffset)
     unreplicatedTxns.put(completedTxn.firstOffset, txnMetadata)
     updateOldestTxnTimestamp()
   }
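
A sketch of the completeTxn flow under the new types: TxnMetadata's lastOffset is now a mutable OptionalLong field that gets filled in when the transaction completes, instead of an Option[Long] set to Some(...). The producer id and offsets below are invented for illustration:

    import java.util.OptionalLong;
    import org.apache.kafka.server.log.internals.TxnMetadata;

    public class CompleteTxnSketch {
        public static void main(String[] args) {
            // An ongoing transaction initially knows only its first offset.
            TxnMetadata txnMetadata = new TxnMetadata(9000L, 16L); // producerId, firstOffset
            // On completion, the last offset is recorded as an OptionalLong.
            txnMetadata.lastOffset = OptionalLong.of(40L);
            System.out.println(txnMetadata.lastOffset); // OptionalLong[40]
        }
    }
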
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala
index c982e10a3ef..3afad862289 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -21,7 +21,7 @@ import com.yammer.metrics.core.MetricName
 
 import java.io.{File, IOException}
 import java.nio.file.Files
-import java.util.Optional
+import java.util.{Optional, OptionalLong}
 import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap, TimeUnit}
 import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException}
 import kafka.log.remote.RemoteLogManager
@@ -42,13 +42,14 @@ import org.apache.kafka.common.utils.{PrimitiveRef, Time, Utils}
 import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition, Uuid}
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_0_IV0
-import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, CompletedTxn, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogValidator}
+import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, LastRecord, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogValidator, ProducerAppendInfo}
 import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
 import org.apache.kafka.server.record.BrokerCompressionType
 
 import scala.annotation.nowarn
 import scala.collection.mutable.ListBuffer
 import scala.collection.{Seq, immutable, mutable}
+import scala.compat.java8.OptionConverters._
 import scala.jdk.CollectionConverters._
 
 object LogAppendInfo {
@@ -237,7 +238,7 @@ case object SnapshotGenerated extends LogStartOffsetIncrementReason {
  */
 @threadsafe
 class UnifiedLog(@volatile var logStartOffset: Long,
-                 private[log] val localLog: LocalLog,
+                 private val localLog: LocalLog,
                  brokerTopicStats: BrokerTopicStats,
                  val producerIdExpirationCheckIntervalMs: Int,
                  @volatile var leaderEpochCache: Option[LeaderEpochFileCache],
@@ -672,7 +673,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           .setLastSequence(state.lastSeq)
           .setLastTimestamp(state.lastTimestamp)
           .setCoordinatorEpoch(state.coordinatorEpoch)
-          .setCurrentTxnStartOffset(state.currentTxnFirstOffset.getOrElse(-1L))
+          .setCurrentTxnStartOffset(state.currentTxnFirstOffset.orElse(-1L))
       }
     }.toSeq
   }
@@ -685,8 +686,10 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   private[log] def lastRecordsOfActiveProducers: Map[Long, LastRecord] = lock synchronized {
     producerStateManager.activeProducers.map { case (producerId, producerIdEntry) =>
-      val lastDataOffset = if (producerIdEntry.lastDataOffset >= 0 ) Some(producerIdEntry.lastDataOffset) else None
-      val lastRecord = LastRecord(lastDataOffset, producerIdEntry.producerEpoch)
+      val lastDataOffset =
+        if (producerIdEntry.lastDataOffset >= 0) OptionalLong.of(producerIdEntry.lastDataOffset)
+        else OptionalLong.empty()
+      val lastRecord = new LastRecord(lastDataOffset, producerIdEntry.producerEpoch)
       producerId -> lastRecord
     }
   }
@@ -1083,7 +1086,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
         if (origin == AppendOrigin.CLIENT) {
           val maybeLastEntry = producerStateManager.lastEntry(batch.producerId)
 
-          maybeLastEntry.flatMap(_.findDuplicateBatch(batch)).foreach { duplicate =>
+          maybeLastEntry.flatMap(_.findDuplicateBatch(batch).asScala).foreach { duplicate =>
             return (updatedProducers, completedTxns.toList, Some(duplicate))
           }
         }
@@ -1978,7 +1981,7 @@ object UnifiedLog extends Logging {
                               origin: AppendOrigin): Option[CompletedTxn] = {
     val producerId = batch.producerId
     val appendInfo = producers.getOrElseUpdate(producerId, producerStateManager.prepareUpdate(producerId, origin))
-    appendInfo.append(batch, firstOffsetMetadata)
+    appendInfo.append(batch, firstOffsetMetadata.asJava).asScala
   }
 
   /**
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 5111320cc4b..ec0fb7d2e73 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -107,7 +107,7 @@ object DumpLogSegments {
         print(s"producerId: ${entry.producerId} producerEpoch: ${entry.producerEpoch} " +
           s"coordinatorEpoch: ${entry.coordinatorEpoch} currentTxnFirstOffset: ${entry.currentTxnFirstOffset} " +
           s"lastTimestamp: ${entry.lastTimestamp} ")
-        entry.batchMetadata.headOption.foreach { metadata =>
+        entry.batchMetadata.asScala.headOption.foreach { metadata =>
           print(s"firstSequence: ${metadata.firstSeq} lastSequence: ${metadata.lastSeq} " +
             s"lastOffset: ${metadata.lastOffset} offsetDelta: ${metadata.offsetDelta} timestamp: ${metadata.timestamp}")
         }
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 85b4801a0ec..9bd5c56e7ac 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -17,6 +17,8 @@
 package kafka.log
 
 import java.io.File
+import java.util.OptionalLong
+
 import kafka.server.checkpoints.LeaderEpochCheckpoint
 import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
 import kafka.utils.TestUtils
@@ -25,7 +27,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.{MockTime, Time, Utils}
-import org.apache.kafka.server.log.internals.LogConfig
+import org.apache.kafka.server.log.internals.{BatchMetadata, LogConfig, ProducerStateEntry}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 
@@ -355,9 +357,7 @@ class LogSegmentTest {
 
     // recover again, but this time assuming the transaction from pid2 began on a previous segment
     stateManager = newProducerStateManager()
-    stateManager.loadProducerEntry(new ProducerStateEntry(pid2,
-      mutable.Queue[BatchMetadata](BatchMetadata(10, 10L, 5, RecordBatch.NO_TIMESTAMP)), producerEpoch,
-      0, RecordBatch.NO_TIMESTAMP, Some(75L)))
+    stateManager.loadProducerEntry(new ProducerStateEntry(pid2, producerEpoch, 0, RecordBatch.NO_TIMESTAMP, OptionalLong.of(75L), java.util.Optional.of(new BatchMetadata(10, 10L, 5, RecordBatch.NO_TIMESTAMP))))
     segment.recover(stateManager)
     assertEquals(108L, stateManager.mapEndOffset)
 
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index 3e5ae15d211..903d51f94f2 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -21,7 +21,7 @@ import java.io.File
 import java.nio.ByteBuffer
 import java.nio.channels.FileChannel
 import java.nio.file.{Files, StandardOpenOption}
-import java.util.Collections
+import java.util.{Collections, Optional, OptionalLong}
 import java.util.concurrent.atomic.AtomicInteger
 import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
@@ -29,11 +29,15 @@ import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.{MockTime, Utils}
-import org.apache.kafka.server.log.internals.{AppendOrigin, CompletedTxn, LogOffsetMetadata}
+import org.apache.kafka.server.log.internals.{AppendOrigin, CompletedTxn, LogOffsetMetadata, ProducerAppendInfo, ProducerStateEntry, TxnMetadata}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.mockito.Mockito.{mock, when}
 
+import java.util
+import scala.compat.java8.OptionConverters.RichOptionalGeneric
+import scala.jdk.CollectionConverters._
+
 class ProducerStateManagerTest {
   private var logDir: File = _
   private var stateManager: ProducerStateManager = _
@@ -130,7 +134,7 @@ class ProducerStateManagerTest {
     val appendInfo = stateManager.prepareUpdate(producerId, origin = AppendOrigin.REPLICATION)
     // Sequence number wrap around
     appendInfo.appendDataBatch(epoch, Int.MaxValue - 10, 9, time.milliseconds(),
-      new LogOffsetMetadata(2000L), 2020L, isTransactional = false)
+      new LogOffsetMetadata(2000L), 2020L, false)
     assertEquals(None, stateManager.lastEntry(producerId))
     stateManager.update(appendInfo)
     assertTrue(stateManager.lastEntry(producerId).isDefined)
@@ -182,7 +186,7 @@ class ProducerStateManagerTest {
 
     val lastEntry = maybeLastEntry.get
     assertEquals(bumpedProducerEpoch, lastEntry.producerEpoch)
-    assertEquals(None, lastEntry.currentTxnFirstOffset)
+    assertEquals(OptionalLong.empty(), lastEntry.currentTxnFirstOffset)
     assertEquals(RecordBatch.NO_SEQUENCE, lastEntry.firstSeq)
     assertEquals(RecordBatch.NO_SEQUENCE, lastEntry.lastSeq)
 
@@ -200,7 +204,7 @@ class ProducerStateManagerTest {
 
     val firstOffsetMetadata = new LogOffsetMetadata(offset, 990000L, 234224)
     producerAppendInfo.appendDataBatch(producerEpoch, seq, seq, time.milliseconds(),
-      firstOffsetMetadata, offset, isTransactional = true)
+      firstOffsetMetadata, offset, true)
     stateManager.update(producerAppendInfo)
 
     assertEquals(Some(firstOffsetMetadata), stateManager.firstUnstableOffset)
@@ -218,7 +222,7 @@ class ProducerStateManagerTest {
       appendInfo: ProducerAppendInfo
     ): Option[CompletedTxn] = {
       appendInfo.appendEndTxnMarker(new EndTransactionMarker(recordType, coordinatorEpoch),
-        producerEpoch, offset, time.milliseconds())
+        producerEpoch, offset, time.milliseconds()).asScala
     }
 
     def appendData(
@@ -228,14 +232,14 @@ class ProducerStateManagerTest {
     ): Unit = {
       val count = (endOffset - startOffset).toInt
       appendInfo.appendDataBatch(producerEpoch, seq.get(), seq.addAndGet(count), time.milliseconds(),
-        new LogOffsetMetadata(startOffset), endOffset, isTransactional = true)
+        new LogOffsetMetadata(startOffset), endOffset, true)
       seq.incrementAndGet()
     }
 
     // Start one transaction in a separate append
     val firstAppend = stateManager.prepareUpdate(producerId, origin = AppendOrigin.CLIENT)
     appendData(16L, 20L, firstAppend)
-    assertEquals(new TxnMetadata(producerId, 16L), firstAppend.startedTransactions.head)
+    assertTxnMetadataEquals(new TxnMetadata(producerId, 16L), firstAppend.startedTransactions.asScala.head)
     stateManager.update(firstAppend)
     stateManager.onHighWatermarkUpdated(21L)
     assertEquals(Some(new LogOffsetMetadata(16L)), stateManager.firstUnstableOffset)
@@ -255,8 +259,8 @@ class ProducerStateManagerTest {
     appendData(30L, 31L, secondAppend)
 
     assertEquals(2, secondAppend.startedTransactions.size)
-    assertEquals(TxnMetadata(producerId, new LogOffsetMetadata(24L)), secondAppend.startedTransactions.head)
-    assertEquals(TxnMetadata(producerId, new LogOffsetMetadata(30L)), secondAppend.startedTransactions.last)
+    assertTxnMetadataEquals(new TxnMetadata(producerId, new LogOffsetMetadata(24L)), secondAppend.startedTransactions.asScala.head)
+    assertTxnMetadataEquals(new TxnMetadata(producerId, new LogOffsetMetadata(30L)), secondAppend.startedTransactions.asScala.last)
     stateManager.update(secondAppend)
     stateManager.completeTxn(firstCompletedTxn.get)
     stateManager.completeTxn(secondCompletedTxn.get)
@@ -264,6 +268,21 @@ class ProducerStateManagerTest {
     assertEquals(Some(new LogOffsetMetadata(30L)), stateManager.firstUnstableOffset)
   }
 
+  def assertTxnMetadataEquals(expected: java.util.List[TxnMetadata], actual: java.util.List[TxnMetadata]): Unit = {
+    val expectedIter = expected.iterator()
+    val actualIter = actual.iterator()
+    assertEquals(expected.size(), actual.size())
+    while (expectedIter.hasNext && actualIter.hasNext) {
+      assertTxnMetadataEquals(expectedIter.next(), actualIter.next())
+    }
+  }
+
+  def assertTxnMetadataEquals(expected: TxnMetadata, actual: TxnMetadata): Unit = {
+    assertEquals(expected.producerId, actual.producerId)
+    assertEquals(expected.firstOffset, actual.firstOffset)
+    assertEquals(expected.lastOffset, actual.lastOffset)
+  }
+
   @Test
   def testHasLateTransaction(): Unit = {
     val producerId1 = 39L
@@ -373,7 +392,7 @@ class ProducerStateManagerTest {
       )
       val firstOffsetMetadata = new LogOffsetMetadata(startOffset, segmentBaseOffset, 50 * relativeOffset)
       producerAppendInfo.appendDataBatch(producerEpoch, 0, 0, time.milliseconds(),
-        firstOffsetMetadata, startOffset, isTransactional = true)
+        firstOffsetMetadata, startOffset, true)
       stateManager.update(producerAppendInfo)
     }
 
@@ -417,14 +436,14 @@ class ProducerStateManagerTest {
 
     val appendInfo = stateManager.prepareUpdate(producerId, origin = AppendOrigin.CLIENT)
     appendInfo.appendDataBatch(producerEpoch, 0, 5, time.milliseconds(),
-      new LogOffsetMetadata(15L), 20L, isTransactional = false)
+      new LogOffsetMetadata(15L), 20L, false)
     assertEquals(None, stateManager.lastEntry(producerId))
     stateManager.update(appendInfo)
     assertTrue(stateManager.lastEntry(producerId).isDefined)
 
     val nextAppendInfo = stateManager.prepareUpdate(producerId, origin = AppendOrigin.CLIENT)
     nextAppendInfo.appendDataBatch(producerEpoch, 6, 10, time.milliseconds(),
-      new LogOffsetMetadata(26L), 30L, isTransactional = false)
+      new LogOffsetMetadata(26L), 30L, false)
     assertTrue(stateManager.lastEntry(producerId).isDefined)
 
     var lastEntry = stateManager.lastEntry(producerId).get
@@ -448,30 +467,30 @@ class ProducerStateManagerTest {
 
     val appendInfo = stateManager.prepareUpdate(producerId, origin = AppendOrigin.CLIENT)
     appendInfo.appendDataBatch(producerEpoch, 1, 5, time.milliseconds(),
-      new LogOffsetMetadata(16L), 20L, isTransactional = true)
+      new LogOffsetMetadata(16L), 20L, true)
     var lastEntry = appendInfo.toEntry
     assertEquals(producerEpoch, lastEntry.producerEpoch)
     assertEquals(1, lastEntry.firstSeq)
     assertEquals(5, lastEntry.lastSeq)
     assertEquals(16L, lastEntry.firstDataOffset)
     assertEquals(20L, lastEntry.lastDataOffset)
-    assertEquals(Some(16L), lastEntry.currentTxnFirstOffset)
-    assertEquals(List(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions)
+    assertEquals(OptionalLong.of(16L), lastEntry.currentTxnFirstOffset)
+    assertTxnMetadataEquals(java.util.Arrays.asList(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions)
 
     appendInfo.appendDataBatch(producerEpoch, 6, 10, time.milliseconds(),
-      new LogOffsetMetadata(26L), 30L, isTransactional = true)
+      new LogOffsetMetadata(26L), 30L, true)
     lastEntry = appendInfo.toEntry
     assertEquals(producerEpoch, lastEntry.producerEpoch)
     assertEquals(1, lastEntry.firstSeq)
     assertEquals(10, lastEntry.lastSeq)
     assertEquals(16L, lastEntry.firstDataOffset)
     assertEquals(30L, lastEntry.lastDataOffset)
-    assertEquals(Some(16L), lastEntry.currentTxnFirstOffset)
-    assertEquals(List(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions)
+    assertEquals(OptionalLong.of(16L), lastEntry.currentTxnFirstOffset)
+    assertTxnMetadataEquals(util.Arrays.asList(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions)
 
     val endTxnMarker = new EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch)
     val completedTxnOpt = appendInfo.appendEndTxnMarker(endTxnMarker, producerEpoch, 40L, time.milliseconds())
-    assertTrue(completedTxnOpt.isDefined)
+    assertTrue(completedTxnOpt.isPresent)
 
     val completedTxn = completedTxnOpt.get
     assertEquals(producerId, completedTxn.producerId)
@@ -487,8 +506,8 @@ class ProducerStateManagerTest {
     assertEquals(16L, lastEntry.firstDataOffset)
     assertEquals(30L, lastEntry.lastDataOffset)
     assertEquals(coordinatorEpoch, lastEntry.coordinatorEpoch)
-    assertEquals(None, lastEntry.currentTxnFirstOffset)
-    assertEquals(List(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions)
+    assertEquals(OptionalLong.empty(), lastEntry.currentTxnFirstOffset)
+    assertTxnMetadataEquals(java.util.Arrays.asList(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions)
   }
 
   @Test
@@ -571,7 +590,7 @@ class ProducerStateManagerTest {
     assertEquals(1, loadedEntry.get.firstSeq)
     assertEquals(1, loadedEntry.get.lastDataOffset)
     assertEquals(1, loadedEntry.get.lastSeq)
-    assertEquals(Some(0), loadedEntry.get.currentTxnFirstOffset)
+    assertEquals(OptionalLong.of(0), loadedEntry.get.currentTxnFirstOffset)
 
     // entry added after recovery
     append(recoveredMapping, producerId, epoch, 2, 2L, isTransactional = true)
@@ -595,7 +614,7 @@ class ProducerStateManagerTest {
     assertEquals(1, loadedEntry.get.firstSeq)
     assertEquals(1, loadedEntry.get.lastDataOffset)
     assertEquals(1, loadedEntry.get.lastSeq)
-    assertEquals(None, loadedEntry.get.currentTxnFirstOffset)
+    assertEquals(OptionalLong.empty(), loadedEntry.get.currentTxnFirstOffset)
   }
 
   @Test
@@ -613,7 +632,7 @@ class ProducerStateManagerTest {
     val lastEntry = recoveredMapping.lastEntry(producerId)
     assertTrue(lastEntry.isDefined)
     assertEquals(appendTimestamp, lastEntry.get.lastTimestamp)
-    assertEquals(None, lastEntry.get.currentTxnFirstOffset)
+    assertEquals(OptionalLong.empty(), lastEntry.get.currentTxnFirstOffset)
   }
 
   @Test
@@ -623,7 +642,7 @@ class ProducerStateManagerTest {
     appendEndTxnMarker(stateManager, producerId, (epoch + 1).toShort, 
ControlRecordType.ABORT, offset = 1L)
 
     val lastEntry = stateManager.lastEntry(producerId).get
-    assertEquals(None, lastEntry.currentTxnFirstOffset)
+    assertEquals(OptionalLong.empty(), lastEntry.currentTxnFirstOffset)
     assertEquals(-1, lastEntry.lastDataOffset)
     assertEquals(-1, lastEntry.firstDataOffset)
 
@@ -992,7 +1011,7 @@ class ProducerStateManagerTest {
 
     // Appending the empty control batch should not throw and a new transaction shouldn't be started
     append(stateManager, producerId, baseOffset, batch, origin = AppendOrigin.CLIENT)
-    assertEquals(None, stateManager.lastEntry(producerId).get.currentTxnFirstOffset)
+    assertEquals(OptionalLong.empty(), stateManager.lastEntry(producerId).get.currentTxnFirstOffset)
   }
 
   @Test
@@ -1101,7 +1120,7 @@ class ProducerStateManagerTest {
                                  timestamp: Long = time.milliseconds()): Option[CompletedTxn] = {
     val producerAppendInfo = stateManager.prepareUpdate(producerId, origin = AppendOrigin.COORDINATOR)
     val endTxnMarker = new EndTransactionMarker(controlType, coordinatorEpoch)
-    val completedTxnOpt = producerAppendInfo.appendEndTxnMarker(endTxnMarker, producerEpoch, offset, timestamp)
+    val completedTxnOpt = producerAppendInfo.appendEndTxnMarker(endTxnMarker, producerEpoch, offset, timestamp).asScala
     mapping.update(producerAppendInfo)
     completedTxnOpt.foreach(mapping.completeTxn)
     mapping.updateMapEndOffset(offset + 1)
@@ -1129,7 +1148,7 @@ class ProducerStateManagerTest {
                      batch: RecordBatch,
                      origin: AppendOrigin): Unit = {
     val producerAppendInfo = stateManager.prepareUpdate(producerId, origin)
-    producerAppendInfo.append(batch, firstOffsetMetadataOpt = None)
+    producerAppendInfo.append(batch, Optional.empty())
     stateManager.update(producerAppendInfo)
     stateManager.updateMapEndOffset(offset + 1)
   }
diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/BatchMetadata.java b/storage/src/main/java/org/apache/kafka/server/log/internals/BatchMetadata.java
new file mode 100644
index 00000000000..668456c3518
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/server/log/internals/BatchMetadata.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.record.DefaultRecordBatch;
+
+public class BatchMetadata {
+
+    public final int lastSeq;
+    public final long lastOffset;
+    public final int offsetDelta;
+    public final long timestamp;
+
+    public BatchMetadata(
+            int lastSeq,
+            long lastOffset,
+            int offsetDelta,
+            long timestamp) {
+        this.lastSeq = lastSeq;
+        this.lastOffset = lastOffset;
+        this.offsetDelta = offsetDelta;
+        this.timestamp = timestamp;
+    }
+
+    public int firstSeq() {
+        return DefaultRecordBatch.decrementSequence(lastSeq, offsetDelta);
+    }
+
+    public long firstOffset() {
+        return lastOffset - offsetDelta;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        BatchMetadata that = (BatchMetadata) o;
+
+        return lastSeq == that.lastSeq &&
+                lastOffset == that.lastOffset &&
+                offsetDelta == that.offsetDelta &&
+                timestamp == that.timestamp;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = lastSeq;
+        result = 31 * result + Long.hashCode(lastOffset);
+        result = 31 * result + offsetDelta;
+        result = 31 * result + Long.hashCode(timestamp);
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "BatchMetadata(" +
+                "firstSeq=" + firstSeq() +
+                ", lastSeq=" + lastSeq +
+                ", firstOffset=" + firstOffset() +
+                ", lastOffset=" + lastOffset +
+                ", timestamp=" + timestamp +
+                ')';
+    }
+}
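
BatchMetadata keeps only the last sequence/offset of a batch and derives its start from them: firstOffset() is lastOffset - offsetDelta, while firstSeq() goes through DefaultRecordBatch.decrementSequence to handle sequence wrap-around. A tiny worked sketch with invented values (lastOffset=20, offsetDelta=4 describes a 5-record batch spanning offsets 16..20):

    import org.apache.kafka.server.log.internals.BatchMetadata;

    public class BatchMetadataSketch {
        public static void main(String[] args) {
            // lastSeq=9, lastOffset=20, offsetDelta=4, timestamp=0 (all illustrative)
            BatchMetadata metadata = new BatchMetadata(9, 20L, 4, 0L);
            System.out.println(metadata.firstSeq());    // 5
            System.out.println(metadata.firstOffset()); // 16
        }
    }
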
diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/LastRecord.java b/storage/src/main/java/org/apache/kafka/server/log/internals/LastRecord.java
new file mode 100644
index 00000000000..78568da7897
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/server/log/internals/LastRecord.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import java.util.Objects;
+import java.util.OptionalLong;
+
+/**
+ * The last written record for a given producer. The last data offset may be undefined
+ * if the only log entry for a producer is a transaction marker.
+ */
+public final class LastRecord {
+    public final OptionalLong lastDataOffset;
+    public final short producerEpoch;
+
+    public LastRecord(OptionalLong lastDataOffset, short producerEpoch) {
+        Objects.requireNonNull(lastDataOffset, "lastDataOffset must be non null");
+        this.lastDataOffset = lastDataOffset;
+        this.producerEpoch = producerEpoch;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        LastRecord that = (LastRecord) o;
+
+        return producerEpoch == that.producerEpoch &&
+                lastDataOffset.equals(that.lastDataOffset);
+    }
+
+    @Override
+    public int hashCode() {
+        return 31 * lastDataOffset.hashCode() + producerEpoch;
+    }
+
+    @Override
+    public String toString() {
+        return "LastRecord(" +
+                "lastDataOffset=" + lastDataOffset +
+                ", producerEpoch=" + producerEpoch +
+                ')';
+    }
+}
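
A minimal usage sketch mirroring the lastRecordsOfActiveProducers change in UnifiedLog above: a sentinel lastDataOffset < 0 now maps to OptionalLong.empty() rather than Scala's None. The offset and epoch values here are invented:

    import java.util.OptionalLong;
    import org.apache.kafka.server.log.internals.LastRecord;

    public class LastRecordSketch {
        public static void main(String[] args) {
            long lastDataOffset = -1L; // e.g. only a transaction marker remains in the log
            OptionalLong dataOffset = lastDataOffset >= 0
                    ? OptionalLong.of(lastDataOffset)
                    : OptionalLong.empty();
            LastRecord lastRecord = new LastRecord(dataOffset, (short) 5);
            System.out.println(lastRecord); // LastRecord(lastDataOffset=OptionalLong.empty, producerEpoch=5)
        }
    }
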
diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerAppendInfo.java b/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerAppendInfo.java
new file mode 100644
index 00000000000..90329a45cef
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerAppendInfo.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InvalidProducerEpochException;
+import org.apache.kafka.common.errors.InvalidTxnStateException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.TransactionCoordinatorFencedException;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.EndTransactionMarker;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalLong;
+
+/**
+ * This class is used to validate the records appended by a given producer before they are written to the log.
+ * It is initialized with the producer's state after the last successful append, and transitively validates the
+ * sequence numbers and epochs of each new record. Additionally, this class accumulates transaction metadata
+ * as the incoming records are validated.
+ */
+public class ProducerAppendInfo {
+    private static final Logger log = LoggerFactory.getLogger(ProducerAppendInfo.class);
+    private final TopicPartition topicPartition;
+    private final long producerId;
+    private final ProducerStateEntry currentEntry;
+    private final AppendOrigin origin;
+
+    private final List<TxnMetadata> transactions = new ArrayList<>();
+    private final ProducerStateEntry updatedEntry;
+
+    /**
+     * Creates a new instance with the provided parameters.
+     *
+     * @param topicPartition topic partition
+     * @param producerId     The id of the producer appending to the log
+     * @param currentEntry   The current entry associated with the producer id which contains metadata for a fixed number of
+     *                       the most recent appends made by the producer. Validation of the first incoming append will
+     *                       be made against the latest append in the current entry. New appends will replace older appends
+     *                       in the current entry so that the space overhead is constant.
+     * @param origin         Indicates the origin of the append which implies the extent of validation. For example, offset
+     *                       commits, which originate from the group coordinator, do not have sequence numbers and therefore
+     *                       only producer epoch validation is done. Appends which come through replication are not validated
+     *                       (we assume the validation has already been done) and appends from clients require full validation.
+     */
+    public ProducerAppendInfo(TopicPartition topicPartition,
+                              long producerId,
+                              ProducerStateEntry currentEntry,
+                              AppendOrigin origin) {
+        this.topicPartition = topicPartition;
+        this.producerId = producerId;
+        this.currentEntry = currentEntry;
+        this.origin = origin;
+
+        updatedEntry = new ProducerStateEntry(producerId, currentEntry.producerEpoch(), currentEntry.coordinatorEpoch, currentEntry.lastTimestamp, currentEntry.currentTxnFirstOffset, Optional.empty());
+    }
+
+    public long producerId() {
+        return producerId;
+    }
+
+    private void maybeValidateDataBatch(short producerEpoch, int firstSeq, long offset) {
+        checkProducerEpoch(producerEpoch, offset);
+        if (origin == AppendOrigin.CLIENT) {
+            checkSequence(producerEpoch, firstSeq, offset);
+        }
+    }
+
+    private void checkProducerEpoch(short producerEpoch, long offset) {
+        if (producerEpoch < updatedEntry.producerEpoch()) {
+            String message = "Epoch of producer " + producerId + " at offset " + offset + " in " + topicPartition +
+                    " is " + producerEpoch + ", " + "which is smaller than the last seen epoch " + updatedEntry.producerEpoch();
+
+            if (origin == AppendOrigin.REPLICATION) {
+                log.warn(message);
+            } else {
+                // Starting from 2.7, we replaced ProducerFenced error with InvalidProducerEpoch in the
+                // producer send response callback to differentiate from the former fatal exception,
+                // letting client abort the ongoing transaction and retry.
+                throw new InvalidProducerEpochException(message);
+            }
+        }
+    }
+
+    private void checkSequence(short producerEpoch, int appendFirstSeq, long offset) {
+        if (producerEpoch != updatedEntry.producerEpoch()) {
+            if (appendFirstSeq != 0) {
+                if (updatedEntry.producerEpoch() != RecordBatch.NO_PRODUCER_EPOCH) {
+                    throw new OutOfOrderSequenceException("Invalid sequence number for new epoch of producer " + producerId +
+                            " at offset " + offset + " in partition " + topicPartition + ": " + producerEpoch + " (request epoch), "
+                            + appendFirstSeq + " (seq. number), " + updatedEntry.producerEpoch() + " (current producer epoch)");
+                }
+            }
+        } else {
+            int currentLastSeq;
+            if (!updatedEntry.isEmpty())
+                currentLastSeq = updatedEntry.lastSeq();
+            else if (producerEpoch == currentEntry.producerEpoch())
+                currentLastSeq = currentEntry.lastSeq();
+            else
+                currentLastSeq = RecordBatch.NO_SEQUENCE;
+
+            // If there is no current producer epoch (possibly because all producer records have been deleted due to
+            // retention or the DeleteRecords API) accept writes with any sequence number
+            if (!(currentEntry.producerEpoch() == RecordBatch.NO_PRODUCER_EPOCH || inSequence(currentLastSeq, appendFirstSeq))) {
+                throw new OutOfOrderSequenceException("Out of order sequence number for producer " + producerId + " at " +
+                        "offset " + offset + " in partition " + topicPartition + ": " + appendFirstSeq +
+                        " (incoming seq. number), " + currentLastSeq + " (current end sequence number)");
+            }
+        }
+    }
+
+    private boolean inSequence(int lastSeq, int nextSeq) {
+        return nextSeq == lastSeq + 1L || (nextSeq == 0 && lastSeq == Integer.MAX_VALUE);
+    }
+
+    public Optional<CompletedTxn> append(RecordBatch batch, Optional<LogOffsetMetadata> firstOffsetMetadataOpt) {
+        if (batch.isControlBatch()) {
+            Iterator<Record> recordIterator = batch.iterator();
+            if (recordIterator.hasNext()) {
+                Record record = recordIterator.next();
+                EndTransactionMarker endTxnMarker = EndTransactionMarker.deserialize(record);
+                return appendEndTxnMarker(endTxnMarker, batch.producerEpoch(), batch.baseOffset(), record.timestamp());
+            } else {
+                // An empty control batch means the entire transaction has been cleaned from the log, so no need to append
+                return Optional.empty();
+            }
+        } else {
+            LogOffsetMetadata firstOffsetMetadata = firstOffsetMetadataOpt.orElse(new LogOffsetMetadata(batch.baseOffset()));
+            appendDataBatch(batch.producerEpoch(), batch.baseSequence(), batch.lastSequence(), batch.maxTimestamp(),
+                    firstOffsetMetadata, batch.lastOffset(), batch.isTransactional());
+            return Optional.empty();
+        }
+    }
+
+    public void appendDataBatch(short epoch,
+                                int firstSeq,
+                                int lastSeq,
+                                long lastTimestamp,
+                                LogOffsetMetadata firstOffsetMetadata,
+                                long lastOffset,
+                                boolean isTransactional) {
+        long firstOffset = firstOffsetMetadata.messageOffset;
+        maybeValidateDataBatch(epoch, firstSeq, firstOffset);
+        updatedEntry.addBatch(epoch, lastSeq, lastOffset, (int) (lastOffset - firstOffset), lastTimestamp);
+
+        OptionalLong currentTxnFirstOffset = updatedEntry.currentTxnFirstOffset;
+        if (currentTxnFirstOffset.isPresent() && !isTransactional) {
+            // Received a non-transactional message while a transaction is active
+            throw new InvalidTxnStateException("Expected transactional write from producer " + producerId + " at " +
+                    "offset " + firstOffsetMetadata + " in partition " + topicPartition);
+        } else if (!currentTxnFirstOffset.isPresent() && isTransactional) {
+            // Began a new transaction
+            updatedEntry.currentTxnFirstOffset = OptionalLong.of(firstOffset);
+            transactions.add(new TxnMetadata(producerId, firstOffsetMetadata));
+        }
+    }
+
+    private void checkCoordinatorEpoch(EndTransactionMarker endTxnMarker, long offset) {
+        if (updatedEntry.coordinatorEpoch > endTxnMarker.coordinatorEpoch()) {
+            if (origin == AppendOrigin.REPLICATION) {
+                log.info("Detected invalid coordinator epoch for producerId {} at offset {} in partition {}: {} is older than previously known coordinator epoch {}",
+                        producerId, offset, topicPartition, endTxnMarker.coordinatorEpoch(), updatedEntry.coordinatorEpoch);
+            } else {
+                throw new TransactionCoordinatorFencedException("Invalid coordinator epoch for producerId " + producerId + " at " +
+                        "offset " + offset + " in partition " + topicPartition + ": " + endTxnMarker.coordinatorEpoch() +
+                        " (zombie), " + updatedEntry.coordinatorEpoch + " (current)");
+            }
+        }
+    }
+
+    public Optional<CompletedTxn> appendEndTxnMarker(
+            EndTransactionMarker endTxnMarker,
+            short producerEpoch,
+            long offset,
+            long timestamp) {
+        checkProducerEpoch(producerEpoch, offset);
+        checkCoordinatorEpoch(endTxnMarker, offset);
+
+        // Only emit the `CompletedTxn` for non-empty transactions. A transaction marker
+        // without any associated data will not have any impact on the last stable offset
+        // and would not need to be reflected in the transaction index.
+        Optional<CompletedTxn> completedTxn = updatedEntry.currentTxnFirstOffset.isPresent() ?
+                Optional.of(new CompletedTxn(producerId, updatedEntry.currentTxnFirstOffset.getAsLong(), offset,
+                        endTxnMarker.controlType() == ControlRecordType.ABORT))
+                : Optional.empty();
+
+        updatedEntry.maybeUpdateProducerEpoch(producerEpoch);
+        updatedEntry.currentTxnFirstOffset = OptionalLong.empty();
+        updatedEntry.coordinatorEpoch = endTxnMarker.coordinatorEpoch();
+        updatedEntry.lastTimestamp = timestamp;
+
+        return completedTxn;
+    }
+
+    public ProducerStateEntry toEntry() {
+        return updatedEntry;
+    }
+
+    public List<TxnMetadata> startedTransactions() {
+        return Collections.unmodifiableList(transactions);
+    }
+
+    @Override
+    public String toString() {
+        return "ProducerAppendInfo(" +
+                "producerId=" + producerId +
+                ", producerEpoch=" + updatedEntry.producerEpoch() +
+                ", firstSequence=" + updatedEntry.firstSeq() +
+                ", lastSequence=" + updatedEntry.lastSeq() +
+                ", currentTxnFirstOffset=" + updatedEntry.currentTxnFirstOffset +
+                ", coordinatorEpoch=" + updatedEntry.coordinatorEpoch +
+                ", lastTimestamp=" + updatedEntry.lastTimestamp +
+                ", startedTransactions=" + transactions +
+                ')';
+    }
+}
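
As an illustration of the sequence check above: client appends must carry exactly consecutive sequence numbers, wrapping from Integer.MAX_VALUE back to 0. A minimal standalone sketch of that rule, with hypothetical values (not part of this commit):

    // Sketch of the in-sequence rule enforced by ProducerAppendInfo.checkSequence.
    public class InSequenceExample {
        static boolean inSequence(int lastSeq, int nextSeq) {
            // Widen to long so lastSeq + 1 cannot overflow before the comparison.
            return nextSeq == lastSeq + 1L || (nextSeq == 0 && lastSeq == Integer.MAX_VALUE);
        }

        public static void main(String[] args) {
            System.out.println(inSequence(4, 5));                 // true: consecutive
            System.out.println(inSequence(4, 6));                 // false: gap -> OutOfOrderSequenceException
            System.out.println(inSequence(Integer.MAX_VALUE, 0)); // true: sequence numbers wrap around
        }
    }
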
diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java b/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java
new file mode 100644
index 00000000000..bbb3e9f9041
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.record.RecordBatch;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.stream.Stream;
+
+/**
+ * This class represents the state of a specific producer-id.
+ * It contains a batchMetadata queue, ordered such that the batch with the lowest sequence is at the head of the
+ * queue and the batch with the highest sequence is at the tail. We retain at most
+ * {@link ProducerStateEntry#NUM_BATCHES_TO_RETAIN} elements in the queue; when the queue is at capacity, we remove
+ * the first element to make space for the incoming batch.
+ */
+public class ProducerStateEntry {
+    public static final int NUM_BATCHES_TO_RETAIN = 5;
+
+    public int coordinatorEpoch;
+    public long lastTimestamp;
+    public OptionalLong currentTxnFirstOffset;
+
+    private final long producerId;
+    private final Deque<BatchMetadata> batchMetadata = new ArrayDeque<>();
+    private short producerEpoch;
+
+    public static ProducerStateEntry empty(long producerId) {
+        return new ProducerStateEntry(producerId, RecordBatch.NO_PRODUCER_EPOCH, -1, RecordBatch.NO_TIMESTAMP,
+                OptionalLong.empty(), Optional.empty());
+    }
+
+    public ProducerStateEntry(long producerId) {
+        this(producerId, RecordBatch.NO_PRODUCER_EPOCH, -1, RecordBatch.NO_TIMESTAMP, OptionalLong.empty(), Optional.empty());
+    }
+
+    public ProducerStateEntry(long producerId, short producerEpoch, int coordinatorEpoch, long lastTimestamp,
+                              OptionalLong currentTxnFirstOffset, Optional<BatchMetadata> firstBatchMetadata) {
+        this.producerId = producerId;
+        this.producerEpoch = producerEpoch;
+        this.coordinatorEpoch = coordinatorEpoch;
+        this.lastTimestamp = lastTimestamp;
+        this.currentTxnFirstOffset = currentTxnFirstOffset;
+        firstBatchMetadata.ifPresent(batchMetadata::add);
+    }
+
+    public int firstSeq() {
+        return isEmpty() ? RecordBatch.NO_SEQUENCE : batchMetadata.getFirst().firstSeq();
+    }
+
+    public int lastSeq() {
+        return isEmpty() ? RecordBatch.NO_SEQUENCE : batchMetadata.getLast().lastSeq;
+    }
+
+    public long firstDataOffset() {
+        return isEmpty() ? -1L : batchMetadata.getFirst().firstOffset();
+    }
+
+    public long lastDataOffset() {
+        return isEmpty() ? -1L : batchMetadata.getLast().lastOffset;
+    }
+
+    public int lastOffsetDelta() {
+        return isEmpty() ? 0 : batchMetadata.getLast().offsetDelta;
+    }
+
+    public boolean isEmpty() {
+        return batchMetadata.isEmpty();
+    }
+
+    public void addBatch(short producerEpoch, int lastSeq, long lastOffset, int offsetDelta, long timestamp) {
+        maybeUpdateProducerEpoch(producerEpoch);
+        addBatchMetadata(new BatchMetadata(lastSeq, lastOffset, offsetDelta, timestamp));
+        this.lastTimestamp = timestamp;
+    }
+
+    public boolean maybeUpdateProducerEpoch(short producerEpoch) {
+        if (this.producerEpoch != producerEpoch) {
+            batchMetadata.clear();
+            this.producerEpoch = producerEpoch;
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    private void addBatchMetadata(BatchMetadata batch) {
+        if (batchMetadata.size() == ProducerStateEntry.NUM_BATCHES_TO_RETAIN)
+            batchMetadata.removeFirst();
+        batchMetadata.add(batch);
+    }
+
+    public void update(ProducerStateEntry nextEntry) {
+        maybeUpdateProducerEpoch(nextEntry.producerEpoch);
+        while (!nextEntry.batchMetadata.isEmpty())
+            addBatchMetadata(nextEntry.batchMetadata.removeFirst());
+        this.coordinatorEpoch = nextEntry.coordinatorEpoch;
+        this.currentTxnFirstOffset = nextEntry.currentTxnFirstOffset;
+        this.lastTimestamp = nextEntry.lastTimestamp;
+    }
+
+    public Optional<BatchMetadata> findDuplicateBatch(RecordBatch batch) {
+        if (batch.producerEpoch() != producerEpoch) return Optional.empty();
+        else return batchWithSequenceRange(batch.baseSequence(), batch.lastSequence());
+    }
+
+    // Return the batch metadata of the cached batch having the exact sequence range, if any.
+    Optional<BatchMetadata> batchWithSequenceRange(int firstSeq, int lastSeq) {
+        Stream<BatchMetadata> duplicate = batchMetadata.stream()
+                .filter(metadata -> firstSeq == metadata.firstSeq() && lastSeq == metadata.lastSeq);
+        return duplicate.findFirst();
+    }
+
+    public Collection<BatchMetadata> batchMetadata() {
+        return Collections.unmodifiableCollection(batchMetadata);
+    }
+
+    public short producerEpoch() {
+        return producerEpoch;
+    }
+
+    public long producerId() {
+        return producerId;
+    }
+
+    @Override
+    public String toString() {
+        return "ProducerStateEntry(" +
+                "producerId=" + producerId +
+                ", producerEpoch=" + producerEpoch +
+                ", currentTxnFirstOffset=" + currentTxnFirstOffset +
+                ", coordinatorEpoch=" + coordinatorEpoch +
+                ", lastTimestamp=" + lastTimestamp +
+                ", batchMetadata=" + batchMetadata +
+                ')';
+    }
+}
\ No newline at end of file
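
The class javadoc above describes a bounded queue: at most NUM_BATCHES_TO_RETAIN batch-metadata entries are kept, and the oldest is evicted when a new batch arrives at capacity. A minimal sketch of that retention policy, with a plain Deque<Integer> standing in for the BatchMetadata queue (values are hypothetical):

    import java.util.ArrayDeque;
    import java.util.Deque;

    public class BoundedRetentionExample {
        static final int NUM_BATCHES_TO_RETAIN = 5;

        public static void main(String[] args) {
            Deque<Integer> batchMetadata = new ArrayDeque<>();
            for (int lastSeq = 0; lastSeq < 8; lastSeq++) {
                // Mirrors addBatchMetadata: evict the lowest-sequence batch at capacity.
                if (batchMetadata.size() == NUM_BATCHES_TO_RETAIN)
                    batchMetadata.removeFirst();
                batchMetadata.add(lastSeq);
            }
            System.out.println(batchMetadata); // [3, 4, 5, 6, 7]: only the 5 most recent remain
        }
    }
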
diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/TxnMetadata.java b/storage/src/main/java/org/apache/kafka/server/log/internals/TxnMetadata.java
new file mode 100644
index 00000000000..76fcb4c528f
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/server/log/internals/TxnMetadata.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import java.util.Objects;
+import java.util.OptionalLong;
+
+public final class TxnMetadata {
+    public final long producerId;
+    public final LogOffsetMetadata firstOffset;
+    public OptionalLong lastOffset;
+
+    public TxnMetadata(long producerId,
+                       LogOffsetMetadata firstOffset,
+                       OptionalLong lastOffset) {
+        Objects.requireNonNull(firstOffset, "firstOffset must not be null");
+        this.producerId = producerId;
+        this.firstOffset = firstOffset;
+        this.lastOffset = lastOffset;
+    }
+
+    public TxnMetadata(long producerId, long firstOffset) {
+        this(producerId, new LogOffsetMetadata(firstOffset));
+    }
+
+    public TxnMetadata(long producerId, LogOffsetMetadata firstOffset) {
+        this(producerId, firstOffset, OptionalLong.empty());
+    }
+
+    @Override
+    public String toString() {
+        return "TxnMetadata(" +
+                "producerId=" + producerId +
+                ", firstOffset=" + firstOffset +
+                ", lastOffset=" + lastOffset +
+                ')';
+    }
+}
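
TxnMetadata pins a transaction's first offset at construction and leaves lastOffset empty until the transaction completes. A hypothetical usage sketch (producer id and offsets are made up):

    import java.util.OptionalLong;
    import org.apache.kafka.server.log.internals.TxnMetadata;

    public class TxnMetadataExample {
        public static void main(String[] args) {
            TxnMetadata txn = new TxnMetadata(42L, 100L);  // producer 42, first offset 100
            // ... transactional batches are appended ...
            txn.lastOffset = OptionalLong.of(117L);        // filled in once the end marker is written
            System.out.println(txn);
        }
    }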
