This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6659777  MINOR: Don't ignore deletion of partition metadata file and log topic id clean-ups (#10761)
6659777 is described below

commit 6659777b820f32483d1e96e26831cb87f80a8576
Author: Ismael Juma <[email protected]>
AuthorDate: Thu May 27 13:50:51 2021 -0700

    MINOR: Don't ignore deletion of partition metadata file and log topic id clean-ups (#10761)
    
    Log if deletion fails and don't expose log topic id for mutability outside of `assignTopicId()`.
    
    Also remove an unnecessary parameter in `PartitionTest`.
    
    Reviewers: Chia-Ping Tsai <[email protected]>, Justine Olshan <[email protected]>
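
    A minimal, self-contained sketch of the two ideas described above, for readers skimming the diff below: the class name, the deleteMetadataFileIfNotKept method, and java.util.UUID are illustrative stand-ins, not the real Log, PartitionMetadataFile, or Uuid types touched by this commit.

    import java.io.{File, IOException}
    import java.nio.file.Files
    import java.util.UUID

    // Sketch only: simplified stand-in for the behaviour described in the commit message.
    final class TopicIdHolder(metadataFile: File, keepPartitionMetadataFile: Boolean) {

      // The backing field is private, so callers can read the topic ID but can
      // only set it through assignTopicId().
      @volatile private var _topicId: Option[UUID] = None

      def topicId: Option[UUID] = _topicId

      def assignTopicId(id: UUID): Unit = {
        // The real code also persists the ID to the partition metadata file here.
        _topicId = Some(id)
      }

      // Files.delete throws on failure (unlike File.delete, which returns false),
      // so a failed deletion is logged instead of being silently ignored.
      def deleteMetadataFileIfNotKept(): Unit = {
        if (!keepPartitionMetadataFile && metadataFile.exists()) {
          try Files.delete(metadataFile.toPath)
          catch {
            case e: IOException =>
              System.err.println(s"Error while trying to delete partition metadata file $metadataFile: $e")
          }
        }
      }
    }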
---
 core/src/main/scala/kafka/log/Log.scala                | 18 ++++++++++++------
 .../scala/kafka/server/PartitionMetadataFile.scala     |  6 ++++--
 .../scala/unit/kafka/cluster/PartitionLockTest.scala   |  2 +-
 .../test/scala/unit/kafka/cluster/PartitionTest.scala  | 11 +++++------
 .../scala/unit/kafka/log/LogCleanerManagerTest.scala   |  2 +-
 .../src/test/scala/unit/kafka/log/LogCleanerTest.scala |  2 +-
 core/src/test/scala/unit/kafka/log/LogLoaderTest.scala | 10 +++++-----
 core/src/test/scala/unit/kafka/log/LogTest.scala       |  9 ++++-----
 .../scala/unit/kafka/server/ReplicaManagerTest.scala   |  3 +--
 .../test/scala/unit/kafka/utils/SchedulerTest.scala    |  2 +-
 10 files changed, 35 insertions(+), 30 deletions(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index fe60c42..0e74c28 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -270,7 +270,7 @@ class Log(@volatile private var _dir: File,
           @volatile var leaderEpochCache: Option[LeaderEpochFileCache],
           val producerStateManager: ProducerStateManager,
           logDirFailureChannel: LogDirFailureChannel,
-          @volatile var topicId: Option[Uuid],
+          @volatile private var _topicId: Option[Uuid],
           val keepPartitionMetadataFile: Boolean) extends Logging with KafkaMetricsGroup {
 
   import kafka.log.Log._
@@ -324,18 +324,24 @@ class Log(@volatile private var _dir: File,
     if (partitionMetadataFile.exists()) {
         if (keepPartitionMetadataFile) {
           val fileTopicId = partitionMetadataFile.read().topicId
-          if (topicId.isDefined && !topicId.contains(fileTopicId))
+          if (_topicId.isDefined && !_topicId.contains(fileTopicId))
            throw new InconsistentTopicIdException(s"Tried to assign topic ID $topicId to log for topic partition $topicPartition," +
               s"but log already contained topic ID $fileTopicId")
-          topicId = Some(fileTopicId)
+          _topicId = Some(fileTopicId)
         } else {
-          partitionMetadataFile.delete()
+          try partitionMetadataFile.delete()
+          catch {
+            case e: IOException =>
+              error(s"Error while trying to delete partition metadata file ${partitionMetadataFile}", e)
+          }
         }
     } else if (keepPartitionMetadataFile) {
-      topicId.foreach(partitionMetadataFile.write)
+      _topicId.foreach(partitionMetadataFile.write)
     }
   }
 
+  def topicId: Option[Uuid] = _topicId
+
   def dir: File = _dir
 
   def parentDir: String = _parentDir
@@ -551,7 +557,7 @@ class Log(@volatile private var _dir: File,
   /** Only used for ZK clusters when we update and start using topic IDs on existing topics */
   def assignTopicId(topicId: Uuid): Unit = {
     partitionMetadataFile.write(topicId)
-    this.topicId = Some(topicId)
+    _topicId = Some(topicId)
   }
 
   private def initializeLeaderEpochCache(): Unit = lock synchronized {
diff --git a/core/src/main/scala/kafka/server/PartitionMetadataFile.scala b/core/src/main/scala/kafka/server/PartitionMetadataFile.scala
index 25b1ba6..c08c87d 100644
--- a/core/src/main/scala/kafka/server/PartitionMetadataFile.scala
+++ b/core/src/main/scala/kafka/server/PartitionMetadataFile.scala
@@ -138,7 +138,9 @@ class PartitionMetadataFile(val file: File,
     file.exists()
   }
 
-  def delete(): Boolean = {
-    file.delete()
+  def delete(): Unit = {
+    Files.delete(file.toPath)
   }
+
+  override def toString: String = s"PartitionMetadataFile(path=$path)"
 }
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
index 0cbeec9..8d567ec 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
@@ -385,7 +385,7 @@ class PartitionLockTest extends Logging {
     leaderEpochCache,
     producerStateManager,
     logDirFailureChannel,
-    topicId = None,
+    _topicId = None,
     keepPartitionMetadataFile = true) {
 
    override def appendAsLeader(records: MemoryRecords, leaderEpoch: Int, origin: AppendOrigin, interBrokerProtocolVersion: ApiVersion): LogAppendInfo = {
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 7030715..b614e13 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -72,7 +72,7 @@ class PartitionTest extends AbstractPartitionTest {
     assertEquals(17L, log.logEndOffset)
 
     val leaderEpoch = 10
-    val partition = setupPartitionWithMocks(leaderEpoch = leaderEpoch, isLeader = true, log = log)
+    val partition = setupPartitionWithMocks(leaderEpoch = leaderEpoch, isLeader = true)
 
      def epochEndOffset(epoch: Int, endOffset: Long): FetchResponseData.EpochEndOffset = {
       new FetchResponseData.EpochEndOffset()
@@ -143,7 +143,7 @@ class PartitionTest extends AbstractPartitionTest {
     ), leaderEpoch = 5)
     assertEquals(4, log.logEndOffset)
 
-    val partition = setupPartitionWithMocks(leaderEpoch = leaderEpoch, isLeader = true, log = log)
+    val partition = setupPartitionWithMocks(leaderEpoch = leaderEpoch, isLeader = true)
     assertEquals(Some(4), partition.leaderLogIfLocal.map(_.logEndOffset))
 
     val epochEndOffset = partition.lastOffsetForLeaderEpoch(currentLeaderEpoch = Optional.of[Integer](leaderEpoch),
@@ -171,7 +171,7 @@ class PartitionTest extends AbstractPartitionTest {
     ), leaderEpoch = 5)
     assertEquals(4, log.logEndOffset)
 
-    val partition = setupPartitionWithMocks(leaderEpoch = leaderEpoch, isLeader = true, log = log)
+    val partition = setupPartitionWithMocks(leaderEpoch = leaderEpoch, isLeader = true)
     assertEquals(Some(4), partition.leaderLogIfLocal.map(_.logEndOffset))
     assertEquals(None, log.latestEpoch)
 
@@ -733,8 +733,7 @@ class PartitionTest extends AbstractPartitionTest {
   }
 
   private def setupPartitionWithMocks(leaderEpoch: Int,
-                                      isLeader: Boolean,
-                                      log: Log = logManager.getOrCreateLog(topicPartition, topicId = None)): Partition = {
+                                      isLeader: Boolean): Partition = {
     partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
 
     val controllerEpoch = 0
@@ -2046,7 +2045,7 @@ class PartitionTest extends AbstractPartitionTest {
     leaderEpochCache,
     producerStateManager,
     logDirFailureChannel,
-    topicId = None,
+    _topicId = None,
     keepPartitionMetadataFile = true) {
 
     override def appendAsFollower(records: MemoryRecords): LogAppendInfo = {
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 7620348..19de8ea 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -121,7 +121,7 @@ class LogCleanerManagerTest extends Logging {
      extends Log(dir, config, segments, offsets.logStartOffset, offsets.recoveryPoint,
        offsets.nextOffsetMetadata, time.scheduler, new BrokerTopicStats, time,
        LogManager.ProducerIdExpirationCheckIntervalMs, topicPartition, leaderEpochCache,
-        producerStateManager, logDirFailureChannel, topicId = None, keepPartitionMetadataFile = true) {
+        producerStateManager, logDirFailureChannel, _topicId = None, keepPartitionMetadataFile = true) {
      // Throw an error in getFirstBatchTimestampForSegments since it is called in grabFilthiestLog()
      override def getFirstBatchTimestampForSegments(segments: Iterable[LogSegment]): Iterable[Long] =
         throw new IllegalStateException("Error!")
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 0890b03..5c91041 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -136,7 +136,7 @@ class LogCleanerTest {
                       leaderEpochCache = leaderEpochCache,
                       producerStateManager = producerStateManager,
                       logDirFailureChannel = logDirFailureChannel,
-                      topicId = None,
+                      _topicId = None,
                       keepPartitionMetadataFile = true) {
      override def replaceSegments(newSegments: Seq[LogSegment], oldSegments: Seq[LogSegment], isRecoveredSwapFile: Boolean = false): Unit = {
         deleteStartLatch.countDown()
diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
index 4826784..9a7b627 100644
--- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -284,7 +284,7 @@ class LogLoaderTest {
      new Log(logDir, logConfig, interceptedLogSegments, offsets.logStartOffset, offsets.recoveryPoint,
        offsets.nextOffsetMetadata, mockTime.scheduler, brokerTopicStats, mockTime,
        LogManager.ProducerIdExpirationCheckIntervalMs, topicPartition, leaderEpochCache,
-        producerStateManager, logDirFailureChannel, topicId = None, keepPartitionMetadataFile = true)
+        producerStateManager, logDirFailureChannel, _topicId = None, keepPartitionMetadataFile = true)
     }
 
     // Retain snapshots for the last 2 segments
@@ -366,7 +366,7 @@ class LogLoaderTest {
       leaderEpochCache = leaderEpochCache,
       producerStateManager = stateManager,
       logDirFailureChannel = logDirFailureChannel,
-      topicId = None,
+      _topicId = None,
       keepPartitionMetadataFile = true)
 
     EasyMock.verify(stateManager)
@@ -500,7 +500,7 @@ class LogLoaderTest {
       leaderEpochCache = leaderEpochCache,
       producerStateManager = stateManager,
       logDirFailureChannel = logDirFailureChannel,
-      topicId = None,
+      _topicId = None,
       keepPartitionMetadataFile = true)
 
     EasyMock.verify(stateManager)
@@ -561,7 +561,7 @@ class LogLoaderTest {
       leaderEpochCache = leaderEpochCache,
       producerStateManager = stateManager,
       logDirFailureChannel = logDirFailureChannel,
-      topicId = None,
+      _topicId = None,
       keepPartitionMetadataFile = true)
 
     EasyMock.verify(stateManager)
@@ -624,7 +624,7 @@ class LogLoaderTest {
       leaderEpochCache = leaderEpochCache,
       producerStateManager = stateManager,
       logDirFailureChannel = logDirFailureChannel,
-      topicId = None,
+      _topicId = None,
       keepPartitionMetadataFile = true)
 
     EasyMock.verify(stateManager)
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 3054116..4cc88d0 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -2319,9 +2319,8 @@ class LogTest {
     val log = createLog(logDir, logConfig)
 
     // Write a topic ID to the partition metadata file to ensure it is transferred correctly.
-    val id = Uuid.randomUuid()
-    log.topicId = Some(id)
-    log.partitionMetadataFile.write(id)
+    val topicId = Uuid.randomUuid()
+    log.assignTopicId(topicId)
 
     log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 5)
     assertEquals(Some(5), log.latestEpoch)
@@ -2336,8 +2335,8 @@ class LogTest {
 
     // Check the topic ID remains in memory and was copied correctly.
     assertTrue(log.topicId.isDefined)
-    assertEquals(id, log.topicId.get)
-    assertEquals(id, log.partitionMetadataFile.read().topicId)
+    assertEquals(topicId, log.topicId.get)
+    assertEquals(topicId, log.partitionMetadataFile.read().topicId)
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 28f7be8..99928f0 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -1075,7 +1075,6 @@ class ReplicaManagerTest {
   private def initializeLogAndTopicId(replicaManager: ReplicaManager, topicPartition: TopicPartition, topicId: Uuid): Unit = {
     val partition = replicaManager.createPartition(new TopicPartition(topic, 0))
     val log = replicaManager.logManager.getOrCreateLog(topicPartition, false, false, Some(topicId))
-    log.topicId = Some(topicId)
     partition.log = Some(log)
   }
 
@@ -1517,7 +1516,7 @@ class ReplicaManagerTest {
       leaderEpochCache = leaderEpochCache,
       producerStateManager = producerStateManager,
       logDirFailureChannel = mockLogDirFailureChannel,
-      topicId = topicId,
+      _topicId = topicId,
       keepPartitionMetadataFile = true) {
 
      override def endOffsetForEpoch(leaderEpoch: Int): Option[OffsetAndEpoch] = {
diff --git a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
index e5d2f21..b085ab0 100644
--- a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
@@ -143,7 +143,7 @@ class SchedulerTest {
      recoveryPoint = offsets.recoveryPoint, nextOffsetMetadata = offsets.nextOffsetMetadata, scheduler,
      brokerTopicStats, mockTime, LogManager.ProducerIdExpirationCheckIntervalMs,
      topicPartition, leaderEpochCache, producerStateManager, logDirFailureChannel,
-      topicId = None, keepPartitionMetadataFile = true)
+      _topicId = None, keepPartitionMetadataFile = true)
     assertTrue(scheduler.taskRunning(log.producerExpireCheck))
     log.close()
     assertFalse(scheduler.taskRunning(log.producerExpireCheck))
