Repository: kafka
Updated Branches:
  refs/heads/trunk 4c75f31a5 -> 7bb551b4a

KAFKA-5249; Fix incorrect producer snapshot offsets when recovering segments

Author: Jason Gustafson <[email protected]>

Reviewers: Jun Rao <[email protected]>

Closes #3060 from hachikuji/KAFKA-5249

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7bb551b4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7bb551b4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7bb551b4

Branch: refs/heads/trunk
Commit: 7bb551b4a1737f1819e11e08248b1f2277a0680e
Parents: 4c75f31
Author: Jason Gustafson <[email protected]>
Authored: Mon May 15 15:23:26 2017 -0700
Committer: Jun Rao <[email protected]>
Committed: Mon May 15 15:23:26 2017 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala         |   1 +
 core/src/main/scala/kafka/log/LogSegment.scala  |   1 +
 .../scala/kafka/log/ProducerStateManager.scala  |  10 +-
 .../scala/unit/kafka/log/LogSegmentTest.scala   |   8 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala | 108 ++++++++++++++++++-
 .../kafka/log/ProducerStateManagerTest.scala    |  21 ++++
 6 files changed, 141 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7bb551b4/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index e2d9489..e3a21d1 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -316,6 +316,7 @@ class Log(@volatile var dir: File,
           if (fetchDataInfo != null)
             loadProducersFromLog(stateManager, fetchDataInfo.records)
         }
+        stateManager.updateMapEndOffset(segment.baseOffset)

         val bytesTruncated = segment.recover(config.maxMessageSize, stateManager, leaderEpochCache)

         // once we have recovered the segment's data, take a snapshot to ensure that we won't

http://git-wip-us.apache.org/repos/asf/kafka/blob/7bb551b4/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 6699143..cf3ef0e 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -156,6 +156,7 @@ class LogSegment(val log: FileRecords,
         updateTxnIndex(completedTxn, lastStableOffset)
       }
     }
+    producerStateManager.updateMapEndOffset(batch.lastOffset + 1)
   }

   /**
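
Taken together, these two one-line additions keep the producer state map's end offset in step with segment recovery: Log.scala resets it to the recovering segment's base offset, and LogSegment.scala advances it past each recovered batch. A minimal sketch of that invariant, using illustrative stand-ins rather than the real kafka.log classes (only updateMapEndOffset and the offset-named snapshot file mirror the actual API):

// Sketch of the invariant maintained by the two added updateMapEndOffset
// calls: the map end offset must track exactly how far producer state has
// been rebuilt, because snapshot files are named after it.
object MapEndOffsetSketch {
  final class ProducerStateSketch {
    var mapEndOffset: Long = 0L
    def updateMapEndOffset(offset: Long): Unit =
      mapEndOffset = offset
    // Snapshot files are named after the current map end offset; a stale
    // value here is exactly the incorrect snapshot offset KAFKA-5249 fixes.
    def takeSnapshot(): String = f"$mapEndOffset%020d.snapshot"
  }

  // Stand-in for recovering one segment: reset to the segment's base offset
  // (the Log.scala change), then advance past each recovered batch (the
  // LogSegment.scala change).
  def recoverSegment(state: ProducerStateSketch, baseOffset: Long,
                     batchLastOffsets: Seq[Long]): Unit = {
    state.updateMapEndOffset(baseOffset)
    batchLastOffsets.foreach(last => state.updateMapEndOffset(last + 1))
  }

  def main(args: Array[String]): Unit = {
    val state = new ProducerStateSketch
    recoverSegment(state, baseOffset = 100L, batchLastOffsets = Seq(104L, 107L))
    println(state.takeSnapshot()) // 00000000000000000108.snapshot
  }
}
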
http://git-wip-us.apache.org/repos/asf/kafka/blob/7bb551b4/core/src/main/scala/kafka/log/ProducerStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 02609b2..ba7c470 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -429,6 +429,12 @@ class ProducerStateManager(val topicPartition: TopicPartition,
    * or equal to the high watermark.
    */
   def truncateAndReload(logStartOffset: Long, logEndOffset: Long, currentTimeMs: Long) {
+    // remove all out of range snapshots
+    deleteSnapshotFiles { file =>
+      val offset = offsetFromFilename(file.getName)
+      offset > logEndOffset || offset <= logStartOffset
+    }
+
     if (logEndOffset != mapEndOffset) {
       producers.clear()
       ongoingTxns.clear()
@@ -436,10 +442,6 @@ class ProducerStateManager(val topicPartition: TopicPartition,
       // since we assume that the offset is less than or equal to the high watermark, it is
       // safe to clear the unreplicated transactions
       unreplicatedTxns.clear()
-      deleteSnapshotFiles { file =>
-        val offset = offsetFromFilename(file.getName)
-        offset > logEndOffset || offset <= logStartOffset
-      }
       loadFromSnapshot(logStartOffset, currentTimeMs)
     } else {
       evictUnretainedProducers(logStartOffset)

http://git-wip-us.apache.org/repos/asf/kafka/blob/7bb551b4/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index c3da9b3..5db1ed6 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -302,7 +302,10 @@ class LogSegmentTest {
     segment.append(firstOffset = 107L, largestOffset = 107L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
       shallowOffsetOfMaxTimestamp = 107L, endTxnRecords(ControlRecordType.COMMIT, pid1, producerEpoch, offset = 107L))

-    segment.recover(64 * 1024, new ProducerStateManager(topicPartition, logDir))
+    var stateManager = new ProducerStateManager(topicPartition, logDir)
+    segment.recover(64 * 1024, stateManager)
+    assertEquals(108L, stateManager.mapEndOffset)
+
     var abortedTxns = segment.txnIndex.allAbortedTxns
     assertEquals(1, abortedTxns.size)
@@ -313,9 +316,10 @@ class LogSegmentTest {
     assertEquals(100L, abortedTxn.lastStableOffset)

     // recover again, but this time assuming the transaction from pid2 began on a previous segment
-    val stateManager = new ProducerStateManager(topicPartition, logDir)
+    stateManager = new ProducerStateManager(topicPartition, logDir)
     stateManager.loadProducerEntry(ProducerIdEntry(pid2, producerEpoch, 10, 90L, 5, RecordBatch.NO_TIMESTAMP, 0, Some(75L)))
     segment.recover(64 * 1024, stateManager)
+    assertEquals(108L, stateManager.mapEndOffset)

     abortedTxns = segment.txnIndex.allAbortedTxns
     assertEquals(1, abortedTxns.size)
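
The change to truncateAndReload moves the snapshot cleanup ahead of the logEndOffset != mapEndOffset check, so out-of-range snapshot files are deleted even on the path where the map is already current and no reload happens. A sketch of that reordering with stand-in types (the real method works on snapshot files and producer state; this models snapshots as plain offsets):

// Sketch of the reordering: cleanup is now unconditional, before the
// mapEndOffset comparison, instead of living only on the reload path.
object TruncateAndReloadSketch {
  def truncateAndReload(snapshots: Seq[Long], mapEndOffset: Long,
                        logStartOffset: Long, logEndOffset: Long): (Seq[Long], Boolean) = {
    // unconditional cleanup: keep a snapshot only if
    // logStartOffset < offset <= logEndOffset
    val retained = snapshots.filterNot(o => o > logEndOffset || o <= logStartOffset)
    val reloaded = logEndOffset != mapEndOffset
    (retained, reloaded)
  }

  def main(args: Array[String]): Unit = {
    // Fast path: the map is already at the log end offset, so nothing is
    // reloaded, yet the stale snapshots are still removed.
    val (retained, reloaded) = truncateAndReload(snapshots = Seq(1L, 2L, 3L, 4L, 5L),
      mapEndOffset = 3L, logStartOffset = 1L, logEndOffset = 3L)
    assert(retained == Seq(2L, 3L) && !reloaded)
  }
}
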
http://git-wip-us.apache.org/repos/asf/kafka/blob/7bb551b4/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index d7ca029..8c330ed 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -178,7 +178,7 @@ class LogTest {

   @Test
   def testRebuildPidMapWithCompactedData() {
-    val log = createLog(2048, pidSnapshotIntervalMs = Int.MaxValue)
+    val log = createLog(2048)
     val pid = 1L
     val epoch = 0.toShort
     val seq = 0
@@ -2319,6 +2319,110 @@ class LogTest {
   }

   @Test
+  def testRecoverOnlyLastSegment(): Unit = {
+    val log = createLog(128)
+    val epoch = 0.toShort
+
+    val pid1 = 1L
+    val pid2 = 2L
+    val pid3 = 3L
+    val pid4 = 4L
+
+    val appendPid1 = appendTransactionalAsLeader(log, pid1, epoch)
+    val appendPid2 = appendTransactionalAsLeader(log, pid2, epoch)
+    val appendPid3 = appendTransactionalAsLeader(log, pid3, epoch)
+    val appendPid4 = appendTransactionalAsLeader(log, pid4, epoch)
+
+    // mix transactional and non-transactional data
+    appendPid1(5) // nextOffset: 5
+    appendNonTransactionalAsLeader(log, 3) // 8
+    appendPid2(2) // 10
+    appendPid1(4) // 14
+    appendPid3(3) // 17
+    appendNonTransactionalAsLeader(log, 2) // 19
+    appendPid1(10) // 29
+    appendEndTxnMarkerAsLeader(log, pid1, epoch, ControlRecordType.ABORT) // 30
+    appendPid2(6) // 36
+    appendPid4(3) // 39
+    appendNonTransactionalAsLeader(log, 10) // 49
+    appendPid3(9) // 58
+    appendEndTxnMarkerAsLeader(log, pid3, epoch, ControlRecordType.COMMIT) // 59
+    appendPid4(8) // 67
+    appendPid2(7) // 74
+    appendEndTxnMarkerAsLeader(log, pid2, epoch, ControlRecordType.ABORT) // 75
+    appendNonTransactionalAsLeader(log, 10) // 85
+    appendPid4(4) // 89
+    appendEndTxnMarkerAsLeader(log, pid4, epoch, ControlRecordType.COMMIT) // 90
+
+    // delete the last offset and transaction index files to force recovery
+    val lastSegment = log.logSegments.last
+    val recoveryPoint = lastSegment.baseOffset
+    lastSegment.index.delete()
+    lastSegment.txnIndex.delete()
+
+    log.close()
+
+    val reloadedLog = createLog(1024, recoveryPoint = recoveryPoint)
+    val abortedTransactions = allAbortedTransactions(reloadedLog)
+    assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2, 8L, 74L, 36L)), abortedTransactions)
+  }
+
+  @Test
+  def testRecoverLastSegmentWithNoSnapshots(): Unit = {
+    val log = createLog(128)
+    val epoch = 0.toShort
+
+    val pid1 = 1L
+    val pid2 = 2L
+    val pid3 = 3L
+    val pid4 = 4L
+
+    val appendPid1 = appendTransactionalAsLeader(log, pid1, epoch)
+    val appendPid2 = appendTransactionalAsLeader(log, pid2, epoch)
+    val appendPid3 = appendTransactionalAsLeader(log, pid3, epoch)
+    val appendPid4 = appendTransactionalAsLeader(log, pid4, epoch)
+
+    // mix transactional and non-transactional data
+    appendPid1(5) // nextOffset: 5
+    appendNonTransactionalAsLeader(log, 3) // 8
+    appendPid2(2) // 10
+    appendPid1(4) // 14
+    appendPid3(3) // 17
+    appendNonTransactionalAsLeader(log, 2) // 19
+    appendPid1(10) // 29
+    appendEndTxnMarkerAsLeader(log, pid1, epoch, ControlRecordType.ABORT) // 30
+    appendPid2(6) // 36
+    appendPid4(3) // 39
+    appendNonTransactionalAsLeader(log, 10) // 49
+    appendPid3(9) // 58
+    appendEndTxnMarkerAsLeader(log, pid3, epoch, ControlRecordType.COMMIT) // 59
+    appendPid4(8) // 67
+    appendPid2(7) // 74
+    appendEndTxnMarkerAsLeader(log, pid2, epoch, ControlRecordType.ABORT) // 75
+    appendNonTransactionalAsLeader(log, 10) // 85
+    appendPid4(4) // 89
+    appendEndTxnMarkerAsLeader(log, pid4, epoch, ControlRecordType.COMMIT) // 90
+
+    // delete all snapshot files
+    logDir.listFiles.filter(f => f.isFile && f.getName.endsWith(Log.PidSnapshotFileSuffix)).foreach { file =>
+      file.delete()
+    }
+
+    // delete the last offset and transaction index files to force recovery. this should force us to rebuild
+    // the producer state from the start of the log
+    val lastSegment = log.logSegments.last
+    val recoveryPoint = lastSegment.baseOffset
+    lastSegment.index.delete()
+    lastSegment.txnIndex.delete()
+
+    log.close()
+
+    val reloadedLog = createLog(1024, recoveryPoint = recoveryPoint)
+    val abortedTransactions = allAbortedTransactions(reloadedLog)
+    assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2, 8L, 74L, 36L)), abortedTransactions)
+  }
+
+  @Test
   def testTransactionIndexUpdatedThroughReplication(): Unit = {
     val epoch = 0.toShort
     val log = createLog(1024 * 1024)
@@ -2474,7 +2578,7 @@ class LogTest {
   private def createLog(messageSizeInBytes: Int, retentionMs: Int = -1, retentionBytes: Int = -1,
                         cleanupPolicy: String = "delete", messagesPerSegment: Int = 5,
                         maxPidExpirationMs: Int = 300000, pidExpirationCheckIntervalMs: Int = 30000,
-                        pidSnapshotIntervalMs: Int = 60000): Log = {
+                        recoveryPoint: Long = 0L): Log = {
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, messageSizeInBytes * messagesPerSegment: Integer)
     logProps.put(LogConfig.RetentionMsProp, retentionMs: Integer)
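
The running "// nextOffset" comments in the two new recovery tests are cumulative record counts, with each end-transaction marker contributing a single control record. A standalone tally (not part of the patch) confirming that arithmetic:

object OffsetTallySketch extends App {
  // record counts per append, in test order; each 1 is an end-txn marker batch
  val counts = Seq(5, 3, 2, 4, 3, 2, 10, 1, 6, 3, 10, 9, 1, 8, 7, 1, 10, 4, 1)
  val nextOffsets = counts.scanLeft(0)(_ + _).tail
  println(nextOffsets.mkString(", "))
  // 5, 8, 10, 14, 17, 19, 29, 30, 36, 39, 49, 58, 59, 67, 74, 75, 85, 89, 90
  assert(nextOffsets.last == 90)
}
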
http://git-wip-us.apache.org/repos/asf/kafka/blob/7bb551b4/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index 353642b..ad26339 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -23,6 +23,7 @@ import kafka.server.LogOffsetMetadata
 import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors._
+import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, RecordBatch}
 import org.apache.kafka.common.utils.{MockTime, Utils}
 import org.junit.Assert._
@@ -227,6 +228,26 @@ class ProducerStateManagerTest extends JUnitSuite {
   }

   @Test
+  def testTruncateAndReloadRemovesOutOfRangeSnapshots(): Unit = {
+    val epoch = 0.toShort
+    append(idMapping, pid, epoch, 0, 0L)
+    idMapping.takeSnapshot()
+    append(idMapping, pid, epoch, 1, 1L)
+    idMapping.takeSnapshot()
+    append(idMapping, pid, epoch, 2, 2L)
+    idMapping.takeSnapshot()
+    append(idMapping, pid, epoch, 3, 3L)
+    idMapping.takeSnapshot()
+    append(idMapping, pid, epoch, 4, 4L)
+    idMapping.takeSnapshot()
+
+    idMapping.truncateAndReload(1L, 3L, time.milliseconds())
+
+    assertEquals(Some(2L), idMapping.oldestSnapshotOffset)
+    assertEquals(Some(3L), idMapping.latestSnapshotOffset)
+  }
+
+  @Test
   def testTakeSnapshot(): Unit = {
     val epoch = 0.toShort
     append(idMapping, pid, 0, epoch, 0L, 0L)
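
Why the test expects 2 and 3: assuming each snapshot taken after an append at offset n is written at map end offset n + 1 (consistent with the LogSegment.scala change above), the five snapshots land at offsets 1 through 5, and truncateAndReload(1L, 3L) retains only those in the half-open range (1, 3]. A worked check under that assumption:

object SnapshotRangeSketch extends App {
  // Appends land at offsets 0..4; each snapshot is assumed to be written at
  // map end offset n + 1, i.e. at offsets 1..5 (assumption stated above).
  val snapshotOffsets = (0L to 4L).map(_ + 1)
  val (logStartOffset, logEndOffset) = (1L, 3L)
  val kept = snapshotOffsets.filter(o => o > logStartOffset && o <= logEndOffset)
  assert(kept.headOption.contains(2L)) // oldestSnapshotOffset
  assert(kept.lastOption.contains(3L)) // latestSnapshotOffset
}
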
