This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push:
new 1a4942f KAFKA-12153; Update producer state before updating start/end
offsets after truncation (#9838)
1a4942f is described below
commit 1a4942fd70b632e58197d2e80ce3639af37eecb1
Author: Jason Gustafson <[email protected]>
AuthorDate: Fri Jan 8 17:38:40 2021 -0800
KAFKA-12153; Update producer state before updating start/end offsets after
truncation (#9838)
When we truncate the log, the first unstable offset might become invalid. On
the other hand, the logic in `updateHighWatermarkMetadata` assumes that the
first unstable offset remains at a valid position. Since this method can be
reached through either `updateLogStartOffset` or `updateLogEndOffset` in the
truncation paths, we need to ensure that the first unstable offset first
reflects the truncated state.
Reviewers: Chia-Ping Tsai <[email protected]>, Jun Rao <[email protected]>
---
core/src/main/scala/kafka/log/Log.scala | 73 +++++++++++++---------
.../scala/kafka/log/ProducerStateManager.scala | 4 +-
.../main/scala/kafka/raft/KafkaMetadataLog.scala | 2 +-
core/src/test/scala/unit/kafka/log/LogTest.scala | 58 ++++++++++++++++-
.../unit/kafka/log/ProducerStateManagerTest.scala | 4 +-
5 files changed, 106 insertions(+), 35 deletions(-)
diff --git a/core/src/main/scala/kafka/log/Log.scala
b/core/src/main/scala/kafka/log/Log.scala
index 42aeb6e..5b4945a 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -358,28 +358,28 @@ class Log(@volatile private var _dir: File,
* @return the updated high watermark offset
*/
def updateHighWatermark(hw: Long): Long = {
- val newHighWatermark = if (hw < logStartOffset)
- logStartOffset
- else if (hw > logEndOffset)
- logEndOffset
- else
- hw
- updateHighWatermarkMetadata(LogOffsetMetadata(newHighWatermark))
- newHighWatermark
- }
-
- def updateHighWatermarkOffsetMetadata(hw: LogOffsetMetadata): Long = {
- val newHighWatermark = if (hw.messageOffset < logStartOffset) {
- updateHighWatermarkMetadata(LogOffsetMetadata(logStartOffset))
- logStartOffset
- } else if (hw.messageOffset > logEndOffset) {
- updateHighWatermarkMetadata(logEndOffsetMetadata)
- logEndOffset
+ updateHighWatermark(LogOffsetMetadata(hw))
+ }
+
+ /**
+ * Update high watermark with offset metadata. The new high watermark will
be lower
+ * bounded by the log start offset and upper bounded by the log end offset.
+ *
+ * @param highWatermarkMetadata the suggested high watermark with offset
metadata
+ * @return the updated high watermark offset
+ */
+ def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
+ val endOffsetMetadata = logEndOffsetMetadata
+ val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset <
logStartOffset) {
+ LogOffsetMetadata(logStartOffset)
+ } else if (highWatermarkMetadata.messageOffset >=
endOffsetMetadata.messageOffset) {
+ endOffsetMetadata
} else {
- updateHighWatermarkMetadata(hw)
- hw.messageOffset
+ highWatermarkMetadata
}
- newHighWatermark
+
+ updateHighWatermarkMetadata(newHighWatermarkMetadata)
+ newHighWatermarkMetadata.messageOffset
}
/**
@@ -435,6 +435,10 @@ class Log(@volatile private var _dir: File,
throw new IllegalArgumentException("High watermark offset should be
non-negative")
lock synchronized {
+ if (newHighWatermark.messageOffset <
highWatermarkMetadata.messageOffset) {
+ warn(s"Non-monotonic update of high watermark from
$highWatermarkMetadata to $newHighWatermark")
+ }
+
highWatermarkMetadata = newHighWatermark
producerStateManager.onHighWatermarkUpdated(newHighWatermark.messageOffset)
maybeIncrementFirstUnstableOffset()
@@ -2125,10 +2129,12 @@ class Log(@volatile private var _dir: File,
val deletable = logSegments.filter(segment => segment.baseOffset >
targetOffset)
removeAndDeleteSegments(deletable, asyncDelete = true,
LogTruncation)
activeSegment.truncateTo(targetOffset)
- updateLogEndOffset(targetOffset)
- updateLogStartOffset(math.min(targetOffset, this.logStartOffset))
leaderEpochCache.foreach(_.truncateFromEnd(targetOffset))
- loadProducerState(targetOffset, reloadFromCleanShutdown = false)
+
+ completeTruncation(
+ startOffset = math.min(targetOffset, logStartOffset),
+ endOffset = targetOffset
+ )
}
true
}
@@ -2154,17 +2160,28 @@ class Log(@volatile private var _dir: File,
fileAlreadyExists = false,
initFileSize = initFileSize,
preallocate = config.preallocate))
- updateLogEndOffset(newOffset)
leaderEpochCache.foreach(_.clearAndFlush())
+ producerStateManager.truncateFullyAndStartAt(newOffset)
- producerStateManager.truncate()
- producerStateManager.updateMapEndOffset(newOffset)
- maybeIncrementFirstUnstableOffset()
- updateLogStartOffset(newOffset)
+ completeTruncation(
+ startOffset = newOffset,
+ endOffset = newOffset
+ )
}
}
}
+ private def completeTruncation(
+ startOffset: Long,
+ endOffset: Long
+ ): Unit = {
+ logStartOffset = startOffset
+ nextOffsetMetadata = LogOffsetMetadata(endOffset,
activeSegment.baseOffset, activeSegment.size)
+ recoveryPoint = math.min(recoveryPoint, endOffset)
+ rebuildProducerState(endOffset, reloadFromCleanShutdown = false,
producerStateManager)
+ updateHighWatermark(math.min(highWatermark, endOffset))
+ }
+
/**
* The time this log is last known to have been fully flushed to disk
*/
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala
b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index c1d2704..5113335 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -702,13 +702,13 @@ class ProducerStateManager(val topicPartition:
TopicPartition,
/**
* Truncate the producer id mapping and remove all snapshots. This resets
the state of the mapping.
*/
- def truncate(): Unit = {
+ def truncateFullyAndStartAt(offset: Long): Unit = {
producers.clear()
ongoingTxns.clear()
unreplicatedTxns.clear()
deleteSnapshotFiles(logDir)
lastSnapOffset = 0L
- lastMapOffset = 0L
+ lastMapOffset = offset
}
/**
diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
index bbf32be..4a7aa46 100644
--- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
+++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
@@ -112,7 +112,7 @@ class KafkaMetadataLog(log: Log,
override def updateHighWatermark(offsetMetadata: LogOffsetMetadata): Unit = {
offsetMetadata.metadata.asScala match {
- case Some(segmentPosition: SegmentPosition) =>
log.updateHighWatermarkOffsetMetadata(
+ case Some(segmentPosition: SegmentPosition) => log.updateHighWatermark(
new kafka.server.LogOffsetMetadata(
offsetMetadata.offset,
segmentPosition.baseOffset,
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala
b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 2a16836..1582e56 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -115,6 +115,59 @@ class LogTest {
}
@Test
+ def testTruncateBelowFirstUnstableOffset(): Unit = {
+ testTruncateBelowFirstUnstableOffset(_.truncateTo)
+ }
+
+ @Test
+ def testTruncateFullyAndStartBelowFirstUnstableOffset(): Unit = {
+ testTruncateBelowFirstUnstableOffset(_.truncateFullyAndStartAt)
+ }
+
+ private def testTruncateBelowFirstUnstableOffset(
+ truncateFunc: Log => (Long => Unit)
+ ): Unit = {
+ // Verify that truncation below the first unstable offset correctly
+ // resets the producer state. Specifically we are testing the case when
+ // the segment position of the first unstable offset is unknown.
+
+ val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024)
+ val log = createLog(logDir, logConfig)
+
+ val producerId = 17L
+ val producerEpoch: Short = 10
+ val sequence = 0
+
+ log.appendAsLeader(TestUtils.records(List(
+ new SimpleRecord("0".getBytes),
+ new SimpleRecord("1".getBytes),
+ new SimpleRecord("2".getBytes)
+ )), leaderEpoch = 0)
+
+ log.appendAsLeader(MemoryRecords.withTransactionalRecords(
+ CompressionType.NONE,
+ producerId,
+ producerEpoch,
+ sequence,
+ new SimpleRecord("3".getBytes),
+ new SimpleRecord("4".getBytes)
+ ), leaderEpoch = 0)
+
+ assertEquals(Some(3L), log.firstUnstableOffset)
+
+ // We close and reopen the log to ensure that the first unstable offset
segment
+ // position will be undefined when we truncate the log.
+ log.close()
+
+ val reopened = createLog(logDir, logConfig)
+ assertEquals(Some(LogOffsetMetadata(3L)),
reopened.producerStateManager.firstUnstableOffset)
+
+ truncateFunc(reopened)(0L)
+ assertEquals(None, reopened.firstUnstableOffset)
+ assertEquals(Map.empty, reopened.producerStateManager.activeProducers)
+ }
+
+ @Test
def testHighWatermarkMaintenance(): Unit = {
val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024)
val log = createLog(logDir, logConfig)
@@ -930,8 +983,9 @@ class LogTest {
// Truncation causes the map end offset to reset to 0
EasyMock.expect(stateManager.mapEndOffset).andReturn(0L)
// We skip directly to updating the map end offset
- stateManager.updateMapEndOffset(1L)
- EasyMock.expectLastCall()
+ EasyMock.expect(stateManager.updateMapEndOffset(1L))
+ EasyMock.expect(stateManager.onHighWatermarkUpdated(0L))
+
// Finally, we take a snapshot
stateManager.takeSnapshot()
EasyMock.expectLastCall().once()
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index 8f0ed49..be1ab69 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -578,7 +578,7 @@ class ProducerStateManagerTest {
}
@Test
- def testTruncate(): Unit = {
+ def testTruncateFullyAndStartAt(): Unit = {
val epoch = 0.toShort
append(stateManager, producerId, epoch, 0, 0L)
@@ -592,7 +592,7 @@ class ProducerStateManagerTest {
assertEquals(2, logDir.listFiles().length)
assertEquals(Set(2, 3), currentSnapshotOffsets)
- stateManager.truncate()
+ stateManager.truncateFullyAndStartAt(0L)
assertEquals(0, logDir.listFiles().length)
assertEquals(Set(), currentSnapshotOffsets)