This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 61be769  KAFKA-7959; Delete leader epoch cache files with old message format versions (#6298)
61be769 is described below

commit 61be76930e67f85d6534f194ba14dbe8dcd542ec
Author: Stanislav Kozlovski <stanislav_kozlov...@outlook.com>
AuthorDate: Fri Feb 22 22:56:08 2019 +0000

    KAFKA-7959; Delete leader epoch cache files with old message format versions (#6298)
    
    It is important to clean up any cached leader epochs that may exist if the
    log message format does not support them (due to a regression in
    KAFKA-7415). Otherwise, the broker might make use of them once it upgrades
    its message format, which can cause unnecessary truncation of data.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../apache/kafka/common/record/RecordFormat.java   |  9 ++++
 core/src/main/scala/kafka/cluster/Replica.scala    |  6 +--
 core/src/main/scala/kafka/log/Log.scala            | 20 ++++++++-
 .../kafka/server/epoch/LeaderEpochFileCache.scala  |  4 ++
 core/src/test/scala/unit/kafka/log/LogTest.scala   | 51 ++++++++++++++++++++++
 .../server/epoch/OffsetsForLeaderEpochTest.scala   |  1 +
 6 files changed, 86 insertions(+), 5 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/RecordFormat.java 
b/clients/src/main/java/org/apache/kafka/common/record/RecordFormat.java
index e71ec59..2e43739 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/RecordFormat.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordFormat.java
@@ -25,6 +25,15 @@ public enum RecordFormat {
         this.value = (byte) value;
     }
 
+    /**
+     * Check whether this version precedes another version.
+     *
+     * @return true only if the magic value is less than the other's
+     */
+    public boolean precedes(RecordFormat other) {
+        return this.value < other.value;
+    }
+
     public static RecordFormat lookup(byte version) {
         switch (version) {
             case 0: return V0;
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala 
b/core/src/main/scala/kafka/cluster/Replica.scala
index 81ab1d2..90ad369 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -60,10 +60,10 @@ class Replica(val brokerId: Int,
 
   def epochs: Option[LeaderEpochFileCache] = {
     log.flatMap { log =>
-      if (log.recordVersion.value < RecordFormat.V2.value)
-        None
-      else
+      if (log.supportsLeaderEpoch)
         Some(log.leaderEpochCache)
+      else
+        None
     }
   }
 
diff --git a/core/src/main/scala/kafka/log/Log.scala 
b/core/src/main/scala/kafka/log/Log.scala
index 6edc3e2..add7d1e 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -162,6 +162,8 @@ class Log(@volatile var dir: File,
 
   def recordVersion: RecordFormat = 
config.messageFormatVersion.messageFormatVersion
 
+  def supportsLeaderEpoch = recordVersion.value >= RecordFormat.V2.value
+
   def initFileSize: Int = {
     if (config.preallocate)
       config.segmentSize
@@ -177,7 +179,15 @@ class Log(@volatile var dir: File,
       warn(s"${LogConfig.RetentionMsProp} for topic ${topicPartition.topic} is 
set to ${newConfig.retentionMs}. It is smaller than " +
         s"${LogConfig.MessageTimestampDifferenceMaxMsProp}'s value 
${newConfig.messageTimestampDifferenceMaxMs}. " +
         s"This may result in frequent log rolling.")
+    val oldConfig = this.config
     this.config = newConfig
+    if (updatedKeys.contains(LogConfig.MessageFormatVersionProp)) {
+      val oldRecordVersion = 
oldConfig.messageFormatVersion.messageFormatVersion
+      val newRecordVersion = 
newConfig.messageFormatVersion.messageFormatVersion
+      if (newRecordVersion.precedes(oldRecordVersion))
+        warn(s"Record format version has been downgraded from 
$oldRecordVersion to $newRecordVersion.")
+      _leaderEpochCache = initializeLeaderEpochCache()
+    }
   }
 
   private def checkIfMemoryMappedBufferClosed(): Unit = {
@@ -277,7 +287,13 @@ class Log(@volatile var dir: File,
     // create the log directory if it doesn't exist
     Files.createDirectories(dir.toPath)
     val checkpointFile = new 
LeaderEpochCheckpointFile(LeaderEpochFile.newFile(dir), logDirFailureChannel)
-    new LeaderEpochFileCache(topicPartition, logEndOffset _, checkpointFile)
+    val cache = new LeaderEpochFileCache(topicPartition, logEndOffset _, 
checkpointFile)
+
+    if (!supportsLeaderEpoch && cache.nonEmpty) {
+      warn(s"Clearing non-empty leader epoch cache due to incompatible message 
format $recordVersion")
+      cache.clearAndFlush()
+    }
+    cache
   }
 
   private def removeTempFilesAndCollectSwapFiles(): Set[File] = {
@@ -448,7 +464,7 @@ class Log(@volatile var dir: File,
         info(s"Recovering unflushed segment ${segment.baseOffset}")
         val truncatedBytes =
           try {
-            recoverSegment(segment, Some(_leaderEpochCache))
+            recoverSegment(segment, if (supportsLeaderEpoch) 
Some(_leaderEpochCache) else None)
           } catch {
             case _: InvalidOffsetException =>
               val startOffset = segment.baseOffset
diff --git a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala 
b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
index 10bdb17..e8c94ac 100644
--- a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
+++ b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
@@ -84,6 +84,10 @@ class LeaderEpochFileCache(topicPartition: TopicPartition,
     }
   }
 
+  def nonEmpty: Boolean = inReadLock(lock) {
+    epochs.nonEmpty
+  }
+
   /**
     * Returns the current Leader Epoch. This is the latest epoch
     * which has messages assigned to it.
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala 
b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 6be9203..7d7295d 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -29,6 +29,7 @@ import org.junit.Assert._
 import org.junit.{After, Before, Test}
 import kafka.utils._
 import kafka.server.{BrokerTopicStats, FetchDataInfo, KafkaConfig, 
LogDirFailureChannel}
+import kafka.server.checkpoints.{LeaderEpochCheckpointFile, LeaderEpochFile}
 import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter
@@ -2255,6 +2256,56 @@ class LogTest {
   def topicPartitionName(topic: String, partition: String): String =
     topic + "-" + partition
 
+  /**
+    * Due to KAFKA-7968, we want to make sure that we do not
+    * make use of old leader epoch cache files when the message format does 
not support it
+    */
+  @Test
+  def testOldMessageFormatDeletesEpochCacheIfUnsupported(): Unit = {
+    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, 
timestamp = mockTime.milliseconds - 1000)
+    val epochCacheSupportingConfig = createLogConfig(segmentBytes = 
createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999)
+
+    // append some records to create segments and assign some epochs to create 
epoch files
+    val log = createLog(logDir, epochCacheSupportingConfig)
+    for (_ <- 0 until 100)
+      log.appendAsLeader(createRecords, leaderEpoch = 0)
+    log.leaderEpochCache.assign(0, 40)
+    log.leaderEpochCache.assign(1, 90)
+    assertEquals(100, log.leaderEpochCache.endOffsetFor(1))
+
+    // instantiate the log with an old format that does not support the leader 
epoch
+    val epochCacheNonSupportingConfig = createLogConfig(segmentBytes = 
createRecords.sizeInBytes * 5, segmentIndexBytes = 1000,
+      retentionMs = 999, messageFormatVersion = "0.10.2")
+    val log2 = createLog(logDir, epochCacheNonSupportingConfig)
+    assertLeaderEpochCacheEmpty(log2)
+  }
+
+  @Test
+  def testLeaderEpochCacheClearedAfterDynamicMessageFormatDowngrade(): Unit = {
+    val logConfig = createLogConfig(segmentBytes = 1000, indexIntervalBytes = 
1, maxMessageBytes = 64 * 1024)
+    val log = createLog(logDir, logConfig)
+    log.appendAsLeader(TestUtils.records(List(new 
SimpleRecord("foo".getBytes()))), leaderEpoch = 5)
+    assertEquals(1, log.leaderEpochCache.endOffsetFor(5))
+
+    val downgradedLogConfig = createLogConfig(segmentBytes = 1000, 
indexIntervalBytes = 1,
+      maxMessageBytes = 64 * 1024, messageFormatVersion = 
kafka.api.KAFKA_0_10_2_IV0.version)
+    log.updateConfig(Set(LogConfig.MessageFormatVersionProp), 
downgradedLogConfig)
+    assertLeaderEpochCacheEmpty(log)
+
+    log.appendAsLeader(TestUtils.records(List(new 
SimpleRecord("bar".getBytes())),
+      magicValue = RecordFormat.V1.value), leaderEpoch = 5)
+    assertLeaderEpochCacheEmpty(log)
+  }
+
+  private def assertLeaderEpochCacheEmpty(log: Log): Unit = {
+    assertFalse(log.leaderEpochCache.nonEmpty)
+
+    // check that the file is empty as well
+    val checkpointFile = new 
LeaderEpochCheckpointFile(LeaderEpochFile.newFile(log.dir))
+    val cache = new LeaderEpochFileCache(log.topicPartition, log.logEndOffset 
_, checkpointFile)
+    assertFalse(cache.nonEmpty)
+  }
+
   @Test
   def testDeleteOldSegments() {
     def createRecords = TestUtils.singletonRecords(value = "test".getBytes, 
timestamp = mockTime.milliseconds - 1000)
diff --git 
a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala 
b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
index 994f4a0..f351d66 100644
--- 
a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
@@ -51,6 +51,7 @@ class OffsetsForLeaderEpochTest {
     val logManager = createNiceMock(classOf[kafka.log.LogManager])
     expect(mockCache.endOffsetFor(epochRequested)).andReturn(offset)
     expect(mockLog.leaderEpochCache).andReturn(mockCache).anyTimes()
+    expect(mockLog.supportsLeaderEpoch).andReturn(true).anyTimes()
     expect(mockLog.recordVersion).andReturn(RecordFormat.V2).anyTimes()
     expect(logManager.liveLogDirs).andReturn(Array.empty[File]).anyTimes()
     replay(mockCache, mockLog, logManager)

Reply via email to