This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0a9893cac0 KAFKA-13815: Avoid reinitialization for a replica that is 
being deleted (#12029)
0a9893cac0 is described below

commit 0a9893cac0db15d6de20f394bef46158410b4bcc
Author: Lucas Wang <[email protected]>
AuthorDate: Wed May 4 11:41:34 2022 -0700

    KAFKA-13815: Avoid reinitialization for a replica that is being deleted 
(#12029)
    
    This PR tries to avoid the reinitialization of the leader epoch cache
    and the partition metadata if the corresponding replica is being deleted.
    With this change, the asyncDelete method can run more efficiently,
    which means a StopReplica request with many partitions to be deleted can be
    processed more quickly.
    
    Reviewers: David Jacot <[email protected]>, Jun Rao <[email protected]>
---
 core/src/main/scala/kafka/log/LogManager.scala     |  6 +-
 core/src/main/scala/kafka/log/UnifiedLog.scala     | 50 ++++++++++-----
 .../scala/unit/kafka/cluster/PartitionTest.scala   |  4 +-
 .../test/scala/unit/kafka/log/UnifiedLogTest.scala | 74 +++++++++++++++++-----
 .../unit/kafka/server/ReplicaManagerTest.scala     | 41 ++++++------
 5 files changed, 118 insertions(+), 57 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogManager.scala 
b/core/src/main/scala/kafka/log/LogManager.scala
index 4023d27346..c79faeba3b 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -1009,7 +1009,7 @@ class LogManager(logDirs: Seq[File],
       if (destLog == null)
         throw new KafkaStorageException(s"The future replica for 
$topicPartition is offline")
 
-      destLog.renameDir(UnifiedLog.logDirName(topicPartition))
+      destLog.renameDir(UnifiedLog.logDirName(topicPartition), true)
       destLog.updateHighWatermark(sourceLog.highWatermark)
 
       // Now that future replica has been successfully renamed to be the 
current replica
@@ -1022,7 +1022,7 @@ class LogManager(logDirs: Seq[File],
       }
 
       try {
-        sourceLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition))
+        sourceLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), true)
         // Now that replica in source log directory has been successfully 
renamed for deletion.
         // Close the log, update checkpoint files, and enqueue this log to be 
deleted.
         sourceLog.close()
@@ -1069,7 +1069,7 @@ class LogManager(logDirs: Seq[File],
             cleaner.updateCheckpoints(removedLog.parentDirFile, 
partitionToRemove = Option(topicPartition))
           }
         }
-        removedLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition))
+        removedLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), 
false)
         if (checkpoint) {
           val logDir = removedLog.parentDirFile
           val logsToCheckpoint = logsInDir(logDir)
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala 
b/core/src/main/scala/kafka/log/UnifiedLog.scala
index e84b3238ed..99524385fb 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -287,7 +287,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
    */
   @volatile private var highWatermarkMetadata: LogOffsetMetadata = 
LogOffsetMetadata(logStartOffset)
 
-  @volatile var partitionMetadataFile : PartitionMetadataFile = null
+  @volatile var partitionMetadataFile: Option[PartitionMetadataFile] = None
 
   locally {
     initializePartitionMetadata()
@@ -307,9 +307,12 @@ class UnifiedLog(@volatile var logStartOffset: Long,
    *   - Otherwise set _topicId to None
    */
   def initializeTopicId(): Unit =  {
-    if (partitionMetadataFile.exists()) {
+    val partMetadataFile = partitionMetadataFile.getOrElse(
+      throw new KafkaException("The partitionMetadataFile should have been 
initialized"))
+
+    if (partMetadataFile.exists()) {
       if (keepPartitionMetadataFile) {
-        val fileTopicId = partitionMetadataFile.read().topicId
+        val fileTopicId = partMetadataFile.read().topicId
         if (_topicId.isDefined && !_topicId.contains(fileTopicId))
           throw new InconsistentTopicIdException(s"Tried to assign topic ID 
$topicId to log for topic partition $topicPartition," +
             s"but log already contained topic ID $fileTopicId")
@@ -317,14 +320,14 @@ class UnifiedLog(@volatile var logStartOffset: Long,
         _topicId = Some(fileTopicId)
 
       } else {
-        try partitionMetadataFile.delete()
+        try partMetadataFile.delete()
         catch {
           case e: IOException =>
-            error(s"Error while trying to delete partition metadata file 
${partitionMetadataFile}", e)
+            error(s"Error while trying to delete partition metadata file 
${partMetadataFile}", e)
         }
       }
     } else if (keepPartitionMetadataFile) {
-      _topicId.foreach(partitionMetadataFile.record)
+      _topicId.foreach(partMetadataFile.record)
       scheduler.schedule("flush-metadata-file", maybeFlushMetadataFile)
     } else {
       // We want to keep the file and the in-memory topic ID in sync.
@@ -555,11 +558,11 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   private def initializePartitionMetadata(): Unit = lock synchronized {
     val partitionMetadata = PartitionMetadataFile.newFile(dir)
-    partitionMetadataFile = new PartitionMetadataFile(partitionMetadata, 
logDirFailureChannel)
+    partitionMetadataFile = Some(new PartitionMetadataFile(partitionMetadata, 
logDirFailureChannel))
   }
 
   private def maybeFlushMetadataFile(): Unit = {
-    partitionMetadataFile.maybeFlush()
+    partitionMetadataFile.foreach(_.maybeFlush())
   }
 
   /** Only used for ZK clusters when we update and start using topic IDs on 
existing topics */
@@ -574,9 +577,14 @@ class UnifiedLog(@volatile var logStartOffset: Long,
       case None =>
         if (keepPartitionMetadataFile) {
           _topicId = Some(topicId)
-          if (!partitionMetadataFile.exists()) {
-            partitionMetadataFile.record(topicId)
-            scheduler.schedule("flush-metadata-file", maybeFlushMetadataFile)
+          partitionMetadataFile match {
+            case Some(partMetadataFile) =>
+              if (!partMetadataFile.exists()) {
+                partMetadataFile.record(topicId)
+                scheduler.schedule("flush-metadata-file", 
maybeFlushMetadataFile)
+              }
+            case _ => warn(s"The topic id $topicId will not be persisted to 
the partition metadata file " +
+              "since the partition is deleted")
           }
         }
     }
@@ -675,21 +683,29 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   }
 
   /**
-   * Rename the directory of the local log
+   * Rename the directory of the local log. If the log's directory is being 
renamed for async deletion due to a
+   * StopReplica request, then the shouldReinitialize parameter should be set 
to false, otherwise it should be set to true.
    *
+   * @param name The new name that this log's directory is being renamed to
+   * @param shouldReinitialize Whether the log's metadata should be 
reinitialized after renaming
    * @throws KafkaStorageException if rename fails
    */
-  def renameDir(name: String): Unit = {
+  def renameDir(name: String, shouldReinitialize: Boolean): Unit = {
     lock synchronized {
       maybeHandleIOException(s"Error while renaming dir for $topicPartition in 
log dir ${dir.getParent}") {
         // Flush partitionMetadata file before initializing again
         maybeFlushMetadataFile()
         if (localLog.renameDir(name)) {
           producerStateManager.updateParentDir(dir)
-          // re-initialize leader epoch cache so that 
LeaderEpochCheckpointFile.checkpoint can correctly reference
-          // the checkpoint file in renamed log directory
-          initializeLeaderEpochCache()
-          initializePartitionMetadata()
+          if (shouldReinitialize) {
+            // re-initialize leader epoch cache so that 
LeaderEpochCheckpointFile.checkpoint can correctly reference
+            // the checkpoint file in renamed log directory
+            initializeLeaderEpochCache()
+            initializePartitionMetadata()
+          } else {
+            leaderEpochCache = None
+            partitionMetadataFile = None
+          }
         }
       }
     }
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala 
b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 63e06a5bcc..042d2500fc 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -1910,8 +1910,8 @@ class PartitionTest extends AbstractPartitionTest {
     assertTrue(partition.log.isDefined)
     val log = partition.log.get
     assertEquals(expectedTopicId, log.topicId.get)
-    assertTrue(log.partitionMetadataFile.exists())
-    assertEquals(expectedTopicId, log.partitionMetadataFile.read().topicId)
+    assertTrue(log.partitionMetadataFile.get.exists())
+    assertEquals(expectedTopicId, log.partitionMetadataFile.get.read().topicId)
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala 
b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index e2da713809..193ce160f8 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -1853,12 +1853,12 @@ class UnifiedLogTest {
     val record = MemoryRecords.withRecords(CompressionType.NONE, new 
SimpleRecord("simpleValue".getBytes))
 
     val topicId = Uuid.randomUuid()
-    log.partitionMetadataFile.record(topicId)
+    log.partitionMetadataFile.get.record(topicId)
 
     // Should trigger a synchronous flush
     log.appendAsLeader(record, leaderEpoch = 0)
-    assertTrue(log.partitionMetadataFile.exists())
-    assertEquals(topicId, log.partitionMetadataFile.read().topicId)
+    assertTrue(log.partitionMetadataFile.get.exists())
+    assertEquals(topicId, log.partitionMetadataFile.get.read().topicId)
   }
 
   @Test
@@ -1867,15 +1867,15 @@ class UnifiedLogTest {
     var log = createLog(logDir, logConfig)
 
     val topicId = Uuid.randomUuid()
-    log.partitionMetadataFile.record(topicId)
+    log.partitionMetadataFile.get.record(topicId)
 
     // Should trigger a synchronous flush
     log.close()
 
     // We open the log again, and the partition metadata file should exist 
with the same ID.
     log = createLog(logDir, logConfig)
-    assertTrue(log.partitionMetadataFile.exists())
-    assertEquals(topicId, log.partitionMetadataFile.read().topicId)
+    assertTrue(log.partitionMetadataFile.get.exists())
+    assertEquals(topicId, log.partitionMetadataFile.get.read().topicId)
   }
 
   @Test
@@ -1902,14 +1902,14 @@ class UnifiedLogTest {
     val topicId = Uuid.randomUuid()
     log.assignTopicId(topicId)
     // We should not write to this file or set the topic ID
-    assertFalse(log.partitionMetadataFile.exists())
+    assertFalse(log.partitionMetadataFile.get.exists())
     assertEquals(None, log.topicId)
     log.close()
 
     val log2 = createLog(logDir, logConfig, topicId = Some(Uuid.randomUuid()), 
 keepPartitionMetadataFile = false)
 
     // We should not write to this file or set the topic ID
-    assertFalse(log2.partitionMetadataFile.exists())
+    assertFalse(log2.partitionMetadataFile.get.exists())
     assertEquals(None, log2.topicId)
     log2.close()
   }
@@ -2253,7 +2253,7 @@ class UnifiedLogTest {
 
     // Ensure that after a directory rename, the epoch cache is written to the 
right location
     val tp = UnifiedLog.parseTopicPartitionName(log.dir)
-    log.renameDir(UnifiedLog.logDeleteDirName(tp))
+    log.renameDir(UnifiedLog.logDeleteDirName(tp), true)
     log.appendAsLeader(TestUtils.records(List(new 
SimpleRecord("foo".getBytes()))), leaderEpoch = 10)
     assertEquals(Some(10), log.latestEpoch)
     assertTrue(LeaderEpochCheckpointFile.newFile(log.dir).exists())
@@ -2274,7 +2274,7 @@ class UnifiedLogTest {
 
     // Ensure that after a directory rename, the partition metadata file is 
written to the right location.
     val tp = UnifiedLog.parseTopicPartitionName(log.dir)
-    log.renameDir(UnifiedLog.logDeleteDirName(tp))
+    log.renameDir(UnifiedLog.logDeleteDirName(tp), true)
     log.appendAsLeader(TestUtils.records(List(new 
SimpleRecord("foo".getBytes()))), leaderEpoch = 10)
     assertEquals(Some(10), log.latestEpoch)
     assertTrue(PartitionMetadataFile.newFile(log.dir).exists())
@@ -2283,7 +2283,7 @@ class UnifiedLogTest {
     // Check the topic ID remains in memory and was copied correctly.
     assertTrue(log.topicId.isDefined)
     assertEquals(topicId, log.topicId.get)
-    assertEquals(topicId, log.partitionMetadataFile.read().topicId)
+    assertEquals(topicId, log.partitionMetadataFile.get.read().topicId)
   }
 
   @Test
@@ -2293,17 +2293,17 @@ class UnifiedLogTest {
 
     // Write a topic ID to the partition metadata file to ensure it is 
transferred correctly.
     val topicId = Uuid.randomUuid()
-    log.partitionMetadataFile.record(topicId)
+    log.partitionMetadataFile.get.record(topicId)
 
     // Ensure that after a directory rename, the partition metadata file is 
written to the right location.
     val tp = UnifiedLog.parseTopicPartitionName(log.dir)
-    log.renameDir(UnifiedLog.logDeleteDirName(tp))
+    log.renameDir(UnifiedLog.logDeleteDirName(tp), true)
     assertTrue(PartitionMetadataFile.newFile(log.dir).exists())
     assertFalse(PartitionMetadataFile.newFile(this.logDir).exists())
 
     // Check the file holds the correct contents.
-    assertTrue(log.partitionMetadataFile.exists())
-    assertEquals(topicId, log.partitionMetadataFile.read().topicId)
+    assertTrue(log.partitionMetadataFile.get.exists())
+    assertEquals(topicId, log.partitionMetadataFile.get.read().topicId)
   }
 
   @Test
@@ -3410,6 +3410,50 @@ class UnifiedLogTest {
     assertThrows(classOf[OffsetOutOfRangeException], () => 
log.maybeIncrementLogStartOffset(26L, ClientRecordDeletion))
   }
 
+  def testBackgroundDeletionWithIOException(): Unit = {
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
+    val log = createLog(logDir, logConfig)
+    assertEquals(1, log.numberOfSegments, "The number of segments should be 1")
+
+    // Delete the underlying directory to trigger a KafkaStorageException
+    val dir = log.dir
+    Utils.delete(dir)
+    dir.createNewFile()
+
+    assertThrows(classOf[KafkaStorageException], () => {
+      log.delete()
+    })
+    assertTrue(log.logDirFailureChannel.hasOfflineLogDir(tmpDir.toString))
+  }
+
+  /**
+   * Tests renaming a log's dir without reinitializing its metadata, which is the case during topic deletion.
+   */
+  @Test
+  def testRenamingDirWithoutReinitialization(): Unit = {
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
+    val log = createLog(logDir, logConfig)
+    assertEquals(1, log.numberOfSegments, "The number of segments should be 1")
+
+    val newDir = TestUtils.randomPartitionLogDir(tmpDir)
+    assertTrue(newDir.exists())
+
+    log.renameDir(newDir.getName, false)
+    assertTrue(log.leaderEpochCache.isEmpty)
+    assertTrue(log.partitionMetadataFile.isEmpty)
+    assertEquals(0, log.logEndOffset)
+    // verify that appending records still succeeds
+    // even with the uninitialized leaderEpochCache and partitionMetadataFile
+    val records = TestUtils.records(List(new 
SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)))
+    log.appendAsLeader(records, leaderEpoch = 0)
+    assertEquals(1, log.logEndOffset)
+
+    // verify that the background deletion can succeed
+    log.delete()
+    assertEquals(0, log.numberOfSegments, "The number of segments should be 0")
+    assertFalse(newDir.exists())
+  }
+
   private def appendTransactionalToBuffer(buffer: ByteBuffer,
                                           producerId: Long,
                                           producerEpoch: Short,
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index dd005b0914..da2d2a9084 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -25,7 +25,6 @@ import java.util.concurrent.atomic.{AtomicLong, 
AtomicReference}
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.stream.IntStream
 import java.util.{Collections, Optional, Properties}
-
 import kafka.api._
 import kafka.cluster.{BrokerEndPoint, Partition}
 import kafka.log._
@@ -34,6 +33,7 @@ import kafka.server.checkpoints.{LazyOffsetCheckpoints, 
OffsetCheckpointFile}
 import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
 import kafka.utils.timer.MockTimer
 import kafka.utils.{MockScheduler, MockTime, TestUtils}
+import org.apache.kafka.common.errors.KafkaStorageException
 import org.apache.kafka.common.message.FetchResponseData
 import org.apache.kafka.common.message.LeaderAndIsrRequestData
 import 
org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
@@ -2684,10 +2684,11 @@ class ReplicaManagerTest {
     assertEquals(Some(1L), readLogStartOffsetCheckpoint().get(tp0))
 
     if (throwIOException) {
-      // Delete the underlying directory to trigger an KafkaStorageException
-      val dir = partition.log.get.dir
-      Utils.delete(dir)
-      dir.createNewFile()
+      // Replace underlying PartitionMetadataFile with a mock which throws
+      // a KafkaStorageException when maybeFlush is called.
+      val mockPartitionMetadataFile = mock(classOf[PartitionMetadataFile])
+      when(mockPartitionMetadataFile.maybeFlush()).thenThrow(new 
KafkaStorageException())
+      partition.log.get.partitionMetadataFile = Some(mockPartitionMetadataFile)
     }
 
     val partitionStates = Map(tp0 -> new StopReplicaPartitionState()
@@ -2770,8 +2771,8 @@ class ReplicaManagerTest {
       assertFalse(replicaManager.localLog(topicPartition).isEmpty)
       val id = topicIds.get(topicPartition.topic())
       val log = replicaManager.localLog(topicPartition).get
-      assertTrue(log.partitionMetadataFile.exists())
-      val partitionMetadata = log.partitionMetadataFile.read()
+      assertTrue(log.partitionMetadataFile.get.exists())
+      val partitionMetadata = log.partitionMetadataFile.get.read()
 
       // Current version of PartitionMetadataFile is 0.
       assertEquals(0, partitionMetadata.version)
@@ -2791,7 +2792,7 @@ class ReplicaManagerTest {
       assertTrue(replicaManager.getLog(topicPartition).isDefined)
       var log = replicaManager.getLog(topicPartition).get
       assertEquals(None, log.topicId)
-      assertFalse(log.partitionMetadataFile.exists())
+      assertFalse(log.partitionMetadataFile.get.exists())
 
       val topicIds = Collections.singletonMap(topic, Uuid.randomUuid())
       val topicNames = topicIds.asScala.map(_.swap).asJava
@@ -2815,8 +2816,8 @@ class ReplicaManagerTest {
       assertFalse(replicaManager.localLog(topicPartition).isEmpty)
       val id = topicIds.get(topicPartition.topic())
       log = replicaManager.localLog(topicPartition).get
-      assertTrue(log.partitionMetadataFile.exists())
-      val partitionMetadata = log.partitionMetadataFile.read()
+      assertTrue(log.partitionMetadataFile.get.exists())
+      val partitionMetadata = log.partitionMetadataFile.get.read()
 
       // Current version of PartitionMetadataFile is 0.
       assertEquals(0, partitionMetadata.version)
@@ -2852,13 +2853,13 @@ class ReplicaManagerTest {
       assertEquals(Errors.NONE, 
response.partitionErrors(Collections.emptyMap()).get(topicPartition))
       assertTrue(replicaManager.localLog(topicPartition).isDefined)
       val log = replicaManager.localLog(topicPartition).get
-      assertFalse(log.partitionMetadataFile.exists())
+      assertFalse(log.partitionMetadataFile.get.exists())
       assertTrue(log.topicId.isEmpty)
 
       val response2 = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest(topicIds.asJava, ApiKeys.LEADER_AND_ISR.latestVersion), (_, 
_) => ())
       assertEquals(Errors.NONE, 
response2.partitionErrors(topicNames.asJava).get(topicPartition))
       assertTrue(replicaManager.localLog(topicPartition).isDefined)
-      assertTrue(log.partitionMetadataFile.exists())
+      assertTrue(log.partitionMetadataFile.get.exists())
       assertTrue(log.topicId.isDefined)
       assertEquals(topicId, log.topicId.get)
 
@@ -2868,18 +2869,18 @@ class ReplicaManagerTest {
       assertEquals(Errors.NONE, 
response3.partitionErrors(Collections.emptyMap()).get(topicPartition2))
       assertTrue(replicaManager.localLog(topicPartition2).isDefined)
       val log2 = replicaManager.localLog(topicPartition2).get
-      assertFalse(log2.partitionMetadataFile.exists())
+      assertFalse(log2.partitionMetadataFile.get.exists())
       assertTrue(log2.topicId.isEmpty)
 
       val response4 = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest(topicIds.asJava, ApiKeys.LEADER_AND_ISR.latestVersion, 1, 
1), (_, _) => ())
       assertEquals(Errors.NONE, 
response4.partitionErrors(topicNames.asJava).get(topicPartition2))
       assertTrue(replicaManager.localLog(topicPartition2).isDefined)
-      assertTrue(log2.partitionMetadataFile.exists())
+      assertTrue(log2.partitionMetadataFile.get.exists())
       assertTrue(log2.topicId.isDefined)
       assertEquals(topicId, log2.topicId.get)
 
-      assertEquals(topicId, log.partitionMetadataFile.read().topicId)
-      assertEquals(topicId, log2.partitionMetadataFile.read().topicId)
+      assertEquals(topicId, log.partitionMetadataFile.get.read().topicId)
+      assertEquals(topicId, log2.partitionMetadataFile.get.read().topicId)
     } finally replicaManager.shutdown(checkpointHW = false)
   }
 
@@ -2955,28 +2956,28 @@ class ReplicaManagerTest {
       val response = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest(0, "fakeTopic", ApiKeys.LEADER_AND_ISR.latestVersion), (_, 
_) => ())
       assertTrue(replicaManager.localLog(topicPartitionFake).isDefined)
       val log = replicaManager.localLog(topicPartitionFake).get
-      assertFalse(log.partitionMetadataFile.exists())
+      assertFalse(log.partitionMetadataFile.get.exists())
       assertEquals(Errors.NONE, 
response.partitionErrors(topicNames).get(topicPartition))
 
       // There is no file if the topic has the default UUID.
       val response2 = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest(0, topic, ApiKeys.LEADER_AND_ISR.latestVersion), (_, _) => 
())
       assertTrue(replicaManager.localLog(topicPartition).isDefined)
       val log2 = replicaManager.localLog(topicPartition).get
-      assertFalse(log2.partitionMetadataFile.exists())
+      assertFalse(log2.partitionMetadataFile.get.exists())
       assertEquals(Errors.NONE, 
response2.partitionErrors(topicNames).get(topicPartition))
 
       // There is no file if the request an older version
       val response3 = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest(0, "foo", 0), (_, _) => ())
       assertTrue(replicaManager.localLog(topicPartitionFoo).isDefined)
       val log3 = replicaManager.localLog(topicPartitionFoo).get
-      assertFalse(log3.partitionMetadataFile.exists())
+      assertFalse(log3.partitionMetadataFile.get.exists())
       assertEquals(Errors.NONE, 
response3.partitionErrors(topicNames).get(topicPartitionFoo))
 
       // There is no file if the request is an older version
       val response4 = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest(1, "foo", 4), (_, _) => ())
       assertTrue(replicaManager.localLog(topicPartitionFoo).isDefined)
       val log4 = replicaManager.localLog(topicPartitionFoo).get
-      assertFalse(log4.partitionMetadataFile.exists())
+      assertFalse(log4.partitionMetadataFile.get.exists())
       assertEquals(Errors.NONE, 
response4.partitionErrors(topicNames).get(topicPartitionFoo))
     } finally replicaManager.shutdown(checkpointHW = false)
   }

Reply via email to