This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0a9893cac0 KAFKA-13815: Avoid reinitialization for a replica that is
being deleted (#12029)
0a9893cac0 is described below
commit 0a9893cac0db15d6de20f394bef46158410b4bcc
Author: Lucas Wang <[email protected]>
AuthorDate: Wed May 4 11:41:34 2022 -0700
KAFKA-13815: Avoid reinitialization for a replica that is being deleted
(#12029)
This PR tries to avoid the reinitialization of the leader epoch cache
and the partition metadata if the corresponding replica is being deleted.
With this change, the asyncDelete method can run more efficiently,
which means a StopReplica request with many partitions to be deleted can be
processed more quickly.
Reviewers: David Jacot <[email protected]>, Jun Rao <[email protected]>
---
core/src/main/scala/kafka/log/LogManager.scala | 6 +-
core/src/main/scala/kafka/log/UnifiedLog.scala | 50 ++++++++++-----
.../scala/unit/kafka/cluster/PartitionTest.scala | 4 +-
.../test/scala/unit/kafka/log/UnifiedLogTest.scala | 74 +++++++++++++++++-----
.../unit/kafka/server/ReplicaManagerTest.scala | 41 ++++++------
5 files changed, 118 insertions(+), 57 deletions(-)
diff --git a/core/src/main/scala/kafka/log/LogManager.scala
b/core/src/main/scala/kafka/log/LogManager.scala
index 4023d27346..c79faeba3b 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -1009,7 +1009,7 @@ class LogManager(logDirs: Seq[File],
if (destLog == null)
throw new KafkaStorageException(s"The future replica for
$topicPartition is offline")
- destLog.renameDir(UnifiedLog.logDirName(topicPartition))
+ destLog.renameDir(UnifiedLog.logDirName(topicPartition), true)
destLog.updateHighWatermark(sourceLog.highWatermark)
// Now that future replica has been successfully renamed to be the
current replica
@@ -1022,7 +1022,7 @@ class LogManager(logDirs: Seq[File],
}
try {
- sourceLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition))
+ sourceLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), true)
// Now that replica in source log directory has been successfully
renamed for deletion.
// Close the log, update checkpoint files, and enqueue this log to be
deleted.
sourceLog.close()
@@ -1069,7 +1069,7 @@ class LogManager(logDirs: Seq[File],
cleaner.updateCheckpoints(removedLog.parentDirFile,
partitionToRemove = Option(topicPartition))
}
}
- removedLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition))
+ removedLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition),
false)
if (checkpoint) {
val logDir = removedLog.parentDirFile
val logsToCheckpoint = logsInDir(logDir)
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala
b/core/src/main/scala/kafka/log/UnifiedLog.scala
index e84b3238ed..99524385fb 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -287,7 +287,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
*/
@volatile private var highWatermarkMetadata: LogOffsetMetadata =
LogOffsetMetadata(logStartOffset)
- @volatile var partitionMetadataFile : PartitionMetadataFile = null
+ @volatile var partitionMetadataFile: Option[PartitionMetadataFile] = None
locally {
initializePartitionMetadata()
@@ -307,9 +307,12 @@ class UnifiedLog(@volatile var logStartOffset: Long,
* - Otherwise set _topicId to None
*/
def initializeTopicId(): Unit = {
- if (partitionMetadataFile.exists()) {
+ val partMetadataFile = partitionMetadataFile.getOrElse(
+ throw new KafkaException("The partitionMetadataFile should have been
initialized"))
+
+ if (partMetadataFile.exists()) {
if (keepPartitionMetadataFile) {
- val fileTopicId = partitionMetadataFile.read().topicId
+ val fileTopicId = partMetadataFile.read().topicId
if (_topicId.isDefined && !_topicId.contains(fileTopicId))
throw new InconsistentTopicIdException(s"Tried to assign topic ID
$topicId to log for topic partition $topicPartition," +
s"but log already contained topic ID $fileTopicId")
@@ -317,14 +320,14 @@ class UnifiedLog(@volatile var logStartOffset: Long,
_topicId = Some(fileTopicId)
} else {
- try partitionMetadataFile.delete()
+ try partMetadataFile.delete()
catch {
case e: IOException =>
- error(s"Error while trying to delete partition metadata file
${partitionMetadataFile}", e)
+ error(s"Error while trying to delete partition metadata file
${partMetadataFile}", e)
}
}
} else if (keepPartitionMetadataFile) {
- _topicId.foreach(partitionMetadataFile.record)
+ _topicId.foreach(partMetadataFile.record)
scheduler.schedule("flush-metadata-file", maybeFlushMetadataFile)
} else {
// We want to keep the file and the in-memory topic ID in sync.
@@ -555,11 +558,11 @@ class UnifiedLog(@volatile var logStartOffset: Long,
private def initializePartitionMetadata(): Unit = lock synchronized {
val partitionMetadata = PartitionMetadataFile.newFile(dir)
- partitionMetadataFile = new PartitionMetadataFile(partitionMetadata,
logDirFailureChannel)
+ partitionMetadataFile = Some(new PartitionMetadataFile(partitionMetadata,
logDirFailureChannel))
}
private def maybeFlushMetadataFile(): Unit = {
- partitionMetadataFile.maybeFlush()
+ partitionMetadataFile.foreach(_.maybeFlush())
}
/** Only used for ZK clusters when we update and start using topic IDs on
existing topics */
@@ -574,9 +577,14 @@ class UnifiedLog(@volatile var logStartOffset: Long,
case None =>
if (keepPartitionMetadataFile) {
_topicId = Some(topicId)
- if (!partitionMetadataFile.exists()) {
- partitionMetadataFile.record(topicId)
- scheduler.schedule("flush-metadata-file", maybeFlushMetadataFile)
+ partitionMetadataFile match {
+ case Some(partMetadataFile) =>
+ if (!partMetadataFile.exists()) {
+ partMetadataFile.record(topicId)
+ scheduler.schedule("flush-metadata-file",
maybeFlushMetadataFile)
+ }
+ case _ => warn(s"The topic id $topicId will not be persisted to
the partition metadata file " +
+ "since the partition is deleted")
}
}
}
@@ -675,21 +683,29 @@ class UnifiedLog(@volatile var logStartOffset: Long,
}
/**
- * Rename the directory of the local log
+ * Rename the directory of the local log. If the log's directory is being
renamed for async deletion due to a
+ * StopReplica request, then the shouldReinitialize parameter should be set
to false, otherwise it should be set to true.
*
+ * @param name The new name that this log's directory is being renamed to
+ * @param shouldReinitialize Whether the log's metadata should be
reinitialized after renaming
* @throws KafkaStorageException if rename fails
*/
- def renameDir(name: String): Unit = {
+ def renameDir(name: String, shouldReinitialize: Boolean): Unit = {
lock synchronized {
maybeHandleIOException(s"Error while renaming dir for $topicPartition in
log dir ${dir.getParent}") {
// Flush partitionMetadata file before initializing again
maybeFlushMetadataFile()
if (localLog.renameDir(name)) {
producerStateManager.updateParentDir(dir)
- // re-initialize leader epoch cache so that
LeaderEpochCheckpointFile.checkpoint can correctly reference
- // the checkpoint file in renamed log directory
- initializeLeaderEpochCache()
- initializePartitionMetadata()
+ if (shouldReinitialize) {
+ // re-initialize leader epoch cache so that
LeaderEpochCheckpointFile.checkpoint can correctly reference
+ // the checkpoint file in renamed log directory
+ initializeLeaderEpochCache()
+ initializePartitionMetadata()
+ } else {
+ leaderEpochCache = None
+ partitionMetadataFile = None
+ }
}
}
}
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 63e06a5bcc..042d2500fc 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -1910,8 +1910,8 @@ class PartitionTest extends AbstractPartitionTest {
assertTrue(partition.log.isDefined)
val log = partition.log.get
assertEquals(expectedTopicId, log.topicId.get)
- assertTrue(log.partitionMetadataFile.exists())
- assertEquals(expectedTopicId, log.partitionMetadataFile.read().topicId)
+ assertTrue(log.partitionMetadataFile.get.exists())
+ assertEquals(expectedTopicId, log.partitionMetadataFile.get.read().topicId)
}
@Test
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index e2da713809..193ce160f8 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -1853,12 +1853,12 @@ class UnifiedLogTest {
val record = MemoryRecords.withRecords(CompressionType.NONE, new
SimpleRecord("simpleValue".getBytes))
val topicId = Uuid.randomUuid()
- log.partitionMetadataFile.record(topicId)
+ log.partitionMetadataFile.get.record(topicId)
// Should trigger a synchronous flush
log.appendAsLeader(record, leaderEpoch = 0)
- assertTrue(log.partitionMetadataFile.exists())
- assertEquals(topicId, log.partitionMetadataFile.read().topicId)
+ assertTrue(log.partitionMetadataFile.get.exists())
+ assertEquals(topicId, log.partitionMetadataFile.get.read().topicId)
}
@Test
@@ -1867,15 +1867,15 @@ class UnifiedLogTest {
var log = createLog(logDir, logConfig)
val topicId = Uuid.randomUuid()
- log.partitionMetadataFile.record(topicId)
+ log.partitionMetadataFile.get.record(topicId)
// Should trigger a synchronous flush
log.close()
// We open the log again, and the partition metadata file should exist
with the same ID.
log = createLog(logDir, logConfig)
- assertTrue(log.partitionMetadataFile.exists())
- assertEquals(topicId, log.partitionMetadataFile.read().topicId)
+ assertTrue(log.partitionMetadataFile.get.exists())
+ assertEquals(topicId, log.partitionMetadataFile.get.read().topicId)
}
@Test
@@ -1902,14 +1902,14 @@ class UnifiedLogTest {
val topicId = Uuid.randomUuid()
log.assignTopicId(topicId)
// We should not write to this file or set the topic ID
- assertFalse(log.partitionMetadataFile.exists())
+ assertFalse(log.partitionMetadataFile.get.exists())
assertEquals(None, log.topicId)
log.close()
val log2 = createLog(logDir, logConfig, topicId = Some(Uuid.randomUuid()),
keepPartitionMetadataFile = false)
// We should not write to this file or set the topic ID
- assertFalse(log2.partitionMetadataFile.exists())
+ assertFalse(log2.partitionMetadataFile.get.exists())
assertEquals(None, log2.topicId)
log2.close()
}
@@ -2253,7 +2253,7 @@ class UnifiedLogTest {
// Ensure that after a directory rename, the epoch cache is written to the
right location
val tp = UnifiedLog.parseTopicPartitionName(log.dir)
- log.renameDir(UnifiedLog.logDeleteDirName(tp))
+ log.renameDir(UnifiedLog.logDeleteDirName(tp), true)
log.appendAsLeader(TestUtils.records(List(new
SimpleRecord("foo".getBytes()))), leaderEpoch = 10)
assertEquals(Some(10), log.latestEpoch)
assertTrue(LeaderEpochCheckpointFile.newFile(log.dir).exists())
@@ -2274,7 +2274,7 @@ class UnifiedLogTest {
// Ensure that after a directory rename, the partition metadata file is
written to the right location.
val tp = UnifiedLog.parseTopicPartitionName(log.dir)
- log.renameDir(UnifiedLog.logDeleteDirName(tp))
+ log.renameDir(UnifiedLog.logDeleteDirName(tp), true)
log.appendAsLeader(TestUtils.records(List(new
SimpleRecord("foo".getBytes()))), leaderEpoch = 10)
assertEquals(Some(10), log.latestEpoch)
assertTrue(PartitionMetadataFile.newFile(log.dir).exists())
@@ -2283,7 +2283,7 @@ class UnifiedLogTest {
// Check the topic ID remains in memory and was copied correctly.
assertTrue(log.topicId.isDefined)
assertEquals(topicId, log.topicId.get)
- assertEquals(topicId, log.partitionMetadataFile.read().topicId)
+ assertEquals(topicId, log.partitionMetadataFile.get.read().topicId)
}
@Test
@@ -2293,17 +2293,17 @@ class UnifiedLogTest {
// Write a topic ID to the partition metadata file to ensure it is
transferred correctly.
val topicId = Uuid.randomUuid()
- log.partitionMetadataFile.record(topicId)
+ log.partitionMetadataFile.get.record(topicId)
// Ensure that after a directory rename, the partition metadata file is
written to the right location.
val tp = UnifiedLog.parseTopicPartitionName(log.dir)
- log.renameDir(UnifiedLog.logDeleteDirName(tp))
+ log.renameDir(UnifiedLog.logDeleteDirName(tp), true)
assertTrue(PartitionMetadataFile.newFile(log.dir).exists())
assertFalse(PartitionMetadataFile.newFile(this.logDir).exists())
// Check the file holds the correct contents.
- assertTrue(log.partitionMetadataFile.exists())
- assertEquals(topicId, log.partitionMetadataFile.read().topicId)
+ assertTrue(log.partitionMetadataFile.get.exists())
+ assertEquals(topicId, log.partitionMetadataFile.get.read().topicId)
}
@Test
@@ -3410,6 +3410,50 @@ class UnifiedLogTest {
assertThrows(classOf[OffsetOutOfRangeException], () =>
log.maybeIncrementLogStartOffset(26L, ClientRecordDeletion))
}
+ def testBackgroundDeletionWithIOException(): Unit = {
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
+ val log = createLog(logDir, logConfig)
+ assertEquals(1, log.numberOfSegments, "The number of segments should be 1")
+
+ // Delete the underlying directory to trigger a KafkaStorageException
+ val dir = log.dir
+ Utils.delete(dir)
+ dir.createNewFile()
+
+ assertThrows(classOf[KafkaStorageException], () => {
+ log.delete()
+ })
+ assertTrue(log.logDirFailureChannel.hasOfflineLogDir(tmpDir.toString))
+ }
+
+ /**
+ * test renaming a log's dir without reinitialization, which is the case
during topic deletion
+ */
+ @Test
+ def testRenamingDirWithoutReinitialization(): Unit = {
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
+ val log = createLog(logDir, logConfig)
+ assertEquals(1, log.numberOfSegments, "The number of segments should be 1")
+
+ val newDir = TestUtils.randomPartitionLogDir(tmpDir)
+ assertTrue(newDir.exists())
+
+ log.renameDir(newDir.getName, false)
+ assertTrue(log.leaderEpochCache.isEmpty)
+ assertTrue(log.partitionMetadataFile.isEmpty)
+ assertEquals(0, log.logEndOffset)
+ // verify that records appending can still succeed
+ // even with the uninitialized leaderEpochCache and partitionMetadataFile
+ val records = TestUtils.records(List(new
SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)))
+ log.appendAsLeader(records, leaderEpoch = 0)
+ assertEquals(1, log.logEndOffset)
+
+ // verify that the background deletion can succeed
+ log.delete()
+ assertEquals(0, log.numberOfSegments, "The number of segments should be 0")
+ assertFalse(newDir.exists())
+ }
+
private def appendTransactionalToBuffer(buffer: ByteBuffer,
producerId: Long,
producerEpoch: Short,
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index dd005b0914..da2d2a9084 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -25,7 +25,6 @@ import java.util.concurrent.atomic.{AtomicLong,
AtomicReference}
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.stream.IntStream
import java.util.{Collections, Optional, Properties}
-
import kafka.api._
import kafka.cluster.{BrokerEndPoint, Partition}
import kafka.log._
@@ -34,6 +33,7 @@ import kafka.server.checkpoints.{LazyOffsetCheckpoints,
OffsetCheckpointFile}
import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
import kafka.utils.timer.MockTimer
import kafka.utils.{MockScheduler, MockTime, TestUtils}
+import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.message.FetchResponseData
import org.apache.kafka.common.message.LeaderAndIsrRequestData
import
org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
@@ -2684,10 +2684,11 @@ class ReplicaManagerTest {
assertEquals(Some(1L), readLogStartOffsetCheckpoint().get(tp0))
if (throwIOException) {
- // Delete the underlying directory to trigger an KafkaStorageException
- val dir = partition.log.get.dir
- Utils.delete(dir)
- dir.createNewFile()
+ // Replace underlying PartitionMetadataFile with a mock which throws
+ // a KafkaStorageException when maybeFlush is called.
+ val mockPartitionMetadataFile = mock(classOf[PartitionMetadataFile])
+ when(mockPartitionMetadataFile.maybeFlush()).thenThrow(new
KafkaStorageException())
+ partition.log.get.partitionMetadataFile = Some(mockPartitionMetadataFile)
}
val partitionStates = Map(tp0 -> new StopReplicaPartitionState()
@@ -2770,8 +2771,8 @@ class ReplicaManagerTest {
assertFalse(replicaManager.localLog(topicPartition).isEmpty)
val id = topicIds.get(topicPartition.topic())
val log = replicaManager.localLog(topicPartition).get
- assertTrue(log.partitionMetadataFile.exists())
- val partitionMetadata = log.partitionMetadataFile.read()
+ assertTrue(log.partitionMetadataFile.get.exists())
+ val partitionMetadata = log.partitionMetadataFile.get.read()
// Current version of PartitionMetadataFile is 0.
assertEquals(0, partitionMetadata.version)
@@ -2791,7 +2792,7 @@ class ReplicaManagerTest {
assertTrue(replicaManager.getLog(topicPartition).isDefined)
var log = replicaManager.getLog(topicPartition).get
assertEquals(None, log.topicId)
- assertFalse(log.partitionMetadataFile.exists())
+ assertFalse(log.partitionMetadataFile.get.exists())
val topicIds = Collections.singletonMap(topic, Uuid.randomUuid())
val topicNames = topicIds.asScala.map(_.swap).asJava
@@ -2815,8 +2816,8 @@ class ReplicaManagerTest {
assertFalse(replicaManager.localLog(topicPartition).isEmpty)
val id = topicIds.get(topicPartition.topic())
log = replicaManager.localLog(topicPartition).get
- assertTrue(log.partitionMetadataFile.exists())
- val partitionMetadata = log.partitionMetadataFile.read()
+ assertTrue(log.partitionMetadataFile.get.exists())
+ val partitionMetadata = log.partitionMetadataFile.get.read()
// Current version of PartitionMetadataFile is 0.
assertEquals(0, partitionMetadata.version)
@@ -2852,13 +2853,13 @@ class ReplicaManagerTest {
assertEquals(Errors.NONE,
response.partitionErrors(Collections.emptyMap()).get(topicPartition))
assertTrue(replicaManager.localLog(topicPartition).isDefined)
val log = replicaManager.localLog(topicPartition).get
- assertFalse(log.partitionMetadataFile.exists())
+ assertFalse(log.partitionMetadataFile.get.exists())
assertTrue(log.topicId.isEmpty)
val response2 = replicaManager.becomeLeaderOrFollower(0,
leaderAndIsrRequest(topicIds.asJava, ApiKeys.LEADER_AND_ISR.latestVersion), (_,
_) => ())
assertEquals(Errors.NONE,
response2.partitionErrors(topicNames.asJava).get(topicPartition))
assertTrue(replicaManager.localLog(topicPartition).isDefined)
- assertTrue(log.partitionMetadataFile.exists())
+ assertTrue(log.partitionMetadataFile.get.exists())
assertTrue(log.topicId.isDefined)
assertEquals(topicId, log.topicId.get)
@@ -2868,18 +2869,18 @@ class ReplicaManagerTest {
assertEquals(Errors.NONE,
response3.partitionErrors(Collections.emptyMap()).get(topicPartition2))
assertTrue(replicaManager.localLog(topicPartition2).isDefined)
val log2 = replicaManager.localLog(topicPartition2).get
- assertFalse(log2.partitionMetadataFile.exists())
+ assertFalse(log2.partitionMetadataFile.get.exists())
assertTrue(log2.topicId.isEmpty)
val response4 = replicaManager.becomeLeaderOrFollower(0,
leaderAndIsrRequest(topicIds.asJava, ApiKeys.LEADER_AND_ISR.latestVersion, 1,
1), (_, _) => ())
assertEquals(Errors.NONE,
response4.partitionErrors(topicNames.asJava).get(topicPartition2))
assertTrue(replicaManager.localLog(topicPartition2).isDefined)
- assertTrue(log2.partitionMetadataFile.exists())
+ assertTrue(log2.partitionMetadataFile.get.exists())
assertTrue(log2.topicId.isDefined)
assertEquals(topicId, log2.topicId.get)
- assertEquals(topicId, log.partitionMetadataFile.read().topicId)
- assertEquals(topicId, log2.partitionMetadataFile.read().topicId)
+ assertEquals(topicId, log.partitionMetadataFile.get.read().topicId)
+ assertEquals(topicId, log2.partitionMetadataFile.get.read().topicId)
} finally replicaManager.shutdown(checkpointHW = false)
}
@@ -2955,28 +2956,28 @@ class ReplicaManagerTest {
val response = replicaManager.becomeLeaderOrFollower(0,
leaderAndIsrRequest(0, "fakeTopic", ApiKeys.LEADER_AND_ISR.latestVersion), (_,
_) => ())
assertTrue(replicaManager.localLog(topicPartitionFake).isDefined)
val log = replicaManager.localLog(topicPartitionFake).get
- assertFalse(log.partitionMetadataFile.exists())
+ assertFalse(log.partitionMetadataFile.get.exists())
assertEquals(Errors.NONE,
response.partitionErrors(topicNames).get(topicPartition))
// There is no file if the topic has the default UUID.
val response2 = replicaManager.becomeLeaderOrFollower(0,
leaderAndIsrRequest(0, topic, ApiKeys.LEADER_AND_ISR.latestVersion), (_, _) =>
())
assertTrue(replicaManager.localLog(topicPartition).isDefined)
val log2 = replicaManager.localLog(topicPartition).get
- assertFalse(log2.partitionMetadataFile.exists())
+ assertFalse(log2.partitionMetadataFile.get.exists())
assertEquals(Errors.NONE,
response2.partitionErrors(topicNames).get(topicPartition))
// There is no file if the request an older version
val response3 = replicaManager.becomeLeaderOrFollower(0,
leaderAndIsrRequest(0, "foo", 0), (_, _) => ())
assertTrue(replicaManager.localLog(topicPartitionFoo).isDefined)
val log3 = replicaManager.localLog(topicPartitionFoo).get
- assertFalse(log3.partitionMetadataFile.exists())
+ assertFalse(log3.partitionMetadataFile.get.exists())
assertEquals(Errors.NONE,
response3.partitionErrors(topicNames).get(topicPartitionFoo))
// There is no file if the request is an older version
val response4 = replicaManager.becomeLeaderOrFollower(0,
leaderAndIsrRequest(1, "foo", 4), (_, _) => ())
assertTrue(replicaManager.localLog(topicPartitionFoo).isDefined)
val log4 = replicaManager.localLog(topicPartitionFoo).get
- assertFalse(log4.partitionMetadataFile.exists())
+ assertFalse(log4.partitionMetadataFile.get.exists())
assertEquals(Errors.NONE,
response4.partitionErrors(topicNames).get(topicPartitionFoo))
} finally replicaManager.shutdown(checkpointHW = false)
}