This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new 1a4942f  KAFKA-12153; Update producer state before updating start/end 
offsets after truncation (#9838)
1a4942f is described below

commit 1a4942fd70b632e58197d2e80ce3639af37eecb1
Author: Jason Gustafson <[email protected]>
AuthorDate: Fri Jan 8 17:38:40 2021 -0800

    KAFKA-12153; Update producer state before updating start/end offsets after 
truncation (#9838)
    
    When we truncate the log, the first unstable offset might become invalid. On 
the other hand, the logic in `updateHighWatermarkMetadata` assumes that the 
first unstable offset remains at a valid position. Since this method can be 
reached through either `updateLogStartOffset` or `updateLogEndOffset` in the 
truncation paths, we need to ensure that the first unstable offset first 
reflects the truncated state.
    
    Reviewers: Chia-Ping Tsai <[email protected]>, Jun Rao <[email protected]>
---
 core/src/main/scala/kafka/log/Log.scala            | 73 +++++++++++++---------
 .../scala/kafka/log/ProducerStateManager.scala     |  4 +-
 .../main/scala/kafka/raft/KafkaMetadataLog.scala   |  2 +-
 core/src/test/scala/unit/kafka/log/LogTest.scala   | 58 ++++++++++++++++-
 .../unit/kafka/log/ProducerStateManagerTest.scala  |  4 +-
 5 files changed, 106 insertions(+), 35 deletions(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala 
b/core/src/main/scala/kafka/log/Log.scala
index 42aeb6e..5b4945a 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -358,28 +358,28 @@ class Log(@volatile private var _dir: File,
    * @return the updated high watermark offset
    */
   def updateHighWatermark(hw: Long): Long = {
-    val newHighWatermark = if (hw < logStartOffset)
-      logStartOffset
-    else if (hw > logEndOffset)
-      logEndOffset
-    else
-      hw
-    updateHighWatermarkMetadata(LogOffsetMetadata(newHighWatermark))
-    newHighWatermark
-  }
-
-  def updateHighWatermarkOffsetMetadata(hw: LogOffsetMetadata): Long = {
-    val newHighWatermark = if (hw.messageOffset < logStartOffset) {
-      updateHighWatermarkMetadata(LogOffsetMetadata(logStartOffset))
-      logStartOffset
-    } else if (hw.messageOffset > logEndOffset) {
-      updateHighWatermarkMetadata(logEndOffsetMetadata)
-      logEndOffset
+    updateHighWatermark(LogOffsetMetadata(hw))
+  }
+
+  /**
+   * Update high watermark with offset metadata. The new high watermark will 
be lower
+   * bounded by the log start offset and upper bounded by the log end offset.
+   *
+   * @param highWatermarkMetadata the suggested high watermark with offset 
metadata
+   * @return the updated high watermark offset
+   */
+  def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
+    val endOffsetMetadata = logEndOffsetMetadata
+    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < 
logStartOffset) {
+      LogOffsetMetadata(logStartOffset)
+    } else if (highWatermarkMetadata.messageOffset >= 
endOffsetMetadata.messageOffset) {
+      endOffsetMetadata
     } else {
-      updateHighWatermarkMetadata(hw)
-      hw.messageOffset
+      highWatermarkMetadata
     }
-    newHighWatermark
+
+    updateHighWatermarkMetadata(newHighWatermarkMetadata)
+    newHighWatermarkMetadata.messageOffset
   }
 
   /**
@@ -435,6 +435,10 @@ class Log(@volatile private var _dir: File,
       throw new IllegalArgumentException("High watermark offset should be 
non-negative")
 
     lock synchronized {
+      if (newHighWatermark.messageOffset < 
highWatermarkMetadata.messageOffset) {
+        warn(s"Non-monotonic update of high watermark from 
$highWatermarkMetadata to $newHighWatermark")
+      }
+
       highWatermarkMetadata = newHighWatermark
       
producerStateManager.onHighWatermarkUpdated(newHighWatermark.messageOffset)
       maybeIncrementFirstUnstableOffset()
@@ -2125,10 +2129,12 @@ class Log(@volatile private var _dir: File,
             val deletable = logSegments.filter(segment => segment.baseOffset > 
targetOffset)
             removeAndDeleteSegments(deletable, asyncDelete = true, 
LogTruncation)
             activeSegment.truncateTo(targetOffset)
-            updateLogEndOffset(targetOffset)
-            updateLogStartOffset(math.min(targetOffset, this.logStartOffset))
             leaderEpochCache.foreach(_.truncateFromEnd(targetOffset))
-            loadProducerState(targetOffset, reloadFromCleanShutdown = false)
+
+            completeTruncation(
+              startOffset = math.min(targetOffset, logStartOffset),
+              endOffset = targetOffset
+            )
           }
           true
         }
@@ -2154,17 +2160,28 @@ class Log(@volatile private var _dir: File,
           fileAlreadyExists = false,
           initFileSize = initFileSize,
           preallocate = config.preallocate))
-        updateLogEndOffset(newOffset)
         leaderEpochCache.foreach(_.clearAndFlush())
+        producerStateManager.truncateFullyAndStartAt(newOffset)
 
-        producerStateManager.truncate()
-        producerStateManager.updateMapEndOffset(newOffset)
-        maybeIncrementFirstUnstableOffset()
-        updateLogStartOffset(newOffset)
+        completeTruncation(
+          startOffset = newOffset,
+          endOffset = newOffset
+        )
       }
     }
   }
 
+  private def completeTruncation(
+    startOffset: Long,
+    endOffset: Long
+  ): Unit = {
+    logStartOffset = startOffset
+    nextOffsetMetadata = LogOffsetMetadata(endOffset, 
activeSegment.baseOffset, activeSegment.size)
+    recoveryPoint = math.min(recoveryPoint, endOffset)
+    rebuildProducerState(endOffset, reloadFromCleanShutdown = false, 
producerStateManager)
+    updateHighWatermark(math.min(highWatermark, endOffset))
+  }
+
   /**
    * The time this log is last known to have been fully flushed to disk
    */
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala 
b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index c1d2704..5113335 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -702,13 +702,13 @@ class ProducerStateManager(val topicPartition: 
TopicPartition,
   /**
    * Truncate the producer id mapping and remove all snapshots. This resets 
the state of the mapping.
    */
-  def truncate(): Unit = {
+  def truncateFullyAndStartAt(offset: Long): Unit = {
     producers.clear()
     ongoingTxns.clear()
     unreplicatedTxns.clear()
     deleteSnapshotFiles(logDir)
     lastSnapOffset = 0L
-    lastMapOffset = 0L
+    lastMapOffset = offset
   }
 
   /**
diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala 
b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
index bbf32be..4a7aa46 100644
--- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
+++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
@@ -112,7 +112,7 @@ class KafkaMetadataLog(log: Log,
 
   override def updateHighWatermark(offsetMetadata: LogOffsetMetadata): Unit = {
     offsetMetadata.metadata.asScala match {
-      case Some(segmentPosition: SegmentPosition) => 
log.updateHighWatermarkOffsetMetadata(
+      case Some(segmentPosition: SegmentPosition) => log.updateHighWatermark(
         new kafka.server.LogOffsetMetadata(
           offsetMetadata.offset,
           segmentPosition.baseOffset,
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala 
b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 2a16836..1582e56 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -115,6 +115,59 @@ class LogTest {
   }
 
   @Test
+  def testTruncateBelowFirstUnstableOffset(): Unit = {
+    testTruncateBelowFirstUnstableOffset(_.truncateTo)
+  }
+
+  @Test
+  def testTruncateFullyAndStartBelowFirstUnstableOffset(): Unit = {
+    testTruncateBelowFirstUnstableOffset(_.truncateFullyAndStartAt)
+  }
+
+  private def testTruncateBelowFirstUnstableOffset(
+    truncateFunc: Log => (Long => Unit)
+  ): Unit = {
+    // Verify that truncation below the first unstable offset correctly
+    // resets the producer state. Specifically we are testing the case when
+    // the segment position of the first unstable offset is unknown.
+
+    val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024)
+    val log = createLog(logDir, logConfig)
+
+    val producerId = 17L
+    val producerEpoch: Short = 10
+    val sequence = 0
+
+    log.appendAsLeader(TestUtils.records(List(
+      new SimpleRecord("0".getBytes),
+      new SimpleRecord("1".getBytes),
+      new SimpleRecord("2".getBytes)
+    )), leaderEpoch = 0)
+
+    log.appendAsLeader(MemoryRecords.withTransactionalRecords(
+      CompressionType.NONE,
+      producerId,
+      producerEpoch,
+      sequence,
+      new SimpleRecord("3".getBytes),
+      new SimpleRecord("4".getBytes)
+    ), leaderEpoch = 0)
+
+    assertEquals(Some(3L), log.firstUnstableOffset)
+
+    // We close and reopen the log to ensure that the first unstable offset 
segment
+    // position will be undefined when we truncate the log.
+    log.close()
+
+    val reopened = createLog(logDir, logConfig)
+    assertEquals(Some(LogOffsetMetadata(3L)), 
reopened.producerStateManager.firstUnstableOffset)
+
+    truncateFunc(reopened)(0L)
+    assertEquals(None, reopened.firstUnstableOffset)
+    assertEquals(Map.empty, reopened.producerStateManager.activeProducers)
+  }
+
+  @Test
   def testHighWatermarkMaintenance(): Unit = {
     val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024)
     val log = createLog(logDir, logConfig)
@@ -930,8 +983,9 @@ class LogTest {
     // Truncation causes the map end offset to reset to 0
     EasyMock.expect(stateManager.mapEndOffset).andReturn(0L)
     // We skip directly to updating the map end offset
-    stateManager.updateMapEndOffset(1L)
-    EasyMock.expectLastCall()
+    EasyMock.expect(stateManager.updateMapEndOffset(1L))
+    EasyMock.expect(stateManager.onHighWatermarkUpdated(0L))
+
     // Finally, we take a snapshot
     stateManager.takeSnapshot()
     EasyMock.expectLastCall().once()
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala 
b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index 8f0ed49..be1ab69 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -578,7 +578,7 @@ class ProducerStateManagerTest {
   }
 
   @Test
-  def testTruncate(): Unit = {
+  def testTruncateFullyAndStartAt(): Unit = {
     val epoch = 0.toShort
 
     append(stateManager, producerId, epoch, 0, 0L)
@@ -592,7 +592,7 @@ class ProducerStateManagerTest {
     assertEquals(2, logDir.listFiles().length)
     assertEquals(Set(2, 3), currentSnapshotOffsets)
 
-    stateManager.truncate()
+    stateManager.truncateFullyAndStartAt(0L)
 
     assertEquals(0, logDir.listFiles().length)
     assertEquals(Set(), currentSnapshotOffsets)

Reply via email to