This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 815a228  KAFKA-9105; Add back truncateHead method to ProducerStateManager (#7599)
815a228 is described below

commit 815a2287357bf4028e8a78e3ac15ada951d4067d
Author: Bob Barrett <[email protected]>
AuthorDate: Fri Oct 25 14:40:45 2019 -0700

    KAFKA-9105; Add back truncateHead method to ProducerStateManager (#7599)
    
    The truncateHead method was removed from ProducerStateManager by
    github.com/apache/kafka/commit/c49775b. This meant that snapshots were no
    longer removed when the log start offset increased, even though the intent of
    that change was to remove snapshots but preserve the in-memory mapping. This
    patch adds the required functionality back.
    
    Reviewers: Jason Gustafson <[email protected]>
---
 core/src/main/scala/kafka/log/Log.scala            |  1 +
 .../scala/kafka/log/ProducerStateManager.scala     | 21 ++++++++++++++--
 core/src/test/scala/unit/kafka/log/LogTest.scala   | 29 ++++++++++++++++++++++
 3 files changed, 49 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala 
b/core/src/main/scala/kafka/log/Log.scala
index 5e04c3c..ccf1d16 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1254,6 +1254,7 @@ class Log(@volatile var dir: File,
           info(s"Incrementing log start offset to $newLogStartOffset")
           logStartOffset = newLogStartOffset
           leaderEpochCache.foreach(_.truncateFromStart(logStartOffset))
+          producerStateManager.truncateHead(newLogStartOffset)
           maybeIncrementFirstUnstableOffset()
         }
       }
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala 
b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index ae5b77a..04dafd8 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -598,8 +598,9 @@ class ProducerStateManager(val topicPartition: 
TopicPartition,
    * Truncate the producer id mapping to the given offset range and reload the entries from the most recent
    * snapshot in range (if there is one). We delete snapshot files prior to the logStartOffset but do not remove
    * producer state from the map. This means that in-memory and on-disk state can diverge, and in the case of
-   * broker failover or unclean shutdown, any in-memory state not persisted in the snapshots will be lost.
-   * Note that the log end offset is assumed to be less than or equal to the high watermark.
+   * broker failover or unclean shutdown, any in-memory state not persisted in the snapshots will be lost, which
+   * would lead to UNKNOWN_PRODUCER_ID errors. Note that the log end offset is assumed to be less than or equal
+   * to the high watermark.
    */
   def truncateAndReload(logStartOffset: Long, logEndOffset: Long, 
currentTimeMs: Long): Unit = {
     // remove all out of range snapshots
@@ -615,6 +616,8 @@ class ProducerStateManager(val topicPartition: 
TopicPartition,
       // safe to clear the unreplicated transactions
       unreplicatedTxns.clear()
       loadFromSnapshot(logStartOffset, currentTimeMs)
+    } else {
+      truncateHead(logStartOffset)
     }
   }
 
@@ -688,6 +691,20 @@ class ProducerStateManager(val topicPartition: 
TopicPartition,
    */
   def oldestSnapshotOffset: Option[Long] = oldestSnapshotFile.map(file => 
offsetFromFile(file))
 
+  /**
+   * When we remove the head of the log due to retention, we need to remove 
snapshots older than
+   * the new log start offset.
+   */
+  def truncateHead(logStartOffset: Long): Unit = {
+    removeUnreplicatedTransactions(logStartOffset)
+
+    if (lastMapOffset < logStartOffset)
+      lastMapOffset = logStartOffset
+
+    deleteSnapshotsBefore(logStartOffset)
+    lastSnapOffset = latestSnapshotOffset.getOrElse(logStartOffset)
+  }
+
   private def removeUnreplicatedTransactions(offset: Long): Unit = {
     val iterator = unreplicatedTxns.entrySet.iterator
     while (iterator.hasNext) {
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala 
b/core/src/test/scala/unit/kafka/log/LogTest.scala
index fdf40c6..570e252 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -1504,6 +1504,35 @@ class LogTest {
     log.appendAsLeader(nextRecords, leaderEpoch = 0)
   }
 
+  @Test
+  def testDeleteSnapshotsOnIncrementLogStartOffset(): Unit = {
+    val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5)
+    val log = createLog(logDir, logConfig)
+    val pid1 = 1L
+    val pid2 = 2L
+    val epoch = 0.toShort
+
+    log.appendAsLeader(TestUtils.records(List(new 
SimpleRecord(mockTime.milliseconds(), "a".getBytes)), producerId = pid1,
+      producerEpoch = epoch, sequence = 0), leaderEpoch = 0)
+    log.roll()
+    log.appendAsLeader(TestUtils.records(List(new 
SimpleRecord(mockTime.milliseconds(), "b".getBytes)), producerId = pid2,
+      producerEpoch = epoch, sequence = 0), leaderEpoch = 0)
+    log.roll()
+
+    assertEquals(2, log.activeProducersWithLastSequence.size)
+    assertEquals(2, 
ProducerStateManager.listSnapshotFiles(log.producerStateManager.logDir).size)
+
+    log.updateHighWatermark(log.logEndOffset)
+    log.maybeIncrementLogStartOffset(2L)
+
+    // Deleting records should not remove producer state but should delete 
snapshots
+    assertEquals(2, log.activeProducersWithLastSequence.size)
+    assertEquals(1, 
ProducerStateManager.listSnapshotFiles(log.producerStateManager.logDir).size)
+    val retainedLastSeqOpt = log.activeProducersWithLastSequence.get(pid2)
+    assertTrue(retainedLastSeqOpt.isDefined)
+    assertEquals(0, retainedLastSeqOpt.get)
+  }
+
   /**
    * Test for jitter s for time based log roll. This test appends messages 
then changes the time
    * using the mock clock to force the log to roll and checks the number of 
segments.

Reply via email to