This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2c925e9f334 KAFKA-15526: Simplify the LogAppendInfo class (#14470)
2c925e9f334 is described below

commit 2c925e9f334b0aa6c52df343c2394f00f86ac2ff
Author: David Mao <[email protected]>
AuthorDate: Tue Oct 3 17:32:44 2023 -0700

    KAFKA-15526: Simplify the LogAppendInfo class (#14470)
    
    The LogAppendInfo class is a bit bloated in terms of class fields. That's 
because it is used as an umbrella class for both leader log appends and 
follower log appends and needs to carry fields for both. This makes the 
constructor for the class a bit cludgy to use. It also ends up being a bit 
confusing when fields are important and when they aren't. I noticed there were 
a few fields that didn't seem necessary.
    
    Below is a description of changes:
    
    firstOffset is a LogOffsetMetadata but there are no readers of the field 
that use anything but the messageOffset field - simplified to a long.
    LogAppendInfo.errorMessage is only set in one context - when calling 
LogAppendInfo.unknownLogAppendInfoWithAdditionalInfo. When we use this 
constructor, we pass up the original exception in LogAppendResult anyway, so 
the field is redundant with the LogAppendResult.exception field. This allows us 
to simplify the handling in KAFKA-15459: Convert coordinator retriable errors 
to a known producer response error #14378 since there are no custom error 
messages we just return whatever is in t [...]
    We only use targetCompressionType when constructing the LogValidator - just 
inline the call instead of including it in the LogAppendInfo.
    offsetsMonotonic is only used when not assigning offsets to throw an 
exception - just throw the exception instead of setting a field to throw later.
    shallowCount is only there to determine whether there are any messages in 
the append. Instead, we can just check validBytes which is incremented with a 
non-zero value every time we increment shallowCount.
    
    Reviewers: Justine Olshan <[email protected]>
---
 core/src/main/scala/kafka/log/UnifiedLog.scala     | 55 ++++++-------
 .../main/scala/kafka/raft/KafkaMetadataLog.scala   |  6 +-
 .../main/scala/kafka/server/ReplicaManager.scala   | 92 +++++++++++-----------
 .../src/test/scala/other/kafka/StressTestLog.scala |  2 +-
 .../log/AbstractLogCleanerIntegrationTest.scala    |  2 +-
 .../LogCleanerParameterizedIntegrationTest.scala   |  6 +-
 .../test/scala/unit/kafka/log/LogCleanerTest.scala | 10 +--
 .../test/scala/unit/kafka/log/LogManagerTest.scala |  2 +-
 .../test/scala/unit/kafka/log/UnifiedLogTest.scala | 34 ++++----
 .../unit/kafka/server/MockFetcherThread.scala      |  9 +--
 .../kafka/server/ReplicaFetcherThreadTest.scala    |  9 +--
 .../kafka/storage/internals/log/LogAppendInfo.java | 90 +++++----------------
 12 files changed, 127 insertions(+), 190 deletions(-)

diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala 
b/core/src/main/scala/kafka/log/UnifiedLog.scala
index 6c680135ae1..601d444d0e6 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -768,10 +768,10 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     // This will ensure that any log data can be recovered with the correct 
topic ID in the case of failure.
     maybeFlushMetadataFile()
 
-    val appendInfo = analyzeAndValidateRecords(records, origin, 
ignoreRecordSize, leaderEpoch)
+    val appendInfo = analyzeAndValidateRecords(records, origin, 
ignoreRecordSize, !validateAndAssignOffsets, leaderEpoch)
 
     // return if we have no valid messages or if this is a duplicate of the 
last appended entry
-    if (appendInfo.shallowCount == 0) appendInfo
+    if (appendInfo.validBytes <= 0) appendInfo
     else {
 
       // trim any invalid bytes or partial messages before appending it to the 
on-disk log
@@ -784,13 +784,14 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           if (validateAndAssignOffsets) {
             // assign offsets to the message set
             val offset = PrimitiveRef.ofLong(localLog.logEndOffset)
-            appendInfo.setFirstOffset(Optional.of(new 
LogOffsetMetadata(offset.value)))
+            appendInfo.setFirstOffset(offset.value)
             val validateAndOffsetAssignResult = try {
+              val targetCompression = 
BrokerCompressionType.forName(config.compressionType).targetCompressionType(appendInfo.sourceCompression)
               val validator = new LogValidator(validRecords,
                 topicPartition,
                 time,
                 appendInfo.sourceCompression,
-                appendInfo.targetCompression,
+                targetCompression,
                 config.compact,
                 config.recordVersion.value,
                 config.messageTimestampType,
@@ -835,18 +836,14 @@ class UnifiedLog(@volatile var logStartOffset: Long,
             }
           } else {
             // we are taking the offsets we are given
-            if (!appendInfo.offsetsMonotonic)
-              throw new OffsetsOutOfOrderException(s"Out of order offsets 
found in append to $topicPartition: " +
-                records.records.asScala.map(_.offset))
-
             if (appendInfo.firstOrLastOffsetOfFirstBatch < 
localLog.logEndOffset) {
               // we may still be able to recover if the log is empty
               // one example: fetching from log start offset on the leader 
which is not batch aligned,
               // which may happen as a result of AdminClient#deleteRecords()
-              val firstOffset = appendInfo.firstOffset.map[Long](x => 
x.messageOffset)
-                .orElse(records.batches.iterator().next().baseOffset())
+              val hasFirstOffset = appendInfo.firstOffset != 
UnifiedLog.UnknownOffset
+              val firstOffset = if (hasFirstOffset) appendInfo.firstOffset 
else records.batches.iterator().next().baseOffset()
 
-              val firstOrLast = if (appendInfo.firstOffset.isPresent) "First 
offset" else "Last offset of the first batch"
+              val firstOrLast = if (hasFirstOffset) "First offset" else "Last 
offset of the first batch"
               throw new UnexpectedAppendOffsetException(
                 s"Unexpected offset in append to $topicPartition. $firstOrLast 
" +
                   s"${appendInfo.firstOrLastOffsetOfFirstBatch} is less than 
the next offset ${localLog.logEndOffset}. " +
@@ -892,16 +889,11 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
           maybeDuplicate match {
             case Some(duplicate) =>
-              appendInfo.setFirstOffset(Optional.of(new 
LogOffsetMetadata(duplicate.firstOffset)))
+              appendInfo.setFirstOffset(duplicate.firstOffset)
               appendInfo.setLastOffset(duplicate.lastOffset)
               appendInfo.setLogAppendTime(duplicate.timestamp)
               appendInfo.setLogStartOffset(logStartOffset)
             case None =>
-              // Before appending update the first offset metadata to include 
segment information
-              appendInfo.setFirstOffset(appendInfo.firstOffset.map { 
offsetMetadata =>
-                new LogOffsetMetadata(offsetMetadata.messageOffset, 
segment.baseOffset, segment.size)
-              })
-
               // Append the records, and increment the local log end offset 
immediately after the append because a
               // write to the transaction index below may fail and we want to 
ensure that the offsets
               // of future appends still grow monotonically. The resulting 
transaction index inconsistency
@@ -1097,7 +1089,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
    * <ol>
    * <li> each message matches its CRC
    * <li> each message size is valid (if ignoreRecordSize is false)
-   * <li> that the sequence numbers of the incoming record batches are 
consistent with the existing state and with each other.
+   * <li> that the sequence numbers of the incoming record batches are 
consistent with the existing state and with each other
+   * <li> that the offsets are monotonically increasing (if 
requireOffsetsMonotonic is true)
    * </ol>
    *
    * Also compute the following quantities:
@@ -1113,10 +1106,10 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   private def analyzeAndValidateRecords(records: MemoryRecords,
                                         origin: AppendOrigin,
                                         ignoreRecordSize: Boolean,
+                                        requireOffsetsMonotonic: Boolean,
                                         leaderEpoch: Int): LogAppendInfo = {
-    var shallowMessageCount = 0
     var validBytesCount = 0
-    var firstOffset: Optional[LogOffsetMetadata] = Optional.empty()
+    var firstOffset = UnifiedLog.UnknownOffset
     var lastOffset = -1L
     var lastLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH
     var sourceCompression = CompressionType.NONE
@@ -1143,7 +1136,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
       // Also indicate whether we have the accurate first offset or not
       if (!readFirstMessage) {
         if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
-          firstOffset = Optional.of(new LogOffsetMetadata(batch.baseOffset))
+          firstOffset = batch.baseOffset
         lastOffsetOfFirstBatch = batch.lastOffset
         readFirstMessage = true
       }
@@ -1176,7 +1169,6 @@ class UnifiedLog(@volatile var logStartOffset: Long,
         offsetOfMaxTimestamp = lastOffset
       }
 
-      shallowMessageCount += 1
       validBytesCount += batchSize
 
       val batchCompression = CompressionType.forId(batch.compressionType.id)
@@ -1184,17 +1176,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
         sourceCompression = batchCompression
     }
 
-    // Apply broker-side compression if any
-    val targetCompression = 
BrokerCompressionType.forName(config.compressionType).targetCompressionType(sourceCompression)
+    if (requireOffsetsMonotonic && !monotonic)
+        throw new OffsetsOutOfOrderException(s"Out of order offsets found in 
append to $topicPartition: " +
+          records.records.asScala.map(_.offset))
+
     val lastLeaderEpochOpt: OptionalInt = if (lastLeaderEpoch != 
RecordBatch.NO_PARTITION_LEADER_EPOCH)
       OptionalInt.of(lastLeaderEpoch)
     else
       OptionalInt.empty()
 
     new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpochOpt, 
maxTimestamp, offsetOfMaxTimestamp,
-      RecordBatch.NO_TIMESTAMP, logStartOffset, RecordConversionStats.EMPTY, 
sourceCompression, targetCompression,
-      shallowMessageCount, validBytesCount, monotonic, lastOffsetOfFirstBatch, 
Collections.emptyList[RecordError], null,
-      LeaderHwChange.NONE)
+      RecordBatch.NO_TIMESTAMP, logStartOffset, RecordConversionStats.EMPTY, 
sourceCompression,
+      validBytesCount, lastOffsetOfFirstBatch, 
Collections.emptyList[RecordError], LeaderHwChange.NONE)
   }
 
   /**
@@ -1588,10 +1581,10 @@ class UnifiedLog(@volatile var logStartOffset: Long,
         Note that this is only required for pre-V2 message formats because 
these do not store the first message offset
         in the header.
       */
-      val rollOffset = appendInfo
-        .firstOffset
-        .map[Long](_.messageOffset)
-        .orElse(maxOffsetInMessages - Integer.MAX_VALUE)
+      val rollOffset = if (appendInfo.firstOffset == UnifiedLog.UnknownOffset)
+        maxOffsetInMessages - Integer.MAX_VALUE
+      else
+        appendInfo.firstOffset
 
       roll(Some(rollOffset))
     } else {
diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala 
b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
index 9703fbc444c..e41b6500665 100644
--- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
+++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
@@ -96,10 +96,10 @@ final class KafkaMetadataLog private (
   }
 
   private def handleAndConvertLogAppendInfo(appendInfo: 
internals.log.LogAppendInfo): LogAppendInfo = {
-    if (appendInfo.firstOffset.isPresent())
-      new LogAppendInfo(appendInfo.firstOffset.get().messageOffset, 
appendInfo.lastOffset)
+    if (appendInfo.firstOffset != UnifiedLog.UnknownOffset)
+      new LogAppendInfo(appendInfo.firstOffset, appendInfo.lastOffset)
     else
-      throw new KafkaException(s"Append failed unexpectedly: 
${appendInfo.errorMessage}")
+      throw new KafkaException(s"Append failed unexpectedly")
   }
 
   override def lastFetchedEpoch: Int = {
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 07e7418776a..eb140eb31fd 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -74,11 +74,20 @@ import scala.jdk.CollectionConverters._
 /*
  * Result metadata of a log append operation on the log
  */
-case class LogAppendResult(info: LogAppendInfo, exception: Option[Throwable] = 
None) {
+case class LogAppendResult(info: LogAppendInfo,
+                           exception: Option[Throwable],
+                           hasCustomErrorMessage: Boolean) {
   def error: Errors = exception match {
     case None => Errors.NONE
     case Some(e) => Errors.forException(e)
   }
+
+  def errorMessage: String = {
+    exception match {
+      case Some(e) if hasCustomErrorMessage => e.getMessage
+      case _ => null
+    }
+  }
 }
 
 case class LogDeleteRecordsResult(requestedOffset: Long, lowWatermark: Long, 
exception: Option[Throwable] = None) {
@@ -739,67 +748,54 @@ class ReplicaManager(val config: KafkaConfig,
         }
 
       def appendEntries(allEntries: Map[TopicPartition, 
MemoryRecords])(unverifiedEntries: Map[TopicPartition, Errors]): Unit = {
-        val verifiedEntries = 
+        val verifiedEntries =
           if (unverifiedEntries.isEmpty)
-            allEntries 
+            allEntries
           else
             allEntries.filter { case (tp, _) =>
               !unverifiedEntries.contains(tp)
             }
-        
+
         val localProduceResults = appendToLocalLog(internalTopicsAllowed = 
internalTopicsAllowed,
           origin, verifiedEntries, requiredAcks, requestLocal, 
verificationGuards.toMap)
         debug("Produce to local log in %d ms".format(time.milliseconds - 
sTime))
 
-        def produceStatusResult(appendResult: Map[TopicPartition, 
LogAppendResult],
-                                useCustomMessage: Boolean): 
Map[TopicPartition, ProducePartitionStatus] = {
-          appendResult.map { case (topicPartition, result) =>
-            topicPartition -> ProducePartitionStatus(
-              result.info.lastOffset + 1, // required offset
-              new PartitionResponse(
-                result.error,
-                result.info.firstOffset.map[Long](_.messageOffset).orElse(-1L),
-                result.info.lastOffset,
-                result.info.logAppendTime,
-                result.info.logStartOffset,
-                result.info.recordErrors,
-                if (useCustomMessage) result.exception.get.getMessage else 
result.info.errorMessage
-              )
-            ) // response status
-          }
-        }
-        
-        val unverifiedResults = unverifiedEntries.map {
+        val errorResults = (unverifiedEntries ++ errorsPerPartition).map {
           case (topicPartition, error) =>
-            val finalException =
+            // translate transaction coordinator errors to known producer 
response errors
+            val customException =
               error match {
-                case Errors.INVALID_TXN_STATE => error.exception("Partition 
was not added to the transaction")
+                case Errors.INVALID_TXN_STATE => 
Some(error.exception("Partition was not added to the transaction"))
                 case Errors.CONCURRENT_TRANSACTIONS |
                      Errors.COORDINATOR_LOAD_IN_PROGRESS |
                      Errors.COORDINATOR_NOT_AVAILABLE |
-                     Errors.NOT_COORDINATOR => new NotEnoughReplicasException(
-                         s"Unable to verify the partition has been added to 
the transaction. Underlying error: ${error.toString}")
-                case _ => error.exception()
+                     Errors.NOT_COORDINATOR => Some(new 
NotEnoughReplicasException(
+                         s"Unable to verify the partition has been added to 
the transaction. Underlying error: ${error.toString}"))
+                case _ => None
             }
             topicPartition -> LogAppendResult(
               LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-              Some(finalException)
+              Some(customException.getOrElse(error.exception)),
+              hasCustomErrorMessage = customException.isDefined
             )
         }
 
-        val errorResults = errorsPerPartition.map {
-          case (topicPartition, error) =>
-            topicPartition -> LogAppendResult(
-              LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-              Some(error.exception())
+        val allResults = localProduceResults ++ errorResults
+        val produceStatus = allResults.map { case (topicPartition, result) =>
+          topicPartition -> ProducePartitionStatus(
+            result.info.lastOffset + 1, // required offset
+            new PartitionResponse(
+              result.error,
+              result.info.firstOffset,
+              result.info.lastOffset,
+              result.info.logAppendTime,
+              result.info.logStartOffset,
+              result.info.recordErrors,
+              result.errorMessage
             )
+          ) // response status
         }
 
-        val produceStatus = Set((localProduceResults, false), 
(unverifiedResults, true), (errorResults, false)).flatMap {
-          case (results, useCustomError) => produceStatusResult(results, 
useCustomError)
-        }.toMap
-        val allResults = localProduceResults ++ unverifiedResults ++ 
errorResults
-
         actionQueue.add {
           () => allResults.foreach { case (topicPartition, result) =>
             val requestKey = TopicPartitionOperationKey(topicPartition)
@@ -859,7 +855,7 @@ class ReplicaManager(val config: KafkaConfig,
       val responseStatus = entriesPerPartition.map { case (topicPartition, _) 
=>
         topicPartition -> new PartitionResponse(
           Errors.INVALID_REQUIRED_ACKS,
-          
LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.firstOffset.map[Long](_.messageOffset).orElse(-1L),
+          LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.firstOffset,
           RecordBatch.NO_TIMESTAMP,
           LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.logStartOffset
         )
@@ -1182,7 +1178,8 @@ class ReplicaManager(val config: KafkaConfig,
       if (Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed) {
         (topicPartition, LogAppendResult(
           LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-          Some(new InvalidTopicException(s"Cannot append to internal topic 
${topicPartition.topic}"))))
+          Some(new InvalidTopicException(s"Cannot append to internal topic 
${topicPartition.topic}")),
+          hasCustomErrorMessage = false))
       } else {
         try {
           val partition = getPartitionOrException(topicPartition)
@@ -1197,9 +1194,9 @@ class ReplicaManager(val config: KafkaConfig,
 
           if (traceEnabled)
             trace(s"${records.sizeInBytes} written to log $topicPartition 
beginning at offset " +
-              s"${info.firstOffset.orElse(new LogOffsetMetadata(-1))} and 
ending at offset ${info.lastOffset}")
+              s"${info.firstOffset} and ending at offset ${info.lastOffset}")
 
-          (topicPartition, LogAppendResult(info))
+          (topicPartition, LogAppendResult(info, exception = None, 
hasCustomErrorMessage = false))
         } catch {
           // NOTE: Failed produce requests metric is not incremented for known 
exceptions
           // it is supposed to indicate un-expected failures of a broker in 
handling a produce request
@@ -1209,15 +1206,16 @@ class ReplicaManager(val config: KafkaConfig,
                    _: RecordBatchTooLargeException |
                    _: CorruptRecordException |
                    _: KafkaStorageException) =>
-            (topicPartition, 
LogAppendResult(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, Some(e)))
+            (topicPartition, 
LogAppendResult(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, Some(e), 
hasCustomErrorMessage = false))
           case rve: RecordValidationException =>
             val logStartOffset = processFailedRecord(topicPartition, 
rve.invalidException)
             val recordErrors = rve.recordErrors
-            (topicPartition, 
LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithAdditionalInfo(
-              logStartOffset, recordErrors, rve.invalidException.getMessage), 
Some(rve.invalidException)))
+            (topicPartition, 
LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithAdditionalInfo(logStartOffset,
 recordErrors),
+              Some(rve.invalidException), hasCustomErrorMessage = true))
           case t: Throwable =>
             val logStartOffset = processFailedRecord(topicPartition, t)
-            (topicPartition, 
LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithLogStartOffset(logStartOffset),
 Some(t)))
+            (topicPartition, 
LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithLogStartOffset(logStartOffset),
+              Some(t), hasCustomErrorMessage = false))
         }
       }
     }
diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala 
b/core/src/test/scala/other/kafka/StressTestLog.scala
index 9dda3fde499..c67c543651f 100755
--- a/core/src/test/scala/other/kafka/StressTestLog.scala
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@ -124,7 +124,7 @@ object StressTestLog {
   class WriterThread(val log: UnifiedLog) extends WorkerThread with 
LogProgress {
     override def work(): Unit = {
       val logAppendInfo = 
log.appendAsLeader(TestUtils.singletonRecords(currentOffset.toString.getBytes), 
0)
-      require((!logAppendInfo.firstOffset.isPresent || 
logAppendInfo.firstOffset.get().messageOffset == currentOffset)
+      require((logAppendInfo.firstOffset == -1 || logAppendInfo.firstOffset == 
currentOffset)
         && logAppendInfo.lastOffset == currentOffset)
       currentOffset += 1
       if (currentOffset % 1000 == 0)
diff --git 
a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala 
b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
index b52f4ec1e69..347cb5e6ecb 100644
--- a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
@@ -150,7 +150,7 @@ abstract class AbstractLogCleanerIntegrationTest {
       // move LSO forward to increase compaction bound
       log.updateHighWatermark(log.logEndOffset)
       incCounter()
-      (key, value, appendInfo.firstOffset.get.messageOffset)
+      (key, value, appendInfo.firstOffset)
     }
   }
 
diff --git 
a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
 
b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
index 132a77ff97b..6946d8199a5 100755
--- 
a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
+++ 
b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
@@ -70,7 +70,7 @@ class LogCleanerParameterizedIntegrationTest extends 
AbstractLogCleanerIntegrati
     val appendInfo = log.appendAsLeader(largeMessageSet, leaderEpoch = 0)
     // move LSO forward to increase compaction bound
     log.updateHighWatermark(log.logEndOffset)
-    val largeMessageOffset = appendInfo.firstOffset.get.messageOffset
+    val largeMessageOffset = appendInfo.firstOffset
 
     val dups = writeDups(startKey = largeMessageKey + 1, numKeys = 100, 
numDups = 3, log = log, codec = codec)
     val appends2 = appends ++ Seq((largeMessageKey, largeMessageValue, 
largeMessageOffset)) ++ dups
@@ -172,7 +172,7 @@ class LogCleanerParameterizedIntegrationTest extends 
AbstractLogCleanerIntegrati
       val appendInfo = log.appendAsLeader(largeMessageSet, leaderEpoch = 0)
       // move LSO forward to increase compaction bound
       log.updateHighWatermark(log.logEndOffset)
-      val largeMessageOffset = 
appendInfo.firstOffset.map[Long](_.messageOffset).get
+      val largeMessageOffset = appendInfo.firstOffset
 
       // also add some messages with version 1 and version 2 to check that we 
handle mixed format versions correctly
       props.put(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, 
IBP_0_11_0_IV0.version)
@@ -320,7 +320,7 @@ class LogCleanerParameterizedIntegrationTest extends 
AbstractLogCleanerIntegrati
     val appendInfo = log.appendAsLeader(MemoryRecords.withRecords(magicValue, 
codec, records: _*), leaderEpoch = 0)
     // move LSO forward to increase compaction bound
     log.updateHighWatermark(log.logEndOffset)
-    val offsets = appendInfo.firstOffset.get.messageOffset to 
appendInfo.lastOffset
+    val offsets = appendInfo.firstOffset to appendInfo.lastOffset
 
     kvs.zip(offsets).map { case (kv, offset) => (kv._1, kv._2, offset) }
   }
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala 
b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 05aaa8ab062..7a221ad3310 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -295,17 +295,17 @@ class LogCleanerTest {
 
     // check duplicate append from producer 1
     var logAppendInfo = appendIdempotentAsLeader(log, pid1, 
producerEpoch)(Seq(1, 2, 3))
-    assertEquals(0L, logAppendInfo.firstOffset.get.messageOffset)
+    assertEquals(0L, logAppendInfo.firstOffset)
     assertEquals(2L, logAppendInfo.lastOffset)
 
     // check duplicate append from producer 3
     logAppendInfo = appendIdempotentAsLeader(log, pid3, producerEpoch)(Seq(1, 
4))
-    assertEquals(6L, logAppendInfo.firstOffset.get.messageOffset)
+    assertEquals(6L, logAppendInfo.firstOffset)
     assertEquals(7L, logAppendInfo.lastOffset)
 
     // check duplicate append from producer 2
     logAppendInfo = appendIdempotentAsLeader(log, pid2, producerEpoch)(Seq(3, 
1, 4))
-    assertEquals(3L, logAppendInfo.firstOffset.get.messageOffset)
+    assertEquals(3L, logAppendInfo.firstOffset)
     assertEquals(5L, logAppendInfo.lastOffset)
 
     // do one more append and a round of cleaning to force another deletion 
from producer 1's batch
@@ -321,7 +321,7 @@ class LogCleanerTest {
 
     // duplicate append from producer1 should still be fine
     logAppendInfo = appendIdempotentAsLeader(log, pid1, producerEpoch)(Seq(1, 
2, 3))
-    assertEquals(0L, logAppendInfo.firstOffset.get.messageOffset)
+    assertEquals(0L, logAppendInfo.firstOffset)
     assertEquals(2L, logAppendInfo.lastOffset)
   }
 
@@ -2048,7 +2048,7 @@ class LogCleanerTest {
 
   private def writeToLog(log: UnifiedLog, seq: Iterable[(Int, Int)]): 
Iterable[Long] = {
     for ((key, value) <- seq)
-      yield log.appendAsLeader(record(key, value), leaderEpoch = 
0).firstOffset.get.messageOffset
+      yield log.appendAsLeader(record(key, value), leaderEpoch = 0).firstOffset
   }
 
   private def key(id: Long) = ByteBuffer.wrap(id.toString.getBytes)
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala 
b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index ee70a518910..b4ea0f95919 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -320,7 +320,7 @@ class LogManagerTest {
     for (_ <- 0 until numMessages) {
       val set = TestUtils.singletonRecords("test".getBytes())
       val info = log.appendAsLeader(set, leaderEpoch = 0)
-      offset = info.firstOffset.get.messageOffset
+      offset = info.firstOffset
     }
 
     log.updateHighWatermark(log.logEndOffset)
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala 
b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index f467345f69f..16d8c1e7adb 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -160,17 +160,17 @@ class UnifiedLogTest {
     val records = TestUtils.records(simpleRecords)
 
     val firstAppendInfo = log.appendAsLeader(records, leaderEpoch = 0)
-    assertEquals(new LogOffsetMetadata(0, 0, 0), 
firstAppendInfo.firstOffset.get)
+    assertEquals(0, firstAppendInfo.firstOffset)
 
     val secondAppendInfo = log.appendAsLeader(
       TestUtils.records(simpleRecords),
       leaderEpoch = 0
     )
-    assertEquals(new LogOffsetMetadata(simpleRecords.size, 0, 
records.sizeInBytes), secondAppendInfo.firstOffset.get)
+    assertEquals(simpleRecords.size, secondAppendInfo.firstOffset)
 
     log.roll()
     val afterRollAppendInfo =  
log.appendAsLeader(TestUtils.records(simpleRecords), leaderEpoch = 0)
-    assertEquals(new LogOffsetMetadata(simpleRecords.size * 2, 
simpleRecords.size * 2, 0), afterRollAppendInfo.firstOffset.get)
+    assertEquals(simpleRecords.size * 2, afterRollAppendInfo.firstOffset)
   }
 
   @Test
@@ -1260,7 +1260,7 @@ class UnifiedLogTest {
     ), producerId = pid, producerEpoch = epoch, sequence = seq)
     val multiEntryAppendInfo = log.appendAsLeader(createRecords, leaderEpoch = 
0)
     assertEquals(
-      multiEntryAppendInfo.lastOffset - 
multiEntryAppendInfo.firstOffset.get.messageOffset + 1,
+      multiEntryAppendInfo.lastOffset - multiEntryAppendInfo.firstOffset + 1,
       3,
       "should have appended 3 entries"
     )
@@ -1268,8 +1268,8 @@ class UnifiedLogTest {
     // Append a Duplicate of the tail, when the entry at the tail has multiple 
records.
     val dupMultiEntryAppendInfo = log.appendAsLeader(createRecords, 
leaderEpoch = 0)
     assertEquals(
-      multiEntryAppendInfo.firstOffset.get.messageOffset,
-      dupMultiEntryAppendInfo.firstOffset.get.messageOffset,
+      multiEntryAppendInfo.firstOffset,
+      dupMultiEntryAppendInfo.firstOffset,
       "Somehow appended a duplicate entry with multiple log records to the 
tail"
     )
     assertEquals(multiEntryAppendInfo.lastOffset, 
dupMultiEntryAppendInfo.lastOffset,
@@ -1305,8 +1305,8 @@ class UnifiedLogTest {
     val origAppendInfo = log.appendAsLeader(createRecordsWithDuplicate, 
leaderEpoch = 0)
     val newAppendInfo = log.appendAsLeader(createRecordsWithDuplicate, 
leaderEpoch = 0)
     assertEquals(
-      origAppendInfo.firstOffset.get.messageOffset,
-      newAppendInfo.firstOffset.get.messageOffset,
+      origAppendInfo.firstOffset,
+      newAppendInfo.firstOffset,
       "Inserted a duplicate records into the log"
     )
     assertEquals(origAppendInfo.lastOffset, newAppendInfo.lastOffset,
@@ -1786,7 +1786,7 @@ class UnifiedLogTest {
         log.appendAsLeader(
           TestUtils.singletonRecords(value = "hello".getBytes, timestamp = 
mockTime.milliseconds),
           leaderEpoch = 0
-        ).firstOffset.get.messageOffset,
+        ).firstOffset,
         "Should still be able to append and should get the logEndOffset 
assigned to the new append")
 
       // cleanup the log
@@ -2940,7 +2940,7 @@ class UnifiedLogTest {
       new SimpleRecord("baz".getBytes))
 
     val firstAppendInfo = log.appendAsLeader(records, leaderEpoch = 0)
-    assertEquals(firstAppendInfo.firstOffset.map[Long](_.messageOffset), 
log.firstUnstableOffset.asJava)
+    assertEquals(Some(firstAppendInfo.firstOffset), log.firstUnstableOffset)
 
     // add more transactional records
     seq += 3
@@ -2948,13 +2948,13 @@ class UnifiedLogTest {
       new SimpleRecord("blah".getBytes)), leaderEpoch = 0)
 
     // LSO should not have changed
-    assertEquals(firstAppendInfo.firstOffset.map[Long](_.messageOffset), 
log.firstUnstableOffset.asJava)
+    assertEquals(Some(firstAppendInfo.firstOffset), log.firstUnstableOffset)
 
     // now transaction is committed
     val commitAppendInfo = LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, 
epoch, ControlRecordType.COMMIT, mockTime.milliseconds())
 
     // first unstable offset is not updated until the high watermark is 
advanced
-    assertEquals(firstAppendInfo.firstOffset.map[Long](_.messageOffset), 
log.firstUnstableOffset.asJava)
+    assertEquals(Some(firstAppendInfo.firstOffset), log.firstUnstableOffset)
     log.updateHighWatermark(commitAppendInfo.lastOffset + 1)
 
     // now there should be no first unstable offset
@@ -3347,7 +3347,7 @@ class UnifiedLogTest {
       new SimpleRecord("a".getBytes),
       new SimpleRecord("b".getBytes),
       new SimpleRecord("c".getBytes)), leaderEpoch = 0)
-    assertEquals(firstAppendInfo.firstOffset.map[Long](_.messageOffset), 
log.firstUnstableOffset.asJava)
+    assertEquals(Some(firstAppendInfo.firstOffset), log.firstUnstableOffset)
 
     // mix in some non-transactional data
     log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE,
@@ -3362,14 +3362,14 @@ class UnifiedLogTest {
       new SimpleRecord("f".getBytes)), leaderEpoch = 0)
 
     // LSO should not have changed
-    assertEquals(firstAppendInfo.firstOffset.map[Long](_.messageOffset), 
log.firstUnstableOffset.asJava)
+    assertEquals(Some(firstAppendInfo.firstOffset), log.firstUnstableOffset)
 
     // now first producer's transaction is aborted
     val abortAppendInfo = LogTestUtils.appendEndTxnMarkerAsLeader(log, pid1, 
epoch, ControlRecordType.ABORT, mockTime.milliseconds())
     log.updateHighWatermark(abortAppendInfo.lastOffset + 1)
 
     // LSO should now point to one less than the first offset of the second 
transaction
-    assertEquals(secondAppendInfo.firstOffset.map[Long](_.messageOffset), 
log.firstUnstableOffset.asJava)
+    assertEquals(Some(secondAppendInfo.firstOffset), log.firstUnstableOffset)
 
     // commit the second transaction
     val commitAppendInfo = LogTestUtils.appendEndTxnMarkerAsLeader(log, pid2, 
epoch, ControlRecordType.COMMIT, mockTime.milliseconds())
@@ -3394,7 +3394,7 @@ class UnifiedLogTest {
     val log = createLog(logDir, logConfig)
 
     val firstAppendInfo = log.appendAsLeader(records, leaderEpoch = 0)
-    assertEquals(firstAppendInfo.firstOffset.map[Long](_.messageOffset), 
log.firstUnstableOffset.asJava)
+    assertEquals(Some(firstAppendInfo.firstOffset), log.firstUnstableOffset)
 
     // this write should spill to the second segment
     seq = 3
@@ -3402,7 +3402,7 @@ class UnifiedLogTest {
       new SimpleRecord("d".getBytes),
       new SimpleRecord("e".getBytes),
       new SimpleRecord("f".getBytes)), leaderEpoch = 0)
-    assertEquals(firstAppendInfo.firstOffset.map[Long](_.messageOffset), 
log.firstUnstableOffset.asJava)
+    assertEquals(Some(firstAppendInfo.firstOffset), log.firstUnstableOffset)
     assertEquals(3L, log.logEndOffsetMetadata.segmentBaseOffset)
 
     // now abort the transaction
diff --git a/core/src/test/scala/unit/kafka/server/MockFetcherThread.scala 
b/core/src/test/scala/unit/kafka/server/MockFetcherThread.scala
index 4b4da0d12f3..94908caef5d 100644
--- a/core/src/test/scala/unit/kafka/server/MockFetcherThread.scala
+++ b/core/src/test/scala/unit/kafka/server/MockFetcherThread.scala
@@ -23,10 +23,10 @@ import org.apache.kafka.common.requests.FetchResponse
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.server.common.OffsetAndEpoch
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.storage.internals.log.{LogAppendInfo, 
LogOffsetMetadata}
+import org.apache.kafka.storage.internals.log.LogAppendInfo
 import org.junit.jupiter.api.Assertions._
 
-import java.util.{Optional, OptionalInt}
+import java.util.OptionalInt
 import scala.collection.{Map, Set, mutable}
 import scala.jdk.CollectionConverters._
 
@@ -102,7 +102,7 @@ class MockFetcherThread(val mockLeader: MockLeaderEndPoint,
     state.logStartOffset = partitionData.logStartOffset
     state.highWatermark = partitionData.highWatermark
 
-    Some(new LogAppendInfo(Optional.of(new LogOffsetMetadata(fetchOffset)),
+    Some(new LogAppendInfo(fetchOffset,
       lastOffset,
       lastEpoch,
       maxTimestamp,
@@ -111,10 +111,7 @@ class MockFetcherThread(val mockLeader: MockLeaderEndPoint,
       state.logStartOffset,
       RecordConversionStats.EMPTY,
       CompressionType.NONE,
-      CompressionType.NONE,
-      batches.size,
       FetchResponse.recordsSize(partitionData),
-      true,
       batches.headOption.map(_.lastOffset).getOrElse(-1)))
   }
 
diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index 6a0feaa6456..7258999cd29 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -36,7 +36,7 @@ import org.apache.kafka.common.requests.{FetchRequest, 
FetchResponse, UpdateMeta
 import org.apache.kafka.common.utils.{LogContext, SystemTime}
 import org.apache.kafka.server.common.{MetadataVersion, OffsetAndEpoch}
 import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
-import org.apache.kafka.storage.internals.log.{LogAppendInfo, 
LogOffsetMetadata}
+import org.apache.kafka.storage.internals.log.{LogAppendInfo}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
@@ -765,7 +765,7 @@ class ReplicaFetcherThreadTest {
 
     when(partition.localLogOrException).thenReturn(log)
     when(partition.appendRecordsToFollowerOrFutureReplica(any(), 
any())).thenReturn(Some(new LogAppendInfo(
-      Optional.empty[LogOffsetMetadata],
+      -1,
       0,
       OptionalInt.empty,
       RecordBatch.NO_TIMESTAMP,
@@ -774,10 +774,7 @@ class ReplicaFetcherThreadTest {
       -1L,
       RecordConversionStats.EMPTY,
       CompressionType.NONE,
-      CompressionType.NONE,
-      -1,
-      0, // No records.
-      false,
+      -1, // No records.
       -1L
     )))
 
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java
 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java
index 6f645456e7f..0ecaaabe223 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java
@@ -23,7 +23,6 @@ import 
org.apache.kafka.common.requests.ProduceResponse.RecordError;
 
 import java.util.Collections;
 import java.util.List;
-import java.util.Optional;
 import java.util.OptionalInt;
 
 /**
@@ -31,12 +30,11 @@ import java.util.OptionalInt;
  */
 public class LogAppendInfo {
 
-    public static final LogAppendInfo UNKNOWN_LOG_APPEND_INFO = new 
LogAppendInfo(Optional.empty(), -1, OptionalInt.empty(),
+    public static final LogAppendInfo UNKNOWN_LOG_APPEND_INFO = new 
LogAppendInfo(-1, -1, OptionalInt.empty(),
             RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, -1L,
-            RecordConversionStats.EMPTY, CompressionType.NONE, 
CompressionType.NONE, -1, -1,
-            false, -1L);
+            RecordConversionStats.EMPTY, CompressionType.NONE, -1, -1L);
 
-    private Optional<LogOffsetMetadata> firstOffset;
+    private long firstOffset;
     private long lastOffset;
     private long maxTimestamp;
     private long offsetOfMaxTimestamp;
@@ -46,21 +44,16 @@ public class LogAppendInfo {
 
     private final OptionalInt lastLeaderEpoch;
     private final CompressionType sourceCompression;
-    private final CompressionType targetCompression;
-    private final int shallowCount;
     private final int validBytes;
-    private final boolean offsetsMonotonic;
     private final long lastOffsetOfFirstBatch;
     private final List<RecordError> recordErrors;
-    private final String errorMessage;
     private final LeaderHwChange leaderHwChange;
 
     /**
      * Creates an instance with the given params.
      *
      * @param firstOffset            The first offset in the message set 
unless the message format is less than V2 and we are appending
-     *                               to the follower. If the message is a 
duplicate message the segment base offset and relative position
-     *                               in segment will be unknown.
+     *                               to the follower.
      * @param lastOffset             The last offset in the message set
      * @param lastLeaderEpoch        The partition leader epoch corresponding 
to the last offset, if available.
      * @param maxTimestamp           The maximum timestamp of the message set.
@@ -69,13 +62,10 @@ public class LogAppendInfo {
      * @param logStartOffset         The start offset of the log at the time 
of this append.
      * @param recordConversionStats  Statistics collected during record 
processing, `null` if `assignOffsets` is `false`
      * @param sourceCompression      The source codec used in the message set 
(send by the producer)
-     * @param targetCompression      The target codec of the message set(after 
applying the broker compression configuration if any)
-     * @param shallowCount           The number of shallow messages
      * @param validBytes             The number of valid bytes
-     * @param offsetsMonotonic       Are the offsets in this message set 
monotonically increasing
      * @param lastOffsetOfFirstBatch The last offset of the first batch
      */
-    public LogAppendInfo(Optional<LogOffsetMetadata> firstOffset,
+    public LogAppendInfo(long firstOffset,
                          long lastOffset,
                          OptionalInt lastLeaderEpoch,
                          long maxTimestamp,
@@ -84,22 +74,18 @@ public class LogAppendInfo {
                          long logStartOffset,
                          RecordConversionStats recordConversionStats,
                          CompressionType sourceCompression,
-                         CompressionType targetCompression,
-                         int shallowCount,
                          int validBytes,
-                         boolean offsetsMonotonic,
                          long lastOffsetOfFirstBatch) {
         this(firstOffset, lastOffset, lastLeaderEpoch, maxTimestamp, 
offsetOfMaxTimestamp, logAppendTime, logStartOffset,
-                recordConversionStats, sourceCompression, targetCompression, 
shallowCount, validBytes, offsetsMonotonic,
-                lastOffsetOfFirstBatch, Collections.<RecordError>emptyList(), 
null, LeaderHwChange.NONE);
+                recordConversionStats, sourceCompression, validBytes, 
lastOffsetOfFirstBatch, Collections.<RecordError>emptyList(),
+                LeaderHwChange.NONE);
     }
 
     /**
      * Creates an instance with the given params.
      *
      * @param firstOffset            The first offset in the message set 
unless the message format is less than V2 and we are appending
-     *                               to the follower. If the message is a 
duplicate message the segment base offset and relative position
-     *                               in segment will be unknown.
+     *                               to the follower.
      * @param lastOffset             The last offset in the message set
      * @param lastLeaderEpoch        The partition leader epoch corresponding 
to the last offset, if available.
      * @param maxTimestamp           The maximum timestamp of the message set.
@@ -108,17 +94,13 @@ public class LogAppendInfo {
      * @param logStartOffset         The start offset of the log at the time 
of this append.
      * @param recordConversionStats  Statistics collected during record 
processing, `null` if `assignOffsets` is `false`
      * @param sourceCompression      The source codec used in the message set 
(send by the producer)
-     * @param targetCompression      The target codec of the message set(after 
applying the broker compression configuration if any)
-     * @param shallowCount           The number of shallow messages
      * @param validBytes             The number of valid bytes
-     * @param offsetsMonotonic       Are the offsets in this message set 
monotonically increasing
      * @param lastOffsetOfFirstBatch The last offset of the first batch
-     * @param errorMessage           error message
      * @param recordErrors           List of record errors that caused the 
respective batch to be dropped
      * @param leaderHwChange         Incremental if the high watermark needs 
to be increased after appending record
      *                               Same if high watermark is not changed. 
None is the default value and it means append failed
      */
-    public LogAppendInfo(Optional<LogOffsetMetadata> firstOffset,
+    public LogAppendInfo(long firstOffset,
                          long lastOffset,
                          OptionalInt lastLeaderEpoch,
                          long maxTimestamp,
@@ -127,13 +109,9 @@ public class LogAppendInfo {
                          long logStartOffset,
                          RecordConversionStats recordConversionStats,
                          CompressionType sourceCompression,
-                         CompressionType targetCompression,
-                         int shallowCount,
                          int validBytes,
-                         boolean offsetsMonotonic,
                          long lastOffsetOfFirstBatch,
                          List<RecordError> recordErrors,
-                         String errorMessage,
                          LeaderHwChange leaderHwChange) {
         this.firstOffset = firstOffset;
         this.lastOffset = lastOffset;
@@ -144,21 +122,17 @@ public class LogAppendInfo {
         this.logStartOffset = logStartOffset;
         this.recordConversionStats = recordConversionStats;
         this.sourceCompression = sourceCompression;
-        this.targetCompression = targetCompression;
-        this.shallowCount = shallowCount;
         this.validBytes = validBytes;
-        this.offsetsMonotonic = offsetsMonotonic;
         this.lastOffsetOfFirstBatch = lastOffsetOfFirstBatch;
         this.recordErrors = recordErrors;
-        this.errorMessage = errorMessage;
         this.leaderHwChange = leaderHwChange;
     }
 
-    public Optional<LogOffsetMetadata> firstOffset() {
+    public long firstOffset() {
         return firstOffset;
     }
 
-    public void setFirstOffset(Optional<LogOffsetMetadata> firstOffset) {
+    public void setFirstOffset(long firstOffset) {
         this.firstOffset = firstOffset;
     }
 
@@ -218,30 +192,14 @@ public class LogAppendInfo {
         return sourceCompression;
     }
 
-    public CompressionType targetCompression() {
-        return targetCompression;
-    }
-
-    public int shallowCount() {
-        return shallowCount;
-    }
-
     public int validBytes() {
         return validBytes;
     }
 
-    public boolean offsetsMonotonic() {
-        return offsetsMonotonic;
-    }
-
     public List<RecordError> recordErrors() {
         return recordErrors;
     }
 
-    public String errorMessage() {
-        return errorMessage;
-    }
-
     public LeaderHwChange leaderHwChange() {
         return leaderHwChange;
     }
@@ -253,7 +211,7 @@ public class LogAppendInfo {
      * offset to avoid decompressing the data.
      */
     public long firstOrLastOffsetOfFirstBatch() {
-        return firstOffset.map(x -> 
x.messageOffset).orElse(lastOffsetOfFirstBatch);
+        return firstOffset >= 0 ? firstOffset : lastOffsetOfFirstBatch;
     }
 
     /**
@@ -262,8 +220,8 @@ public class LogAppendInfo {
      * @return Maximum possible number of messages described by LogAppendInfo
      */
     public long numMessages() {
-        if (firstOffset.isPresent() && firstOffset.get().messageOffset >= 0 && 
lastOffset >= 0) {
-            return lastOffset - firstOffset.get().messageOffset + 1;
+        if (firstOffset >= 0 && lastOffset >= 0) {
+            return lastOffset - firstOffset + 1;
         }
         return 0;
     }
@@ -276,24 +234,22 @@ public class LogAppendInfo {
      */
     public LogAppendInfo copy(LeaderHwChange newLeaderHwChange) {
         return new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpoch, 
maxTimestamp, offsetOfMaxTimestamp, logAppendTime, logStartOffset, 
recordConversionStats,
-                sourceCompression, targetCompression, shallowCount, 
validBytes, offsetsMonotonic, lastOffsetOfFirstBatch, recordErrors, 
errorMessage, newLeaderHwChange);
+                sourceCompression, validBytes, lastOffsetOfFirstBatch, 
recordErrors, newLeaderHwChange);
     }
 
     public static LogAppendInfo unknownLogAppendInfoWithLogStartOffset(long 
logStartOffset) {
-        return new LogAppendInfo(Optional.empty(), -1, OptionalInt.empty(), 
RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset,
-                RecordConversionStats.EMPTY, CompressionType.NONE, 
CompressionType.NONE, -1, -1,
-                false, -1L);
+        return new LogAppendInfo(-1, -1, OptionalInt.empty(), 
RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset,
+                RecordConversionStats.EMPTY, CompressionType.NONE, -1, -1L);
     }
 
     /**
      * In ProduceResponse V8+, we add two new fields record_errors and 
error_message (see KIP-467).
      * For any record failures with InvalidTimestamp or 
InvalidRecordException, we construct a LogAppendInfo object like the one
-     * in unknownLogAppendInfoWithLogStartOffset, but with additional fields 
recordErrors and errorMessage
+     * in unknownLogAppendInfoWithLogStartOffset, but with additional fields 
recordErrors
      */
-    public static LogAppendInfo unknownLogAppendInfoWithAdditionalInfo(long 
logStartOffset, List<RecordError> recordErrors, String errorMessage) {
-        return new LogAppendInfo(Optional.empty(), -1, OptionalInt.empty(), 
RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset,
-                RecordConversionStats.EMPTY, CompressionType.NONE, 
CompressionType.NONE, -1, -1,
-                false, -1L, recordErrors, errorMessage, LeaderHwChange.NONE);
+    public static LogAppendInfo unknownLogAppendInfoWithAdditionalInfo(long 
logStartOffset, List<RecordError> recordErrors) {
+        return new LogAppendInfo(-1, -1, OptionalInt.empty(), 
RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset,
+                RecordConversionStats.EMPTY, CompressionType.NONE, -1, -1L, 
recordErrors, LeaderHwChange.NONE);
     }
 
     @Override
@@ -308,13 +264,9 @@ public class LogAppendInfo {
                 ", logStartOffset=" + logStartOffset +
                 ", recordConversionStats=" + recordConversionStats +
                 ", sourceCompression=" + sourceCompression +
-                ", targetCompression=" + targetCompression +
-                ", shallowCount=" + shallowCount +
                 ", validBytes=" + validBytes +
-                ", offsetsMonotonic=" + offsetsMonotonic +
                 ", lastOffsetOfFirstBatch=" + lastOffsetOfFirstBatch +
                 ", recordErrors=" + recordErrors +
-                ", errorMessage='" + errorMessage + '\'' +
                 ", leaderHwChange=" + leaderHwChange +
                 ')';
     }

Reply via email to