[ https://issues.apache.org/jira/browse/KAFKA-6530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16403558#comment-16403558 ]

ASF GitHub Bot commented on KAFKA-6530:
---------------------------------------

hachikuji closed pull request #4660: KAFKA-6530: Use actual first offset of 
message set when rolling log segment
URL: https://github.com/apache/kafka/pull/4660
 
 
   

This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is displayed below for the sake of provenance:

diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 
b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 932d4b69c22..da6b68c0bc6 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -510,6 +510,10 @@ public static MemoryRecords withRecords(long 
initialOffset, CompressionType comp
                 records);
     }
 
+    public static MemoryRecords withRecords(byte magic, long initialOffset, 
CompressionType compressionType, SimpleRecord... records) {
+        return withRecords(magic, initialOffset, compressionType, 
TimestampType.CREATE_TIME, records);
+    }
+
     public static MemoryRecords withRecords(long initialOffset, 
CompressionType compressionType, Integer partitionLeaderEpoch, SimpleRecord... 
records) {
         return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, initialOffset, 
compressionType, TimestampType.CREATE_TIME, RecordBatch.NO_PRODUCER_ID,
                 RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, 
partitionLeaderEpoch, false, records);
diff --git a/core/src/main/scala/kafka/log/Log.scala 
b/core/src/main/scala/kafka/log/Log.scala
index f0050f54aef..cc693375079 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -47,11 +47,11 @@ import java.lang.{Long => JLong}
 import java.util.regex.Pattern
 
 object LogAppendInfo {
-  val UnknownLogAppendInfo = LogAppendInfo(-1, -1, RecordBatch.NO_TIMESTAMP, 
-1L, RecordBatch.NO_TIMESTAMP, -1L,
+  val UnknownLogAppendInfo = LogAppendInfo(None, -1, RecordBatch.NO_TIMESTAMP, 
-1L, RecordBatch.NO_TIMESTAMP, -1L,
     RecordsProcessingStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, 
-1, offsetsMonotonic = false)
 
   def unknownLogAppendInfoWithLogStartOffset(logStartOffset: Long): 
LogAppendInfo =
-    LogAppendInfo(-1, -1, RecordBatch.NO_TIMESTAMP, -1L, 
RecordBatch.NO_TIMESTAMP, logStartOffset,
+    LogAppendInfo(None, -1, RecordBatch.NO_TIMESTAMP, -1L, 
RecordBatch.NO_TIMESTAMP, logStartOffset,
       RecordsProcessingStats.EMPTY, NoCompressionCodec, NoCompressionCodec, 
-1, -1, offsetsMonotonic = false)
 }
 
@@ -59,7 +59,7 @@ object LogAppendInfo {
  * Struct to hold various quantities we compute about each message set before 
appending to the log
  *
  * @param firstOffset The first offset in the message set unless the message 
format is less than V2 and we are appending
- *                    to the follower. In that case, this will be the last 
offset for performance reasons.
+ *                    to the follower.
  * @param lastOffset The last offset in the message set
  * @param maxTimestamp The maximum timestamp of the message set.
  * @param offsetOfMaxTimestamp The offset of the message with the maximum 
timestamp.
@@ -72,7 +72,7 @@ object LogAppendInfo {
  * @param validBytes The number of valid bytes
  * @param offsetsMonotonic Are the offsets in this message set monotonically 
increasing
  */
-case class LogAppendInfo(var firstOffset: Long,
+case class LogAppendInfo(var firstOffset: Option[Long],
                          var lastOffset: Long,
                          var maxTimestamp: Long,
                          var offsetOfMaxTimestamp: Long,
@@ -83,7 +83,24 @@ case class LogAppendInfo(var firstOffset: Long,
                          targetCodec: CompressionCodec,
                          shallowCount: Int,
                          validBytes: Int,
-                         offsetsMonotonic: Boolean)
+                         offsetsMonotonic: Boolean) {
+  /**
+    * Get the first offset if it exists, else get the last offset.
+    * @return The offset of first message if it exists; else offset of the 
last message.
+    */
+  def firstOrLastOffset: Long = firstOffset.getOrElse(lastOffset)
+
+  /**
+    * Get the (maximum) number of messages described by LogAppendInfo
+    * @return Maximum possible number of messages described by LogAppendInfo
+    */
+  def numMessages: Long = {
+    firstOffset match {
+      case Some(firstOffsetVal) if (firstOffsetVal >= 0 && lastOffset >= 0) => 
(lastOffset - firstOffsetVal + 1)
+      case _ => 0
+    }
+  }
+}
 
 /**
  * A class used to hold useful metadata about a completed transaction. This is 
used to build
@@ -653,7 +670,7 @@ class Log(@volatile var dir: File,
         if (assignOffsets) {
           // assign offsets to the message set
           val offset = new LongRef(nextOffsetMetadata.messageOffset)
-          appendInfo.firstOffset = offset.value
+          appendInfo.firstOffset = Some(offset.value)
           val now = time.milliseconds
           val validateAndOffsetAssignResult = try {
             LogValidator.validateMessagesAndAssignOffsets(validRecords,
@@ -695,7 +712,7 @@ class Log(@volatile var dir: File,
           }
         } else {
           // we are taking the offsets we are given
-          if (!appendInfo.offsetsMonotonic || appendInfo.firstOffset < 
nextOffsetMetadata.messageOffset)
+          if (!appendInfo.offsetsMonotonic || appendInfo.firstOrLastOffset < 
nextOffsetMetadata.messageOffset)
             throw new IllegalArgumentException("Out of order offsets found in 
" + records.records.asScala.map(_.offset))
         }
 
@@ -715,7 +732,7 @@ class Log(@volatile var dir: File,
         // validate the idempotent/transactional state of the producers and 
collect some metadata
         val (updatedProducers, completedTxns, maybeDuplicate) = 
analyzeAndValidateProducerState(validRecords, isFromClient)
         maybeDuplicate.foreach { duplicate =>
-          appendInfo.firstOffset = duplicate.firstOffset
+          appendInfo.firstOffset = Some(duplicate.firstOffset)
           appendInfo.lastOffset = duplicate.lastOffset
           appendInfo.logAppendTime = duplicate.timestamp
           appendInfo.logStartOffset = logStartOffset
@@ -723,17 +740,14 @@ class Log(@volatile var dir: File,
         }
 
         // maybe roll the log if this segment is full
-        val segment = maybeRoll(messagesSize = validRecords.sizeInBytes,
-          maxTimestampInMessages = appendInfo.maxTimestamp,
-          maxOffsetInMessages = appendInfo.lastOffset)
+        val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)
 
         val logOffsetMetadata = LogOffsetMetadata(
-          messageOffset = appendInfo.firstOffset,
+          messageOffset = appendInfo.firstOrLastOffset,
           segmentBaseOffset = segment.baseOffset,
           relativePositionInSegment = segment.size)
 
-        segment.append(firstOffset = appendInfo.firstOffset,
-          largestOffset = appendInfo.lastOffset,
+        segment.append(largestOffset = appendInfo.lastOffset,
           largestTimestamp = appendInfo.maxTimestamp,
           shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
           records = validRecords)
@@ -761,8 +775,8 @@ class Log(@volatile var dir: File,
         // update the first unstable offset (which is used to compute LSO)
         updateFirstUnstableOffset()
 
-        trace("Appended message set to log %s with first offset: %d, next 
offset: %d, and messages: %s"
-          .format(this.name, appendInfo.firstOffset, 
nextOffsetMetadata.messageOffset, validRecords))
+        trace(s"Appended message set to log ${this.name} with last offset: 
${appendInfo.lastOffset}, " +
+              s"first offset: ${appendInfo.firstOffset}, next offset: 
${nextOffsetMetadata.messageOffset}, and messages: $validRecords")
 
         if (unflushedMessages >= config.flushInterval)
           flush()
@@ -859,12 +873,13 @@ class Log(@volatile var dir: File,
   private def analyzeAndValidateRecords(records: MemoryRecords, isFromClient: 
Boolean): LogAppendInfo = {
     var shallowMessageCount = 0
     var validBytesCount = 0
-    var firstOffset = -1L
+    var firstOffset: Option[Long] = None
     var lastOffset = -1L
     var sourceCodec: CompressionCodec = NoCompressionCodec
     var monotonic = true
     var maxTimestamp = RecordBatch.NO_TIMESTAMP
     var offsetOfMaxTimestamp = -1L
+    var readFirstMessage = false
 
     for (batch <- records.batches.asScala) {
       // we only validate V2 and higher to avoid potential compatibility 
issues with older clients
@@ -876,8 +891,12 @@ class Log(@volatile var dir: File,
       // For magic version 2, we can get the first offset directly from the 
batch header.
       // When appending to the leader, we will update LogAppendInfo.baseOffset 
with the correct value. In the follower
       // case, validation will be more lenient.
-      if (firstOffset < 0)
-        firstOffset = if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) 
batch.baseOffset else batch.lastOffset
+      // Also indicate whether we have the accurate first offset or not
+      if (!readFirstMessage) {
+        if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
+          firstOffset = Some(batch.baseOffset)
+        readFirstMessage = true
+      }
 
       // check that offsets are monotonically increasing
       if (lastOffset >= batch.lastOffset)
@@ -1268,8 +1287,8 @@ class Log(@volatile var dir: File,
   /**
    * Roll the log over to a new empty log segment if necessary.
    *
-   * @param messagesSize The messages set size in bytes
-   * @param maxTimestampInMessages The maximum timestamp in the messages.
+   * @param messagesSize The messages set size in bytes.
+   * @param appendInfo log append information
    * logSegment will be rolled if one of the following conditions met
    * <ol>
    * <li> The logSegment is full
@@ -1279,14 +1298,19 @@ class Log(@volatile var dir: File,
    * </ol>
    * @return The currently active segment after (perhaps) rolling to a new 
segment
    */
-  private def maybeRoll(messagesSize: Int, maxTimestampInMessages: Long, 
maxOffsetInMessages: Long): LogSegment = {
+  private def maybeRoll(messagesSize: Int, appendInfo: LogAppendInfo): 
LogSegment = {
     val segment = activeSegment
     val now = time.milliseconds
+
+    val maxTimestampInMessages = appendInfo.maxTimestamp
+    val maxOffsetInMessages = appendInfo.lastOffset
+
     if (segment.shouldRoll(messagesSize, maxTimestampInMessages, 
maxOffsetInMessages, now)) {
       debug(s"Rolling new log segment in $name (log_size = 
${segment.size}/${config.segmentSize}}, " +
           s"offset_index_size = 
${segment.offsetIndex.entries}/${segment.offsetIndex.maxEntries}, " +
           s"time_index_size = 
${segment.timeIndex.entries}/${segment.timeIndex.maxEntries}, " +
           s"inactive_time_ms = ${segment.timeWaitedForRoll(now, 
maxTimestampInMessages)}/${config.segmentMs - segment.rollJitterMs}).")
+
       /*
         maxOffsetInMessages - Integer.MAX_VALUE is a heuristic value for the 
first offset in the set of messages.
         Since the offset in messages will not differ by more than 
Integer.MAX_VALUE, this is guaranteed <= the real
@@ -1296,8 +1320,13 @@ class Log(@volatile var dir: File,
         Integer.MAX_VALUE.toLong + 2 or more.  In this case, the prior 
behavior would roll a new log segment whose
         base offset was too low to contain the next message.  This edge case 
is possible when a replica is recovering a
         highly compacted topic from scratch.
-       */
-      roll(maxOffsetInMessages - Integer.MAX_VALUE)
+        Note that this is only required for pre-V2 message formats because 
these do not store the first message offset
+        in the header.
+      */
+      appendInfo.firstOffset match {
+        case Some(firstOffset) => roll(firstOffset)
+        case None => roll(maxOffsetInMessages - Integer.MAX_VALUE)
+      }
     } else {
       segment
     }
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala 
b/core/src/main/scala/kafka/log/LogCleaner.scala
index 07e8440f26f..0ee994252a1 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -614,8 +614,7 @@ private[log] class Cleaner(val id: Int,
         val retained = MemoryRecords.readableRecords(outputBuffer)
         // it's OK not to hold the Log's lock in this case, because this 
segment is only accessed by other threads
         // after `Log.replaceSegments` (which acquires the lock) is called
-        dest.append(firstOffset = retained.batches.iterator.next().baseOffset,
-          largestOffset = result.maxOffset,
+        dest.append(largestOffset = result.maxOffset,
           largestTimestamp = result.maxTimestamp,
           shallowOffsetOfMaxTimestamp = result.shallowOffsetOfMaxTimestamp,
           records = retained)
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala 
b/core/src/main/scala/kafka/log/LogSegment.scala
index 5970f42f6d9..5130b28b597 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -112,7 +112,6 @@ class LogSegment private[log] (val log: FileRecords,
    *
    * It is assumed this method is being called from within a lock.
    *
-   * @param firstOffset The first offset in the message set.
    * @param largestOffset The last offset in the message set
    * @param largestTimestamp The largest timestamp in the message set.
    * @param shallowOffsetOfMaxTimestamp The offset of the message that has the 
largest timestamp in the messages to append.
@@ -120,21 +119,20 @@ class LogSegment private[log] (val log: FileRecords,
    * @return the physical position in the file of the appended records
    */
   @nonthreadsafe
-  def append(firstOffset: Long,
-             largestOffset: Long,
+  def append(largestOffset: Long,
              largestTimestamp: Long,
              shallowOffsetOfMaxTimestamp: Long,
              records: MemoryRecords): Unit = {
     if (records.sizeInBytes > 0) {
-      trace("Inserting %d bytes at offset %d at position %d with largest 
timestamp %d at shallow offset %d"
-          .format(records.sizeInBytes, firstOffset, log.sizeInBytes(), 
largestTimestamp, shallowOffsetOfMaxTimestamp))
+      trace(s"Inserting ${records.sizeInBytes} bytes at end offset 
$largestOffset at position ${log.sizeInBytes} " +
+            s"with largest timestamp $largestTimestamp at shallow offset 
$shallowOffsetOfMaxTimestamp")
       val physicalPosition = log.sizeInBytes()
       if (physicalPosition == 0)
         rollingBasedTimestamp = Some(largestTimestamp)
       // append the messages
       require(canConvertToRelativeOffset(largestOffset), "largest offset in 
message set can not be safely converted to relative offset.")
       val appendedBytes = log.append(records)
-      trace(s"Appended $appendedBytes to ${log.file()} at offset $firstOffset")
+      trace(s"Appended $appendedBytes to ${log.file()} at end offset 
$largestOffset")
       // Update the in memory max timestamp and corresponding offset.
       if (largestTimestamp > maxTimestampSoFar) {
         maxTimestampSoFar = largestTimestamp
@@ -142,7 +140,7 @@ class LogSegment private[log] (val log: FileRecords,
       }
       // append an entry to the index (if needed)
       if(bytesSinceLastIndexEntry > indexIntervalBytes) {
-        offsetIndex.append(firstOffset, physicalPosition)
+        offsetIndex.append(largestOffset, physicalPosition)
         timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
         bytesSinceLastIndexEntry = 0
       }
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 470842e9d9e..d4abc11be1b 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -474,7 +474,7 @@ class ReplicaManager(val config: KafkaConfig,
         topicPartition ->
                 ProducePartitionStatus(
                   result.info.lastOffset + 1, // required offset
-                  new PartitionResponse(result.error, result.info.firstOffset, 
result.info.logAppendTime, result.info.logStartOffset)) // response status
+                  new PartitionResponse(result.error, 
result.info.firstOffset.getOrElse(-1), result.info.logAppendTime, 
result.info.logStartOffset)) // response status
       }
 
       
processingStatsCallback(localProduceResults.mapValues(_.info.recordsProcessingStats))
@@ -502,7 +502,7 @@ class ReplicaManager(val config: KafkaConfig,
       // Just return an error and don't handle the request at all
       val responseStatus = entriesPerPartition.map { case (topicPartition, _) 
=>
         topicPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS,
-          LogAppendInfo.UnknownLogAppendInfo.firstOffset, 
RecordBatch.NO_TIMESTAMP, LogAppendInfo.UnknownLogAppendInfo.logStartOffset)
+          LogAppendInfo.UnknownLogAppendInfo.firstOffset.getOrElse(-1), 
RecordBatch.NO_TIMESTAMP, LogAppendInfo.UnknownLogAppendInfo.logStartOffset)
       }
       responseCallback(responseStatus)
     }
@@ -747,11 +747,7 @@ class ReplicaManager(val config: KafkaConfig,
               .format(topicPartition, localBrokerId))
           }
 
-          val numAppendedMessages =
-            if (info.firstOffset == -1L || info.lastOffset == -1L)
-              0
-            else
-              info.lastOffset - info.firstOffset + 1
+          val numAppendedMessages = info.numMessages
 
           // update stats for successfully appended bytes and messages as 
bytesInRate and messageInRate
           
brokerTopicStats.topicStats(topicPartition.topic).bytesInRate.mark(records.sizeInBytes)
diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala 
b/core/src/test/scala/other/kafka/StressTestLog.scala
index 1710da7b9c3..54f8582c4c0 100755
--- a/core/src/test/scala/other/kafka/StressTestLog.scala
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@ -68,47 +68,76 @@ object StressTestLog {
     })
 
     while(running.get) {
-      println("Reader offset = %d, writer offset = %d".format(reader.offset, 
writer.offset))
       Thread.sleep(1000)
+      println("Reader offset = %d, writer offset = 
%d".format(reader.currentOffset, writer.currentOffset))
+      writer.checkProgress()
+      reader.checkProgress()
     }
   }
 
   abstract class WorkerThread extends Thread {
+    val threadInfo = "Thread: " + Thread.currentThread.getName + " Class: " + 
getClass.getName
+
     override def run() {
       try {
         while(running.get)
           work()
       } catch {
-        case e: Exception =>
+        case e: Exception => {
           e.printStackTrace()
-          running.set(false)
+        }
+      } finally {
+        running.set(false)
       }
-      println(getClass.getName + " exiting...")
     }
+
     def work()
+    def isMakingProgress(): Boolean
+  }
+
+  trait LogProgress {
+    @volatile var currentOffset = 0
+    private var lastOffsetCheckpointed = currentOffset
+    private var lastProgressCheckTime = System.currentTimeMillis
+
+    def isMakingProgress(): Boolean = {
+      if (currentOffset > lastOffsetCheckpointed) {
+        lastOffsetCheckpointed = currentOffset
+        return true
+      }
+
+      false
+    }
+
+    def checkProgress() {
+      // Check if we are making progress every 500ms
+      val curTime = System.currentTimeMillis
+      if ((curTime - lastProgressCheckTime) > 500) {
+        require(isMakingProgress(), "Thread not making progress")
+        lastProgressCheckTime = curTime
+      }
+    }
   }
 
-  class WriterThread(val log: Log) extends WorkerThread {
-    @volatile var offset = 0
+  class WriterThread(val log: Log) extends WorkerThread with LogProgress {
     override def work() {
-      val logAppendInfo = 
log.appendAsFollower(TestUtils.singletonRecords(offset.toString.getBytes))
-      require(logAppendInfo.firstOffset == offset && logAppendInfo.lastOffset 
== offset)
-      offset += 1
-      if(offset % 1000 == 0)
-        Thread.sleep(500)
+      val logAppendInfo = 
log.appendAsLeader(TestUtils.singletonRecords(currentOffset.toString.getBytes), 
0)
+      require(logAppendInfo.firstOffset.forall(_ == currentOffset) && 
logAppendInfo.lastOffset == currentOffset)
+      currentOffset += 1
+      if (currentOffset % 1000 == 0)
+        Thread.sleep(50)
     }
   }
 
-  class ReaderThread(val log: Log) extends WorkerThread {
-    @volatile var offset = 0
+  class ReaderThread(val log: Log) extends WorkerThread with LogProgress {
     override def work() {
       try {
-        log.read(offset, 1024, Some(offset+1), isolationLevel = 
IsolationLevel.READ_UNCOMMITTED).records match {
+        log.read(currentOffset, 1024, Some(currentOffset + 1), isolationLevel 
= IsolationLevel.READ_UNCOMMITTED).records match {
           case read: FileRecords if read.sizeInBytes > 0 => {
             val first = read.batches.iterator.next()
-            require(first.lastOffset == offset, "We should either read nothing 
or the message we asked for.")
+            require(first.lastOffset == currentOffset, "We should either read 
nothing or the message we asked for.")
             require(first.sizeInBytes == read.sizeInBytes, "Expected %d but 
got %d.".format(first.sizeInBytes, read.sizeInBytes))
-            offset += 1
+            currentOffset += 1
           }
           case _ =>
         }
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 
b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 152d6d37237..4f5ba5caef0 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -69,7 +69,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) 
extends AbstractLogCle
     checkLogAfterAppendingDups(log, startSize, appends)
 
     val appendInfo = log.appendAsLeader(largeMessageSet, leaderEpoch = 0)
-    val largeMessageOffset = appendInfo.firstOffset
+    val largeMessageOffset = appendInfo.firstOffset.get
 
     val dups = writeDups(startKey = largeMessageKey + 1, numKeys = 100, 
numDups = 3, log = log, codec = codec)
     val appends2 = appends ++ Seq((largeMessageKey, largeMessageValue, 
largeMessageOffset)) ++ dups
@@ -176,7 +176,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) 
extends AbstractLogCle
     val appends2: Seq[(Int, String, Long)] = {
       val dupsV0 = writeDups(numKeys = 40, numDups = 3, log = log, codec = 
codec, magicValue = RecordBatch.MAGIC_VALUE_V0)
       val appendInfo = log.appendAsLeader(largeMessageSet, leaderEpoch = 0)
-      val largeMessageOffset = appendInfo.firstOffset
+      val largeMessageOffset = appendInfo.firstOffset.get
 
       // also add some messages with version 1 and version 2 to check that we 
handle mixed format versions correctly
       props.put(LogConfig.MessageFormatVersionProp, KAFKA_0_11_0_IV0.version)
@@ -314,7 +314,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) 
extends AbstractLogCle
       val appendInfo = log.appendAsLeader(TestUtils.singletonRecords(value = 
value.toString.getBytes, codec = codec,
               key = key.toString.getBytes, magicValue = magicValue), 
leaderEpoch = 0)
       counter += 1
-      (key, value, appendInfo.firstOffset)
+      (key, value, appendInfo.firstOffset.get)
     }
   }
 
@@ -331,7 +331,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) 
extends AbstractLogCle
     }
 
     val appendInfo = log.appendAsLeader(MemoryRecords.withRecords(magicValue, 
codec, records: _*), leaderEpoch = 0)
-    val offsets = appendInfo.firstOffset to appendInfo.lastOffset
+    val offsets = appendInfo.firstOffset.get to appendInfo.lastOffset
 
     kvs.zip(offsets).map { case (kv, offset) => (kv._1, kv._2, offset) }
   }
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala 
b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index c12f617ddaa..906c26da4cf 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -147,17 +147,17 @@ class LogCleanerTest extends JUnitSuite {
 
     // check duplicate append from producer 1
     var logAppendInfo = appendIdempotentAsLeader(log, pid1, 
producerEpoch)(Seq(1, 2, 3))
-    assertEquals(0L, logAppendInfo.firstOffset)
+    assertEquals(0L, logAppendInfo.firstOffset.get)
     assertEquals(2L, logAppendInfo.lastOffset)
 
     // check duplicate append from producer 3
     logAppendInfo = appendIdempotentAsLeader(log, pid3, producerEpoch)(Seq(1, 
4))
-    assertEquals(6L, logAppendInfo.firstOffset)
+    assertEquals(6L, logAppendInfo.firstOffset.get)
     assertEquals(7L, logAppendInfo.lastOffset)
 
     // check duplicate append from producer 2
     logAppendInfo = appendIdempotentAsLeader(log, pid2, producerEpoch)(Seq(3, 
1, 4))
-    assertEquals(3L, logAppendInfo.firstOffset)
+    assertEquals(3L, logAppendInfo.firstOffset.get)
     assertEquals(5L, logAppendInfo.lastOffset)
 
     // do one more append and a round of cleaning to force another deletion 
from producer 1's batch
@@ -173,7 +173,7 @@ class LogCleanerTest extends JUnitSuite {
 
     // duplicate append from producer1 should still be fine
     logAppendInfo = appendIdempotentAsLeader(log, pid1, producerEpoch)(Seq(1, 
2, 3))
-    assertEquals(0L, logAppendInfo.firstOffset)
+    assertEquals(0L, logAppendInfo.firstOffset.get)
     assertEquals(2L, logAppendInfo.lastOffset)
   }
 
@@ -1082,16 +1082,17 @@ class LogCleanerTest extends JUnitSuite {
     val logConfig = LogConfig(logProps)
     val log = makeLog(config = logConfig)
     val cleaner = makeCleaner(Int.MaxValue)
-    val start = 0
-    val end = 2
-    val offsetSeq = Seq(0L, 7206178L)
-    writeToLog(log, (start until end) zip (start until end), offsetSeq)
-    cleaner.buildOffsetMap(log, start, end, map, new CleanerStats())
-    val endOffset = map.latestOffset
-    assertEquals("Last offset should be the end offset.", 7206178L, endOffset)
-    assertEquals("Should have the expected number of messages in the map.", 
end - start, map.size)
+    val keyStart = 0
+    val keyEnd = 2
+    val offsetStart = 0L
+    val offsetEnd = 7206178L
+    val offsetSeq = Seq(offsetStart, offsetEnd)
+    writeToLog(log, (keyStart until keyEnd) zip (keyStart until keyEnd), 
offsetSeq)
+    cleaner.buildOffsetMap(log, keyStart, offsetEnd + 1L, map, new 
CleanerStats())
+    assertEquals("Last offset should be the end offset.", offsetEnd, 
map.latestOffset)
+    assertEquals("Should have the expected number of messages in the map.", 
keyEnd - keyStart, map.size)
     assertEquals("Map should contain first value", 0L, map.get(key(0)))
-    assertEquals("Map should contain second value", 7206178L, map.get(key(1)))
+    assertEquals("Map should contain second value", offsetEnd, map.get(key(1)))
   }
 
   /**
@@ -1265,7 +1266,7 @@ class LogCleanerTest extends JUnitSuite {
                 checkDone = checkDone)
 
   private def writeToLog(log: Log, seq: Iterable[(Int, Int)]): Iterable[Long] 
= {
-    for ((key, value) <- seq) yield log.appendAsLeader(record(key, value), 
leaderEpoch = 0).firstOffset
+    for ((key, value) <- seq) yield log.appendAsLeader(record(key, value), 
leaderEpoch = 0).firstOffset.get
   }
 
   private def key(id: Int) = ByteBuffer.wrap(id.toString.getBytes)
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala 
b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 8a049147793..2fdda6b8827 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -142,7 +142,7 @@ class LogManagerTest {
     for (_ <- 0 until numMessages) {
       val set = TestUtils.singletonRecords("test".getBytes())
       val info = log.appendAsLeader(set, leaderEpoch = 0)
-      offset = info.firstOffset
+      offset = info.firstOffset.get
     }
 
     log.onHighWatermarkIncremented(log.logEndOffset)
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala 
b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index c45ed0d2986..31bcf9c2a3b 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -85,7 +85,7 @@ class LogSegmentTest {
   def testReadBeforeFirstOffset() {
     val seg = createSegment(40)
     val ms = records(50, "hello", "there", "little", "bee")
-    seg.append(50, 53, RecordBatch.NO_TIMESTAMP, -1L, ms)
+    seg.append(53, RecordBatch.NO_TIMESTAMP, -1L, ms)
     val read = seg.read(startOffset = 41, maxSize = 300, maxOffset = 
None).records
     checkEquals(ms.records.iterator, read.records.iterator)
   }
@@ -99,7 +99,7 @@ class LogSegmentTest {
     val baseOffset = 50
     val seg = createSegment(baseOffset)
     val ms = records(baseOffset, "hello", "there", "beautiful")
-    seg.append(baseOffset, 52, RecordBatch.NO_TIMESTAMP, -1L, ms)
+    seg.append(52, RecordBatch.NO_TIMESTAMP, -1L, ms)
     def validate(offset: Long) =
       assertEquals(ms.records.asScala.filter(_.offset == offset).toList,
                    seg.read(startOffset = offset, maxSize = 1024, maxOffset = 
Some(offset+1)).records.records.asScala.toList)
@@ -115,7 +115,7 @@ class LogSegmentTest {
   def testReadAfterLast() {
     val seg = createSegment(40)
     val ms = records(50, "hello", "there")
-    seg.append(50, 51, RecordBatch.NO_TIMESTAMP, -1L, ms)
+    seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms)
     val read = seg.read(startOffset = 52, maxSize = 200, maxOffset = None)
     assertNull("Read beyond the last offset in the segment should give null", 
read)
   }
@@ -128,9 +128,9 @@ class LogSegmentTest {
   def testReadFromGap() {
     val seg = createSegment(40)
     val ms = records(50, "hello", "there")
-    seg.append(50, 51, RecordBatch.NO_TIMESTAMP, -1L, ms)
+    seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms)
     val ms2 = records(60, "alpha", "beta")
-    seg.append(60, 61, RecordBatch.NO_TIMESTAMP, -1L, ms2)
+    seg.append(61, RecordBatch.NO_TIMESTAMP, -1L, ms2)
     val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
     checkEquals(ms2.records.iterator, read.records.records.iterator)
   }
@@ -145,9 +145,9 @@ class LogSegmentTest {
     var offset = 40
     for (_ <- 0 until 30) {
       val ms1 = records(offset, "hello")
-      seg.append(offset, offset, RecordBatch.NO_TIMESTAMP, -1L, ms1)
+      seg.append(offset, RecordBatch.NO_TIMESTAMP, -1L, ms1)
       val ms2 = records(offset + 1, "hello")
-      seg.append(offset + 1, offset + 1, RecordBatch.NO_TIMESTAMP, -1L, ms2)
+      seg.append(offset + 1, RecordBatch.NO_TIMESTAMP, -1L, ms2)
       // check that we can read back both messages
       val read = seg.read(offset, None, 10000)
       assertEquals(List(ms1.records.iterator.next(), 
ms2.records.iterator.next()), read.records.records.asScala.toList)
@@ -207,7 +207,7 @@ class LogSegmentTest {
     val seg = createSegment(40, 2 * records(0, "hello").sizeInBytes - 1)
     var offset = 40
     for (_ <- 0 until numMessages) {
-      seg.append(offset, offset, offset, offset, records(offset, "hello"))
+      seg.append(offset, offset, offset, records(offset, "hello"))
       offset += 1
     }
     assertEquals(offset, seg.readNextOffset)
@@ -229,7 +229,7 @@ class LogSegmentTest {
     // test the case where we fully truncate the log
     val time = new MockTime
     val seg = createSegment(40, time = time)
-    seg.append(40, 41, RecordBatch.NO_TIMESTAMP, -1L, records(40, "hello", 
"there"))
+    seg.append(41, RecordBatch.NO_TIMESTAMP, -1L, records(40, "hello", 
"there"))
 
     // If the segment is empty after truncation, the create time should be 
reset
     time.sleep(500)
@@ -241,7 +241,7 @@ class LogSegmentTest {
     assertFalse(seg.offsetIndex.isFull)
     assertNull("Segment should be empty.", seg.read(0, None, 1024))
 
-    seg.append(40, 41, RecordBatch.NO_TIMESTAMP, -1L, records(40, "hello", 
"there"))
+    seg.append(41, RecordBatch.NO_TIMESTAMP, -1L, records(40, "hello", 
"there"))
   }
 
   /**
@@ -253,7 +253,7 @@ class LogSegmentTest {
     val seg = createSegment(40, messageSize * 2 - 1)
     // Produce some messages
     for (i <- 40 until 50)
-      seg.append(i, i, i * 10, i, records(i, s"msg$i"))
+      seg.append(i, i * 10, i, records(i, s"msg$i"))
 
     assertEquals(490, seg.largestTimestamp)
     // Search for an indexed timestamp
@@ -277,7 +277,7 @@ class LogSegmentTest {
   def testNextOffsetCalculation() {
     val seg = createSegment(40)
     assertEquals(40, seg.readNextOffset)
-    seg.append(50, 52, RecordBatch.NO_TIMESTAMP, -1L, records(50, "hello", 
"there", "you"))
+    seg.append(52, RecordBatch.NO_TIMESTAMP, -1L, records(50, "hello", 
"there", "you"))
     assertEquals(53, seg.readNextOffset)
   }
 
@@ -304,7 +304,7 @@ class LogSegmentTest {
   def testRecoveryFixesCorruptIndex() {
     val seg = createSegment(0)
     for(i <- 0 until 100)
-      seg.append(i, i, RecordBatch.NO_TIMESTAMP, -1L, records(i, i.toString))
+      seg.append(i, RecordBatch.NO_TIMESTAMP, -1L, records(i, i.toString))
     val indexFile = seg.offsetIndex.file
     TestUtils.writeNonsenseToFile(indexFile, 5, indexFile.length.toInt)
     seg.recover(new ProducerStateManager(topicPartition, logDir))
@@ -323,26 +323,26 @@ class LogSegmentTest {
     val pid2 = 10L
 
     // append transactional records from pid1
-    segment.append(firstOffset = 100L, largestOffset = 101L, largestTimestamp 
= RecordBatch.NO_TIMESTAMP,
+    segment.append(largestOffset = 101L, largestTimestamp = 
RecordBatch.NO_TIMESTAMP,
       shallowOffsetOfMaxTimestamp = 100L, 
MemoryRecords.withTransactionalRecords(100L, CompressionType.NONE,
         pid1, producerEpoch, sequence, partitionLeaderEpoch, new 
SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
 
     // append transactional records from pid2
-    segment.append(firstOffset = 102L, largestOffset = 103L, largestTimestamp 
= RecordBatch.NO_TIMESTAMP,
+    segment.append(largestOffset = 103L, largestTimestamp = 
RecordBatch.NO_TIMESTAMP,
       shallowOffsetOfMaxTimestamp = 102L, 
MemoryRecords.withTransactionalRecords(102L, CompressionType.NONE,
         pid2, producerEpoch, sequence, partitionLeaderEpoch, new 
SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
 
     // append non-transactional records
-    segment.append(firstOffset = 104L, largestOffset = 105L, largestTimestamp 
= RecordBatch.NO_TIMESTAMP,
+    segment.append(largestOffset = 105L, largestTimestamp = 
RecordBatch.NO_TIMESTAMP,
       shallowOffsetOfMaxTimestamp = 104L, MemoryRecords.withRecords(104L, 
CompressionType.NONE,
         partitionLeaderEpoch, new SimpleRecord("a".getBytes), new 
SimpleRecord("b".getBytes)))
 
     // abort the transaction from pid2 (note LSO should be 100L since the txn 
from pid1 has not completed)
-    segment.append(firstOffset = 106L, largestOffset = 106L, largestTimestamp 
= RecordBatch.NO_TIMESTAMP,
+    segment.append(largestOffset = 106L, largestTimestamp = 
RecordBatch.NO_TIMESTAMP,
       shallowOffsetOfMaxTimestamp = 106L, 
endTxnRecords(ControlRecordType.ABORT, pid2, producerEpoch, offset = 106L))
 
     // commit the transaction from pid1
-    segment.append(firstOffset = 107L, largestOffset = 107L, largestTimestamp 
= RecordBatch.NO_TIMESTAMP,
+    segment.append(largestOffset = 107L, largestTimestamp = 
RecordBatch.NO_TIMESTAMP,
       shallowOffsetOfMaxTimestamp = 107L, 
endTxnRecords(ControlRecordType.COMMIT, pid1, producerEpoch, offset = 107L))
 
     var stateManager = new ProducerStateManager(topicPartition, logDir)
@@ -393,7 +393,7 @@ class LogSegmentTest {
   def testRecoveryFixesCorruptTimeIndex() {
     val seg = createSegment(0)
     for(i <- 0 until 100)
-      seg.append(i, i, i * 10, i, records(i, i.toString))
+      seg.append(i, i * 10, i, records(i, i.toString))
     val timeIndexFile = seg.timeIndex.file
     TestUtils.writeNonsenseToFile(timeIndexFile, 5, timeIndexFile.length.toInt)
     seg.recover(new ProducerStateManager(topicPartition, logDir))
@@ -413,7 +413,7 @@ class LogSegmentTest {
     for (_ <- 0 until 10) {
       val seg = createSegment(0)
       for (i <- 0 until messagesAppended)
-        seg.append(i, i, RecordBatch.NO_TIMESTAMP, -1L, records(i, i.toString))
+        seg.append(i, RecordBatch.NO_TIMESTAMP, -1L, records(i, i.toString))
       val offsetToBeginCorruption = TestUtils.random.nextInt(messagesAppended)
       // start corrupting somewhere in the middle of the chosen record all the 
way to the end
 
@@ -445,9 +445,9 @@ class LogSegmentTest {
   def testCreateWithInitFileSizeAppendMessage() {
     val seg = createSegment(40, false, 512*1024*1024, true)
     val ms = records(50, "hello", "there")
-    seg.append(50, 51, RecordBatch.NO_TIMESTAMP, -1L, ms)
+    seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms)
     val ms2 = records(60, "alpha", "beta")
-    seg.append(60, 61, RecordBatch.NO_TIMESTAMP, -1L, ms2)
+    seg.append(61, RecordBatch.NO_TIMESTAMP, -1L, ms2)
     val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
     checkEquals(ms2.records.iterator, read.records.records.iterator)
   }
@@ -466,9 +466,9 @@ class LogSegmentTest {
       initFileSize = 512 * 1024 * 1024, preallocate = true)
 
     val ms = records(50, "hello", "there")
-    seg.append(50, 51, RecordBatch.NO_TIMESTAMP, -1L, ms)
+    seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms)
     val ms2 = records(60, "alpha", "beta")
-    seg.append(60, 61, RecordBatch.NO_TIMESTAMP, -1L, ms2)
+    seg.append(61, RecordBatch.NO_TIMESTAMP, -1L, ms2)
     val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
     checkEquals(ms2.records.iterator, read.records.records.iterator)
     val oldSize = seg.log.sizeInBytes()
@@ -504,9 +504,9 @@ class LogSegmentTest {
 
     //Given two messages with a gap between them (e.g. mid offset compacted 
away)
     val ms1 = records(offset, "first message")
-    seg.append(offset, offset, RecordBatch.NO_TIMESTAMP, -1L, ms1)
+    seg.append(offset, RecordBatch.NO_TIMESTAMP, -1L, ms1)
     val ms2 = records(offset + 3, "message after gap")
-    seg.append(offset + 3, offset + 3, RecordBatch.NO_TIMESTAMP, -1L, ms2)
+    seg.append(offset + 3, RecordBatch.NO_TIMESTAMP, -1L, ms2)
 
     // When we truncate to an offset without a corresponding log entry
     seg.truncateTo(offset + 1)
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala 
b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 6753939f3d8..ec748156e23 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -897,12 +897,12 @@ class LogTest {
       new SimpleRecord(mockTime.milliseconds, s"key-$seq".getBytes, 
s"value-$seq".getBytes)
     ), producerId = pid, producerEpoch = epoch, sequence = seq)
     val multiEntryAppendInfo = log.appendAsLeader(createRecords, leaderEpoch = 
0)
-    assertEquals("should have appended 3 entries", 
multiEntryAppendInfo.lastOffset - multiEntryAppendInfo.firstOffset + 1, 3)
+    assertEquals("should have appended 3 entries", 
multiEntryAppendInfo.lastOffset - multiEntryAppendInfo.firstOffset.get + 1, 3)
 
     // Append a Duplicate of the tail, when the entry at the tail has multiple 
records.
     val dupMultiEntryAppendInfo = log.appendAsLeader(createRecords, 
leaderEpoch = 0)
     assertEquals("Somehow appended a duplicate entry with multiple log records 
to the tail",
-      multiEntryAppendInfo.firstOffset, dupMultiEntryAppendInfo.firstOffset)
+      multiEntryAppendInfo.firstOffset.get, 
dupMultiEntryAppendInfo.firstOffset.get)
     assertEquals("Somehow appended a duplicate entry with multiple log records 
to the tail",
       multiEntryAppendInfo.lastOffset, dupMultiEntryAppendInfo.lastOffset)
 
@@ -945,7 +945,7 @@ class LogTest {
       producerId = pid, producerEpoch = epoch, sequence = seq)
     val origAppendInfo = log.appendAsLeader(createRecordsWithDuplicate, 
leaderEpoch = 0)
     val newAppendInfo = log.appendAsLeader(createRecordsWithDuplicate, 
leaderEpoch = 0)
-    assertEquals("Inserted a duplicate records into the log", 
origAppendInfo.firstOffset, newAppendInfo.firstOffset)
+    assertEquals("Inserted a duplicate records into the log", 
origAppendInfo.firstOffset.get, newAppendInfo.firstOffset.get)
     assertEquals("Inserted a duplicate records into the log", 
origAppendInfo.lastOffset, newAppendInfo.lastOffset)
   }
 
@@ -1387,7 +1387,7 @@ class LogTest {
       assertEquals("Still no change in the logEndOffset", currOffset, 
log.logEndOffset)
       assertEquals("Should still be able to append and should get the 
logEndOffset assigned to the new append",
                    currOffset,
-                   log.appendAsLeader(TestUtils.singletonRecords(value = 
"hello".getBytes, timestamp = mockTime.milliseconds), leaderEpoch = 
0).firstOffset)
+                   log.appendAsLeader(TestUtils.singletonRecords(value = 
"hello".getBytes, timestamp = mockTime.milliseconds), leaderEpoch = 
0).firstOffset.get)
 
       // cleanup the log
       log.delete()
@@ -1919,17 +1919,96 @@ class LogTest {
     //Writes into an empty log with baseOffset 0
     log.appendAsFollower(set1)
     assertEquals(0L, log.activeSegment.baseOffset)
-    //This write will roll the segment, yielding a new segment with base 
offset = max(2, 1) = 2
+    //This write will roll the segment, yielding a new segment with base 
offset = max(1, Integer.MAX_VALUE+2) = Integer.MAX_VALUE+2
     log.appendAsFollower(set2)
-    assertEquals(2L, log.activeSegment.baseOffset)
-    assertTrue(Log.producerSnapshotFile(logDir, 2L).exists)
-    //This will also roll the segment, yielding a new segment with base offset 
= max(3, Integer.MAX_VALUE+3) = Integer.MAX_VALUE+3
+    assertEquals(Integer.MAX_VALUE.toLong + 2, log.activeSegment.baseOffset)
+    assertTrue(Log.producerSnapshotFile(logDir, Integer.MAX_VALUE.toLong + 
2).exists)
+    //This will go into the existing log
+    log.appendAsFollower(set3)
+    assertEquals(Integer.MAX_VALUE.toLong + 2, log.activeSegment.baseOffset)
+    //This will go into the existing log
+    log.appendAsFollower(set4)
+    assertEquals(Integer.MAX_VALUE.toLong + 2, log.activeSegment.baseOffset)
+    log.close()
+    val indexFiles = logDir.listFiles.filter(file => 
file.getName.contains(".index"))
+    assertEquals(2, indexFiles.length)
+    for (file <- indexFiles) {
+      val offsetIndex = new OffsetIndex(file, 
file.getName.replace(".index","").toLong)
+      assertTrue(offsetIndex.lastOffset >= 0)
+      offsetIndex.close()
+    }
+    Utils.delete(logDir)
+  }
+
+  @Test
+  def testOverCompactedLogRecoveryMultiRecord(): Unit = {
+    // append some messages to create some segments
+    val logConfig = createLogConfig(segmentBytes = 1000, indexIntervalBytes = 
1, maxMessageBytes = 64 * 1024)
+    val log = createLog(logDir, logConfig)
+    val set1 = MemoryRecords.withRecords(0, CompressionType.NONE, 0, new 
SimpleRecord("v1".getBytes(), "k1".getBytes()))
+    val set2 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 2, 
CompressionType.GZIP, 0,
+      new SimpleRecord("v3".getBytes(), "k3".getBytes()),
+      new SimpleRecord("v4".getBytes(), "k4".getBytes()))
+    val set3 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 4, 
CompressionType.GZIP, 0,
+      new SimpleRecord("v5".getBytes(), "k5".getBytes()),
+      new SimpleRecord("v6".getBytes(), "k6".getBytes()))
+    val set4 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 6, 
CompressionType.GZIP, 0,
+      new SimpleRecord("v7".getBytes(), "k7".getBytes()),
+      new SimpleRecord("v8".getBytes(), "k8".getBytes()))
+    //Writes into an empty log with baseOffset 0
+    log.appendAsFollower(set1)
+    assertEquals(0L, log.activeSegment.baseOffset)
+    //This write will roll the segment, yielding a new segment with base 
offset = max(1, Integer.MAX_VALUE+2) = Integer.MAX_VALUE+2
+    log.appendAsFollower(set2)
+    assertEquals(Integer.MAX_VALUE.toLong + 2, log.activeSegment.baseOffset)
+    assertTrue(Log.producerSnapshotFile(logDir, Integer.MAX_VALUE.toLong + 
2).exists)
+    //This will go into the existing log
+    log.appendAsFollower(set3)
+    assertEquals(Integer.MAX_VALUE.toLong + 2, log.activeSegment.baseOffset)
+    //This will go into the existing log
+    log.appendAsFollower(set4)
+    assertEquals(Integer.MAX_VALUE.toLong + 2, log.activeSegment.baseOffset)
+    log.close()
+    val indexFiles = logDir.listFiles.filter(file => 
file.getName.contains(".index"))
+    assertEquals(2, indexFiles.length)
+    for (file <- indexFiles) {
+      val offsetIndex = new OffsetIndex(file, 
file.getName.replace(".index","").toLong)
+      assertTrue(offsetIndex.lastOffset >= 0)
+      offsetIndex.close()
+    }
+    Utils.delete(logDir)
+  }
+
+  @Test
+  def testOverCompactedLogRecoveryMultiRecordV1(): Unit = {
+    // append some messages to create some segments
+    val logConfig = createLogConfig(segmentBytes = 1000, indexIntervalBytes = 
1, maxMessageBytes = 64 * 1024)
+    val log = createLog(logDir, logConfig)
+    val set1 = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, 0, 
CompressionType.NONE,
+      new SimpleRecord("v1".getBytes(), "k1".getBytes()))
+    val set2 = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, 
Integer.MAX_VALUE.toLong + 2, CompressionType.GZIP,
+      new SimpleRecord("v3".getBytes(), "k3".getBytes()),
+      new SimpleRecord("v4".getBytes(), "k4".getBytes()))
+    val set3 = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, 
Integer.MAX_VALUE.toLong + 4, CompressionType.GZIP,
+      new SimpleRecord("v5".getBytes(), "k5".getBytes()),
+      new SimpleRecord("v6".getBytes(), "k6".getBytes()))
+    val set4 = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, 
Integer.MAX_VALUE.toLong + 6, CompressionType.GZIP,
+      new SimpleRecord("v7".getBytes(), "k7".getBytes()),
+      new SimpleRecord("v8".getBytes(), "k8".getBytes()))
+    //Writes into an empty log with baseOffset 0
+    log.appendAsFollower(set1)
+    assertEquals(0L, log.activeSegment.baseOffset)
+    //This write will roll the segment, yielding a new segment with base 
offset = max(1, 3) = 3
+    log.appendAsFollower(set2)
+    assertEquals(3, log.activeSegment.baseOffset)
+    assertTrue(Log.producerSnapshotFile(logDir, 3).exists)
+    //This will also roll the segment, yielding a new segment with base offset 
= max(5, Integer.MAX_VALUE+4) = Integer.MAX_VALUE+4
     log.appendAsFollower(set3)
-    assertEquals(Integer.MAX_VALUE.toLong + 3, log.activeSegment.baseOffset)
-    assertTrue(Log.producerSnapshotFile(logDir, Integer.MAX_VALUE.toLong + 
3).exists)
+    assertEquals(Integer.MAX_VALUE.toLong + 4, log.activeSegment.baseOffset)
+    assertTrue(Log.producerSnapshotFile(logDir, Integer.MAX_VALUE.toLong + 
4).exists)
     //This will go into the existing log
     log.appendAsFollower(set4)
-    assertEquals(Integer.MAX_VALUE.toLong + 3, log.activeSegment.baseOffset)
+    assertEquals(Integer.MAX_VALUE.toLong + 4, log.activeSegment.baseOffset)
     log.close()
     val indexFiles = logDir.listFiles.filter(file => 
file.getName.contains(".index"))
     assertEquals(3, indexFiles.length)
@@ -2534,7 +2613,7 @@ class LogTest {
       new SimpleRecord("baz".getBytes))
 
     val firstAppendInfo = log.appendAsLeader(records, leaderEpoch = 0)
-    assertEquals(Some(firstAppendInfo.firstOffset), 
log.firstUnstableOffset.map(_.messageOffset))
+    assertEquals(Some(firstAppendInfo.firstOffset.get), 
log.firstUnstableOffset.map(_.messageOffset))
 
     // add more transactional records
     seq += 3
@@ -2542,14 +2621,14 @@ class LogTest {
       new SimpleRecord("blah".getBytes)), leaderEpoch = 0)
 
     // LSO should not have changed
-    assertEquals(Some(firstAppendInfo.firstOffset), 
log.firstUnstableOffset.map(_.messageOffset))
+    assertEquals(Some(firstAppendInfo.firstOffset.get), 
log.firstUnstableOffset.map(_.messageOffset))
 
     // now transaction is committed
     val commitAppendInfo = 
log.appendAsLeader(endTxnRecords(ControlRecordType.COMMIT, pid, epoch),
       isFromClient = false, leaderEpoch = 0)
 
     // first unstable offset is not updated until the high watermark is 
advanced
-    assertEquals(Some(firstAppendInfo.firstOffset), 
log.firstUnstableOffset.map(_.messageOffset))
+    assertEquals(Some(firstAppendInfo.firstOffset.get), 
log.firstUnstableOffset.map(_.messageOffset))
     log.onHighWatermarkIncremented(commitAppendInfo.lastOffset + 1)
 
     // now there should be no first unstable offset
@@ -2885,7 +2964,7 @@ class LogTest {
       new SimpleRecord("a".getBytes),
       new SimpleRecord("b".getBytes),
       new SimpleRecord("c".getBytes)), leaderEpoch = 0)
-    assertEquals(Some(firstAppendInfo.firstOffset), 
log.firstUnstableOffset.map(_.messageOffset))
+    assertEquals(Some(firstAppendInfo.firstOffset.get), 
log.firstUnstableOffset.map(_.messageOffset))
 
     // mix in some non-transactional data
     log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE,
@@ -2900,7 +2979,7 @@ class LogTest {
       new SimpleRecord("f".getBytes)), leaderEpoch = 0)
 
     // LSO should not have changed
-    assertEquals(Some(firstAppendInfo.firstOffset), 
log.firstUnstableOffset.map(_.messageOffset))
+    assertEquals(Some(firstAppendInfo.firstOffset.get), 
log.firstUnstableOffset.map(_.messageOffset))
 
     // now first producer's transaction is aborted
     val abortAppendInfo = 
log.appendAsLeader(endTxnRecords(ControlRecordType.ABORT, pid1, epoch),
@@ -2908,7 +2987,7 @@ class LogTest {
     log.onHighWatermarkIncremented(abortAppendInfo.lastOffset + 1)
 
     // LSO should now point to one less than the first offset of the second 
transaction
-    assertEquals(Some(secondAppendInfo.firstOffset), 
log.firstUnstableOffset.map(_.messageOffset))
+    assertEquals(Some(secondAppendInfo.firstOffset.get), 
log.firstUnstableOffset.map(_.messageOffset))
 
     // commit the second transaction
     val commitAppendInfo = 
log.appendAsLeader(endTxnRecords(ControlRecordType.COMMIT, pid2, epoch),
@@ -2934,7 +3013,7 @@ class LogTest {
     val log = createLog(logDir, logConfig)
 
     val firstAppendInfo = log.appendAsLeader(records, leaderEpoch = 0)
-    assertEquals(Some(firstAppendInfo.firstOffset), 
log.firstUnstableOffset.map(_.messageOffset))
+    assertEquals(Some(firstAppendInfo.firstOffset.get), 
log.firstUnstableOffset.map(_.messageOffset))
     assertEquals(Some(0L), log.firstUnstableOffset.map(_.segmentBaseOffset))
 
     // this write should spill to the second segment
@@ -2943,7 +3022,7 @@ class LogTest {
       new SimpleRecord("d".getBytes),
       new SimpleRecord("e".getBytes),
       new SimpleRecord("f".getBytes)), leaderEpoch = 0)
-    assertEquals(Some(firstAppendInfo.firstOffset), 
log.firstUnstableOffset.map(_.messageOffset))
+    assertEquals(Some(firstAppendInfo.firstOffset.get), 
log.firstUnstableOffset.map(_.messageOffset))
     assertEquals(Some(0L), log.firstUnstableOffset.map(_.segmentBaseOffset))
     assertEquals(3L, log.logEndOffsetMetadata.segmentBaseOffset)
 


 



> Use actual first offset of messages when rolling log segment for magic v2
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-6530
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6530
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Jason Gustafson
>            Assignee: Dhruvil Shah
>            Priority: Major
>
> When rolling a log segment, we use a heuristic to determine the base offset of 
> the next segment without decompressing the message set to find the actual first 
> offset; this avoids relative-offset overflow in the segment index. With the v2 
> message format, we can read the first offset directly from the batch header 
> without decompression, so we can set the correct base offset exactly.
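
To illustrate the roll-offset selection introduced in Log.maybeRoll above, here is a
minimal, self-contained sketch. The names (RollOffsetExample, AppendInfo, rollOffset)
are illustrative only and simplified from the actual Log/LogAppendInfo code:

// Illustrative sketch of the base-offset choice when rolling a segment.
// Simplified names; not the actual kafka.log.Log implementation.
object RollOffsetExample {

  // Pre-v2 batches do not carry the first offset in the header, so it may be unknown.
  final case class AppendInfo(firstOffset: Option[Long], lastOffset: Long)

  // Base offset for the newly rolled segment.
  def rollOffset(info: AppendInfo): Long = info.firstOffset match {
    // Magic v2 and newer: the batch header gives the exact first offset.
    case Some(first) => first
    // Older formats: heuristic lower bound. Offsets within one message set differ by
    // less than Int.MaxValue, so this is <= the real first offset, though it can be
    // far lower than needed for sparse, heavily compacted logs.
    case None => info.lastOffset - Int.MaxValue
  }

  def main(args: Array[String]): Unit = {
    // v2 append: exact base offset (Int.MaxValue + 2).
    println(rollOffset(AppendInfo(Some(Int.MaxValue.toLong + 2), Int.MaxValue.toLong + 3)))
    // Pre-v2 append: heuristic base offset (3).
    println(rollOffset(AppendInfo(None, Int.MaxValue.toLong + 3)))
  }
}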


