This is an automated email from the ASF dual-hosted git repository.

clolov pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 85bd0595594 KAFKA-18545: Remove Zookeeper logic from LogManager 
(#18592)
85bd0595594 is described below

commit 85bd0595594d274f05d61a6b481d72664ec08173
Author: Ken Huang <[email protected]>
AuthorDate: Tue Feb 4 01:16:35 2025 +0800

    KAFKA-18545: Remove Zookeeper logic from LogManager (#18592)
    
    Reviewers: Chia-Ping Tsai <[email protected]>, Ismael Juma 
<[email protected]>, Mickael Maison <[email protected]>
---
 core/src/main/scala/kafka/log/LogManager.scala     | 11 ++---
 .../unit/kafka/server/ReplicaManagerTest.scala     | 47 ----------------------
 2 files changed, 3 insertions(+), 55 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogManager.scala 
b/core/src/main/scala/kafka/log/LogManager.scala
index fee4b95d2aa..4c9c4eb475e 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -355,8 +355,8 @@ class LogManager(logDirs: Seq[File],
       addStrayLog(topicPartition, log)
       warn(s"Loaded stray log: $logDir")
     } else if (isStray(log)) {
-      // Unlike Zookeeper mode, which tracks pending topic deletions under a 
ZNode, KRaft is unable to prevent a topic from being recreated before every 
replica has been deleted.
-      // A KRaft broker with an offline directory may be unable to detect it 
still holds a to-be-deleted replica,
+      // We are unable to prevent a topic from being recreated before every 
replica has been deleted.
+      // Broker with an offline directory may be unable to detect it still 
holds a to-be-deleted replica,
       // and can create a conflicting topic partition for a new incarnation of 
the topic in one of the remaining online directories.
       // So upon a restart in which the offline directory is back online we 
need to clean up the old replica directory.
       log.renameDir(UnifiedLog.logStrayDirName(log.topicPartition), 
shouldReinitialize = false)
@@ -952,7 +952,6 @@ class LogManager(logDirs: Seq[File],
                         wasRemoteLogEnabled: Boolean): Unit = {
     topicConfigUpdated(topic)
     val logs = logsByTopic(topic)
-    // Combine the default properties with the overrides in zk to create the 
new LogConfig
     val newLogConfig = LogConfig.fromProps(currentDefaultConfig.originals, 
newTopicConfig)
     val isRemoteLogStorageEnabled = newLogConfig.remoteStorageEnable()
     // We would like to validate the configuration no matter whether the logs 
have materialised on disk or not.
@@ -1082,11 +1081,7 @@ class LogManager(logDirs: Seq[File],
 
         log
       }
-      // When running a ZK controller, we may get a log that does not have a 
topic ID. Assign it here.
-      if (log.topicId.isEmpty) {
-        topicId.foreach(log.assignTopicId)
-      }
-
+      
       // Ensure topic IDs are consistent
       topicId.foreach { topicId =>
         log.topicId.foreach { logTopicId =>
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index b7e166892a0..5cdfc235b5e 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -4184,53 +4184,6 @@ class ReplicaManagerTest {
     }
   }
 
-  @Test
-  def testPartitionMetadataFileCreatedWithExistingLog(): Unit = {
-    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time))
-    try {
-      val brokerList = Seq[Integer](0, 1).asJava
-      val topicPartition = new TopicPartition(topic, 0)
-
-      replicaManager.logManager.getOrCreateLog(topicPartition, isNew = true, 
topicId = None)
-
-      assertTrue(replicaManager.getLog(topicPartition).isDefined)
-      var log = replicaManager.getLog(topicPartition).get
-      assertEquals(None, log.topicId)
-      assertFalse(log.partitionMetadataFile.get.exists())
-
-      val topicIds = Collections.singletonMap(topic, Uuid.randomUuid())
-      val topicNames = topicIds.asScala.map(_.swap).asJava
-
-      def leaderAndIsrRequest(epoch: Int): LeaderAndIsrRequest = new 
LeaderAndIsrRequest.Builder(0, 0, brokerEpoch,
-        Seq(new LeaderAndIsrRequest.PartitionState()
-          .setTopicName(topic)
-          .setPartitionIndex(0)
-          .setControllerEpoch(0)
-          .setLeader(0)
-          .setLeaderEpoch(epoch)
-          .setIsr(brokerList)
-          .setPartitionEpoch(0)
-          .setReplicas(brokerList)
-          .setIsNew(true)).asJava,
-        topicIds,
-        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-
-      val response = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest(0), (_, _) => ())
-      assertEquals(Errors.NONE, 
response.partitionErrors(topicNames).get(topicPartition))
-      assertFalse(replicaManager.localLog(topicPartition).isEmpty)
-      val id = topicIds.get(topicPartition.topic())
-      log = replicaManager.localLog(topicPartition).get
-      assertTrue(log.partitionMetadataFile.get.exists())
-      val partitionMetadata = log.partitionMetadataFile.get.read()
-
-      // Current version of PartitionMetadataFile is 0.
-      assertEquals(0, partitionMetadata.version)
-      assertEquals(id, partitionMetadata.topicId)
-    } finally {
-      replicaManager.shutdown(checkpointHW = false)
-    }
-  }
-
   @Test
   def testInconsistentIdReturnsError(): Unit = {
     val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time))

Reply via email to