Repository: kafka Updated Branches: refs/heads/trunk 65861a712 -> fcdbb7195
KAFKA-5186; Avoid expensive log scan to build producer state when upgrading Author: Jason Gustafson <[email protected]> Reviewers: Jun Rao <[email protected]> Closes #3113 from hachikuji/KAFKA-5186 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fcdbb719 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fcdbb719 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fcdbb719 Branch: refs/heads/trunk Commit: fcdbb71953fc4c92559a9c7adb4cb8ad4a74acd6 Parents: 65861a7 Author: Jason Gustafson <[email protected]> Authored: Mon May 22 15:41:26 2017 -0700 Committer: Jason Gustafson <[email protected]> Committed: Mon May 22 15:41:26 2017 -0700 ---------------------------------------------------------------------- core/src/main/scala/kafka/log/Log.scala | 50 +++++++++++++------- .../src/test/scala/unit/kafka/log/LogTest.scala | 27 +++++++++++ 2 files changed, 60 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/fcdbb719/core/src/main/scala/kafka/log/Log.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 7a47657..55eb46a 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -421,25 +421,41 @@ class Log(@volatile var dir: File, private def loadProducerState(lastOffset: Long): Unit = lock synchronized { info(s"Loading producer state from offset $lastOffset for partition $topicPartition") - val currentTimeMs = time.milliseconds - producerStateManager.truncateAndReload(logStartOffset, lastOffset, currentTimeMs) - - // only do the potentially expensive reloading of the last snapshot offset is lower than the - // log end offset (which would be the case on first startup) and there are active producers. 
- // if there are no active producers, then truncating shouldn't change that fact (although it - // could cause a producerId to expire earlier than expected), so we can skip the loading. - // This is an optimization for users which are not yet using idempotent/transactional features yet. - if (lastOffset > producerStateManager.mapEndOffset || !producerStateManager.isEmpty) { - logSegments(producerStateManager.mapEndOffset, lastOffset).foreach { segment => - val startOffset = math.max(segment.baseOffset, producerStateManager.mapEndOffset) - val fetchDataInfo = segment.read(startOffset, Some(lastOffset), Int.MaxValue) - if (fetchDataInfo != null) - loadProducersFromLog(producerStateManager, fetchDataInfo.records) + + if (producerStateManager.latestSnapshotOffset.isEmpty) { + // if there are no snapshots to load producer state from, we assume that the brokers are + // being upgraded, which means there would be no previous idempotent/transactional producers + // to load state for. To avoid an expensive scan through all of the segments, we take + // empty snapshots from the start of the last two segments and the last offset. The purpose + // of taking the segment snapshots is to avoid the full scan in the case that the log needs + // truncation. + val nextLatestSegmentBaseOffset = Option(segments.lowerEntry(activeSegment.baseOffset)).map(_.getValue.baseOffset) + val offsetsToSnapshot = Seq(nextLatestSegmentBaseOffset, Some(activeSegment.baseOffset), Some(lastOffset)) + offsetsToSnapshot.flatten.foreach { offset => + producerStateManager.updateMapEndOffset(offset) + producerStateManager.takeSnapshot() + } + } else { + val currentTimeMs = time.milliseconds + producerStateManager.truncateAndReload(logStartOffset, lastOffset, currentTimeMs) + + // only do the potentially expensive reloading if the last snapshot offset is lower than the + // log end offset (which would be the case on first startup) and there are active producers. 
+ // if there are no active producers, then truncating shouldn't change that fact (although it + // could cause a producerId to expire earlier than expected), so we can skip the loading. + // This is an optimization for users who are not yet using idempotent/transactional features. + if (lastOffset > producerStateManager.mapEndOffset || !producerStateManager.isEmpty) { + logSegments(producerStateManager.mapEndOffset, lastOffset).foreach { segment => + val startOffset = math.max(segment.baseOffset, producerStateManager.mapEndOffset) + val fetchDataInfo = segment.read(startOffset, Some(lastOffset), Int.MaxValue) + if (fetchDataInfo != null) + loadProducersFromLog(producerStateManager, fetchDataInfo.records) + } + } - } - producerStateManager.updateMapEndOffset(lastOffset) - updateFirstUnstableOffset() + producerStateManager.updateMapEndOffset(lastOffset) + updateFirstUnstableOffset() + } } private def loadProducersFromLog(producerStateManager: ProducerStateManager, records: Records): Unit = { http://git-wip-us.apache.org/repos/asf/kafka/blob/fcdbb719/core/src/test/scala/unit/kafka/log/LogTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 99ebd15..84ff43b 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -169,6 +169,33 @@ class LogTest { } @Test + def testInitializationOfProducerSnapshotsUpgradePath(): Unit = { + // simulate the upgrade path by creating a new log with several segments, deleting the + // snapshot files, and then reloading the log + + val log = createLog(64, messagesPerSegment = 10) + assertEquals(None, log.oldestProducerSnapshotOffset) + + for (i <- 0 to 100) { + val record = new SimpleRecord(time.milliseconds, i.toString.getBytes) + log.appendAsLeader(TestUtils.records(List(record)), leaderEpoch = 0) + } + + 
assertTrue(log.logSegments.size >= 2) + log.close() + + logDir.listFiles.filter(f => f.isFile && f.getName.endsWith(Log.PidSnapshotFileSuffix)).foreach { file => + Files.delete(file.toPath) + } + + val reloadedLog = createLog(64, messagesPerSegment = 10) + val expectedSnapshotsOffsets = log.logSegments.toSeq.reverse.take(2).map(_.baseOffset) ++ Seq(reloadedLog.logEndOffset) + expectedSnapshotsOffsets.foreach { offset => + assertTrue(Log.producerSnapshotFile(logDir, offset).exists) + } + } + + @Test def testPidMapOffsetUpdatedForNonIdempotentData() { val log = createLog(2048) val records = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)))
