This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new 815a228 KAFKA-9105; Add back truncateHead method to
ProducerStateManager (#7599)
815a228 is described below
commit 815a2287357bf4028e8a78e3ac15ada951d4067d
Author: Bob Barrett <[email protected]>
AuthorDate: Fri Oct 25 14:40:45 2019 -0700
KAFKA-9105; Add back truncateHead method to ProducerStateManager (#7599)
The truncateHead method was removed from ProducerStateManager by
github.com/apache/kafka/commit/c49775b. As a result, producer state snapshots
were no longer deleted when the log start offset advanced, even though that
change was intended to keep removing snapshots while preserving the in-memory
producer mapping. This patch restores truncateHead and calls it whenever the
log start offset is incremented.
Reviewers: Jason Gustafson <[email protected]>
---
core/src/main/scala/kafka/log/Log.scala | 1 +
.../scala/kafka/log/ProducerStateManager.scala | 21 ++++++++++++++--
core/src/test/scala/unit/kafka/log/LogTest.scala | 29 ++++++++++++++++++++++
3 files changed, 49 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/kafka/log/Log.scala
b/core/src/main/scala/kafka/log/Log.scala
index 5e04c3c..ccf1d16 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1254,6 +1254,7 @@ class Log(@volatile var dir: File,
info(s"Incrementing log start offset to $newLogStartOffset")
logStartOffset = newLogStartOffset
leaderEpochCache.foreach(_.truncateFromStart(logStartOffset))
+ producerStateManager.truncateHead(newLogStartOffset)
maybeIncrementFirstUnstableOffset()
}
}
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala
b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index ae5b77a..04dafd8 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -598,8 +598,9 @@ class ProducerStateManager(val topicPartition:
TopicPartition,
* Truncate the producer id mapping to the given offset range and reload the
entries from the most recent
* snapshot in range (if there is one). We delete snapshot files prior to
the logStartOffset but do not remove
* producer state from the map. This means that in-memory and on-disk state
can diverge, and in the case of
- * broker failover or unclean shutdown, any in-memory state not persisted in
the snapshots will be lost.
- * Note that the log end offset is assumed to be less than or equal to the
high watermark.
+ * broker failover or unclean shutdown, any in-memory state not persisted in
the snapshots will be lost, which
+ * would lead to UNKNOWN_PRODUCER_ID errors. Note that the log end offset is
assumed to be less than or equal
+ * to the high watermark.
*/
def truncateAndReload(logStartOffset: Long, logEndOffset: Long,
currentTimeMs: Long): Unit = {
// remove all out of range snapshots
@@ -615,6 +616,8 @@ class ProducerStateManager(val topicPartition:
TopicPartition,
// safe to clear the unreplicated transactions
unreplicatedTxns.clear()
loadFromSnapshot(logStartOffset, currentTimeMs)
+ } else {
+ truncateHead(logStartOffset)
}
}
@@ -688,6 +691,20 @@ class ProducerStateManager(val topicPartition:
TopicPartition,
*/
def oldestSnapshotOffset: Option[Long] = oldestSnapshotFile.map(file =>
offsetFromFile(file))
+ /**
+ * When we remove the head of the log due to retention, we need to remove
snapshots older than
+ * the new log start offset.
+ */
+ def truncateHead(logStartOffset: Long): Unit = {
+ removeUnreplicatedTransactions(logStartOffset)
+
+ if (lastMapOffset < logStartOffset)
+ lastMapOffset = logStartOffset
+
+ deleteSnapshotsBefore(logStartOffset)
+ lastSnapOffset = latestSnapshotOffset.getOrElse(logStartOffset)
+ }
+
private def removeUnreplicatedTransactions(offset: Long): Unit = {
val iterator = unreplicatedTxns.entrySet.iterator
while (iterator.hasNext) {
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala
b/core/src/test/scala/unit/kafka/log/LogTest.scala
index fdf40c6..570e252 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -1504,6 +1504,35 @@ class LogTest {
log.appendAsLeader(nextRecords, leaderEpoch = 0)
}
+ @Test
+ def testDeleteSnapshotsOnIncrementLogStartOffset(): Unit = {
+ val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5)
+ val log = createLog(logDir, logConfig)
+ val pid1 = 1L
+ val pid2 = 2L
+ val epoch = 0.toShort
+
+ log.appendAsLeader(TestUtils.records(List(new
SimpleRecord(mockTime.milliseconds(), "a".getBytes)), producerId = pid1,
+ producerEpoch = epoch, sequence = 0), leaderEpoch = 0)
+ log.roll()
+ log.appendAsLeader(TestUtils.records(List(new
SimpleRecord(mockTime.milliseconds(), "b".getBytes)), producerId = pid2,
+ producerEpoch = epoch, sequence = 0), leaderEpoch = 0)
+ log.roll()
+
+ assertEquals(2, log.activeProducersWithLastSequence.size)
+ assertEquals(2,
ProducerStateManager.listSnapshotFiles(log.producerStateManager.logDir).size)
+
+ log.updateHighWatermark(log.logEndOffset)
+ log.maybeIncrementLogStartOffset(2L)
+
+ // Deleting records should not remove producer state but should delete
snapshots
+ assertEquals(2, log.activeProducersWithLastSequence.size)
+ assertEquals(1,
ProducerStateManager.listSnapshotFiles(log.producerStateManager.logDir).size)
+ val retainedLastSeqOpt = log.activeProducersWithLastSequence.get(pid2)
+ assertTrue(retainedLastSeqOpt.isDefined)
+ assertEquals(0, retainedLastSeqOpt.get)
+ }
+
/**
* Test for jitter s for time based log roll. This test appends messages
then changes the time
* using the mock clock to force the log to roll and checks the number of
segments.