This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch 1.1 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push: new 61be769 KAFKA-7959; Delete leader epoch cache files with old message format versions (#6298) 61be769 is described below commit 61be76930e67f85d6534f194ba14dbe8dcd542ec Author: Stanislav Kozlovski <stanislav_kozlov...@outlook.com> AuthorDate: Fri Feb 22 22:56:08 2019 +0000 KAFKA-7959; Delete leader epoch cache files with old message format versions (#6298) It is important to clean up any cached epochs that may exist if the log message format does not support leader epochs (due to a regression in KAFKA-7415). Otherwise, the broker might make use of them once it upgrades its message format. This can cause unnecessary truncation of data. Reviewers: Jason Gustafson <ja...@confluent.io> --- .../apache/kafka/common/record/RecordFormat.java | 9 ++++ core/src/main/scala/kafka/cluster/Replica.scala | 6 +-- core/src/main/scala/kafka/log/Log.scala | 20 ++++++++- .../kafka/server/epoch/LeaderEpochFileCache.scala | 4 ++ core/src/test/scala/unit/kafka/log/LogTest.scala | 51 ++++++++++++++++++++++ .../server/epoch/OffsetsForLeaderEpochTest.scala | 1 + 6 files changed, 86 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordFormat.java b/clients/src/main/java/org/apache/kafka/common/record/RecordFormat.java index e71ec59..2e43739 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/RecordFormat.java +++ b/clients/src/main/java/org/apache/kafka/common/record/RecordFormat.java @@ -25,6 +25,15 @@ public enum RecordFormat { this.value = (byte) value; } + /** + * Check whether this version precedes another version. 
+ * + * @return true only if the magic value is less than the other's + */ + public boolean precedes(RecordFormat other) { + return this.value < other.value; + } + public static RecordFormat lookup(byte version) { switch (version) { case 0: return V0; diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala index 81ab1d2..90ad369 100644 --- a/core/src/main/scala/kafka/cluster/Replica.scala +++ b/core/src/main/scala/kafka/cluster/Replica.scala @@ -60,10 +60,10 @@ class Replica(val brokerId: Int, def epochs: Option[LeaderEpochFileCache] = { log.flatMap { log => - if (log.recordVersion.value < RecordFormat.V2.value) - None - else + if (log.supportsLeaderEpoch) Some(log.leaderEpochCache) + else + None } } diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 6edc3e2..add7d1e 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -162,6 +162,8 @@ class Log(@volatile var dir: File, def recordVersion: RecordFormat = config.messageFormatVersion.messageFormatVersion + def supportsLeaderEpoch = recordVersion.value >= RecordFormat.V2.value + def initFileSize: Int = { if (config.preallocate) config.segmentSize @@ -177,7 +179,15 @@ class Log(@volatile var dir: File, warn(s"${LogConfig.RetentionMsProp} for topic ${topicPartition.topic} is set to ${newConfig.retentionMs}. It is smaller than " + s"${LogConfig.MessageTimestampDifferenceMaxMsProp}'s value ${newConfig.messageTimestampDifferenceMaxMs}. 
" + s"This may result in frequent log rolling.") + val oldConfig = this.config this.config = newConfig + if (updatedKeys.contains(LogConfig.MessageFormatVersionProp)) { + val oldRecordVersion = oldConfig.messageFormatVersion.messageFormatVersion + val newRecordVersion = newConfig.messageFormatVersion.messageFormatVersion + if (newRecordVersion.precedes(oldRecordVersion)) + warn(s"Record format version has been downgraded from $oldRecordVersion to $newRecordVersion.") + _leaderEpochCache = initializeLeaderEpochCache() + } } private def checkIfMemoryMappedBufferClosed(): Unit = { @@ -277,7 +287,13 @@ class Log(@volatile var dir: File, // create the log directory if it doesn't exist Files.createDirectories(dir.toPath) val checkpointFile = new LeaderEpochCheckpointFile(LeaderEpochFile.newFile(dir), logDirFailureChannel) - new LeaderEpochFileCache(topicPartition, logEndOffset _, checkpointFile) + val cache = new LeaderEpochFileCache(topicPartition, logEndOffset _, checkpointFile) + + if (!supportsLeaderEpoch && cache.nonEmpty) { + warn(s"Clearing non-empty leader epoch cache due to incompatible message format $recordVersion") + cache.clearAndFlush() + } + cache } private def removeTempFilesAndCollectSwapFiles(): Set[File] = { @@ -448,7 +464,7 @@ class Log(@volatile var dir: File, info(s"Recovering unflushed segment ${segment.baseOffset}") val truncatedBytes = try { - recoverSegment(segment, Some(_leaderEpochCache)) + recoverSegment(segment, if (supportsLeaderEpoch) Some(_leaderEpochCache) else None) } catch { case _: InvalidOffsetException => val startOffset = segment.baseOffset diff --git a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala index 10bdb17..e8c94ac 100644 --- a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala +++ b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala @@ -84,6 +84,10 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, 
} } + def nonEmpty: Boolean = inReadLock(lock) { + epochs.nonEmpty + } + /** * Returns the current Leader Epoch. This is the latest epoch * which has messages assigned to it. diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 6be9203..7d7295d 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -29,6 +29,7 @@ import org.junit.Assert._ import org.junit.{After, Before, Test} import kafka.utils._ import kafka.server.{BrokerTopicStats, FetchDataInfo, KafkaConfig, LogDirFailureChannel} +import kafka.server.checkpoints.{LeaderEpochCheckpointFile, LeaderEpochFile} import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.record.MemoryRecords.RecordFilter @@ -2255,6 +2256,56 @@ class LogTest { def topicPartitionName(topic: String, partition: String): String = topic + "-" + partition + /** + * Due to KAFKA-7968, we want to make sure that we do not + * make use of old leader epoch cache files when the message format does not support it + */ + @Test + def testOldMessageFormatDeletesEpochCacheIfUnsupported(): Unit = { + def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds - 1000) + val epochCacheSupportingConfig = createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999) + + // append some records to create segments and assign some epochs to create epoch files + val log = createLog(logDir, epochCacheSupportingConfig) + for (_ <- 0 until 100) + log.appendAsLeader(createRecords, leaderEpoch = 0) + log.leaderEpochCache.assign(0, 40) + log.leaderEpochCache.assign(1, 90) + assertEquals(100, log.leaderEpochCache.endOffsetFor(1)) + + // instantiate the log with an old format that does not support the leader epoch + val epochCacheNonSupportingConfig = 
createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, + retentionMs = 999, messageFormatVersion = "0.10.2") + val log2 = createLog(logDir, epochCacheNonSupportingConfig) + assertLeaderEpochCacheEmpty(log2) + } + + @Test + def testLeaderEpochCacheClearedAfterDynamicMessageFormatDowngrade(): Unit = { + val logConfig = createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024) + val log = createLog(logDir, logConfig) + log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 5) + assertEquals(1, log.leaderEpochCache.endOffsetFor(5)) + + val downgradedLogConfig = createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, + maxMessageBytes = 64 * 1024, messageFormatVersion = kafka.api.KAFKA_0_10_2_IV0.version) + log.updateConfig(Set(LogConfig.MessageFormatVersionProp), downgradedLogConfig) + assertLeaderEpochCacheEmpty(log) + + log.appendAsLeader(TestUtils.records(List(new SimpleRecord("bar".getBytes())), + magicValue = RecordFormat.V1.value), leaderEpoch = 5) + assertLeaderEpochCacheEmpty(log) + } + + private def assertLeaderEpochCacheEmpty(log: Log): Unit = { + assertFalse(log.leaderEpochCache.nonEmpty) + + // check that the file is empty as well + val checkpointFile = new LeaderEpochCheckpointFile(LeaderEpochFile.newFile(log.dir)) + val cache = new LeaderEpochFileCache(log.topicPartition, log.logEndOffset _, checkpointFile) + assertFalse(cache.nonEmpty) + } + @Test def testDeleteOldSegments() { def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds - 1000) diff --git a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala index 994f4a0..f351d66 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala +++ b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala @@ 
-51,6 +51,7 @@ class OffsetsForLeaderEpochTest { val logManager = createNiceMock(classOf[kafka.log.LogManager]) expect(mockCache.endOffsetFor(epochRequested)).andReturn(offset) expect(mockLog.leaderEpochCache).andReturn(mockCache).anyTimes() + expect(mockLog.supportsLeaderEpoch).andReturn(true).anyTimes() expect(mockLog.recordVersion).andReturn(RecordFormat.V2).anyTimes() expect(logManager.liveLogDirs).andReturn(Array.empty[File]).anyTimes() replay(mockCache, mockLog, logManager)