This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 221d667  KAFKA-9196; Update high watermark metadata after segment roll (#7695)
221d667 is described below

commit 221d66774ced8f3d9360bcd138ff3a8553cf1919
Author: Jason Gustafson <[email protected]>
AuthorDate: Sun Nov 17 14:42:00 2019 -0800

    KAFKA-9196; Update high watermark metadata after segment roll (#7695)
    
    When we roll a new segment, the log offset metadata tied to the high watermark may
    need to be updated. This is needed when the high watermark is equal to the log end
    offset at the time of the roll. Otherwise, we risk exposing uncommitted data early.
    
    Reviewers: Dhruvil Shah <[email protected]>, Ismael Juma <[email protected]>
---
 core/src/main/scala/kafka/log/Log.scala          |  8 ++++--
 core/src/test/scala/unit/kafka/log/LogTest.scala | 36 ++++++++++++++++++++++++
 2 files changed, 41 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index bf0486a..6514aa2 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -744,9 +744,9 @@ class Log(@volatile var dir: File,
   private def updateLogEndOffset(messageOffset: Long): Unit = {
     nextOffsetMetadata = LogOffsetMetadata(messageOffset, 
activeSegment.baseOffset, activeSegment.size)
 
-    // Update the high watermark in case it has gotten ahead of the log end 
offset
-    // following a truncation.
-    if (highWatermark > messageOffset) {
+    // Update the high watermark in case it has gotten ahead of the log end 
offset following a truncation
+    // or if a new segment has been rolled and the offset metadata needs to be 
updated.
+    if (highWatermark >= messageOffset) {
       updateHighWatermarkMetadata(nextOffsetMetadata)
     }
   }
@@ -1912,9 +1912,11 @@ class Log(@volatile var dir: File,
           initFileSize = initFileSize,
           preallocate = config.preallocate)
         addSegment(segment)
+
         // We need to update the segment base offset and append position data 
of the metadata when log rolls.
         // The next offset should not change.
         updateLogEndOffset(nextOffsetMetadata.messageOffset)
+
         // schedule an asynchronous flush of the old segment
         scheduler.schedule("flush-log", () => flush(newOffset), delay = 0L)
 
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 9a69e00..a948cf3 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -78,6 +78,42 @@ class LogTest {
   }
 
   @Test
+  def testHighWatermarkMetadataUpdatedAfterSegmentRoll(): Unit = {
+    val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024)
+    val log = createLog(logDir, logConfig)
+
+    def assertFetchSizeAndOffsets(fetchOffset: Long,
+                                  expectedSize: Int,
+                                  expectedOffsets: Seq[Long]): Unit = {
+      val readInfo = log.read(
+        startOffset = fetchOffset,
+        maxLength = 2048,
+        isolation = FetchHighWatermark,
+        minOneMessage = false)
+      assertEquals(expectedSize, readInfo.records.sizeInBytes)
+      assertEquals(expectedOffsets, 
readInfo.records.records.asScala.map(_.offset))
+    }
+
+    val records = TestUtils.records(List(
+      new SimpleRecord(mockTime.milliseconds, "a".getBytes, "value".getBytes),
+      new SimpleRecord(mockTime.milliseconds, "b".getBytes, "value".getBytes),
+      new SimpleRecord(mockTime.milliseconds, "c".getBytes, "value".getBytes)
+    ))
+
+    log.appendAsLeader(records, leaderEpoch = 0)
+    assertFetchSizeAndOffsets(fetchOffset = 0L, 0, Seq())
+
+    log.maybeIncrementHighWatermark(log.logEndOffsetMetadata)
+    assertFetchSizeAndOffsets(fetchOffset = 0L, records.sizeInBytes, Seq(0, 1, 
2))
+
+    log.roll()
+    assertFetchSizeAndOffsets(fetchOffset = 0L, records.sizeInBytes, Seq(0, 1, 
2))
+
+    log.appendAsLeader(records, leaderEpoch = 0)
+    assertFetchSizeAndOffsets(fetchOffset = 3L, 0, Seq())
+  }
+
+  @Test
   def testHighWatermarkMaintenance(): Unit = {
     val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024)
     val log = createLog(logDir, logConfig)

Reply via email to