This is an automated email from the ASF dual-hosted git repository.
clolov pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 85bd0595594 KAFKA-18545: Remove Zookeeper logic from LogManager
(#18592)
85bd0595594 is described below
commit 85bd0595594d274f05d61a6b481d72664ec08173
Author: Ken Huang <[email protected]>
AuthorDate: Tue Feb 4 01:16:35 2025 +0800
KAFKA-18545: Remove Zookeeper logic from LogManager (#18592)
Reviewers: Chia-Ping Tsai <[email protected]>, Ismael Juma
<[email protected]>, Mickael Maison <[email protected]>
---
core/src/main/scala/kafka/log/LogManager.scala | 11 ++---
.../unit/kafka/server/ReplicaManagerTest.scala | 47 ----------------------
2 files changed, 3 insertions(+), 55 deletions(-)
diff --git a/core/src/main/scala/kafka/log/LogManager.scala
b/core/src/main/scala/kafka/log/LogManager.scala
index fee4b95d2aa..4c9c4eb475e 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -355,8 +355,8 @@ class LogManager(logDirs: Seq[File],
addStrayLog(topicPartition, log)
warn(s"Loaded stray log: $logDir")
} else if (isStray(log)) {
- // Unlike Zookeeper mode, which tracks pending topic deletions under a
ZNode, KRaft is unable to prevent a topic from being recreated before every
replica has been deleted.
- // A KRaft broker with an offline directory may be unable to detect it
still holds a to-be-deleted replica,
+ // We are unable to prevent a topic from being recreated before every
replica has been deleted.
+ // A broker with an offline directory may be unable to detect it still
holds a to-be-deleted replica,
// and can create a conflicting topic partition for a new incarnation of
the topic in one of the remaining online directories.
// So upon a restart in which the offline directory is back online we
need to clean up the old replica directory.
log.renameDir(UnifiedLog.logStrayDirName(log.topicPartition),
shouldReinitialize = false)
@@ -952,7 +952,6 @@ class LogManager(logDirs: Seq[File],
wasRemoteLogEnabled: Boolean): Unit = {
topicConfigUpdated(topic)
val logs = logsByTopic(topic)
- // Combine the default properties with the overrides in zk to create the
new LogConfig
val newLogConfig = LogConfig.fromProps(currentDefaultConfig.originals,
newTopicConfig)
val isRemoteLogStorageEnabled = newLogConfig.remoteStorageEnable()
// We would like to validate the configuration no matter whether the logs
have materialised on disk or not.
@@ -1082,11 +1081,7 @@ class LogManager(logDirs: Seq[File],
log
}
- // When running a ZK controller, we may get a log that does not have a
topic ID. Assign it here.
- if (log.topicId.isEmpty) {
- topicId.foreach(log.assignTopicId)
- }
-
+
// Ensure topic IDs are consistent
topicId.foreach { topicId =>
log.topicId.foreach { logTopicId =>
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index b7e166892a0..5cdfc235b5e 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -4184,53 +4184,6 @@ class ReplicaManagerTest {
}
}
- @Test
- def testPartitionMetadataFileCreatedWithExistingLog(): Unit = {
- val replicaManager = setupReplicaManagerWithMockedPurgatories(new
MockTimer(time))
- try {
- val brokerList = Seq[Integer](0, 1).asJava
- val topicPartition = new TopicPartition(topic, 0)
-
- replicaManager.logManager.getOrCreateLog(topicPartition, isNew = true,
topicId = None)
-
- assertTrue(replicaManager.getLog(topicPartition).isDefined)
- var log = replicaManager.getLog(topicPartition).get
- assertEquals(None, log.topicId)
- assertFalse(log.partitionMetadataFile.get.exists())
-
- val topicIds = Collections.singletonMap(topic, Uuid.randomUuid())
- val topicNames = topicIds.asScala.map(_.swap).asJava
-
- def leaderAndIsrRequest(epoch: Int): LeaderAndIsrRequest = new
LeaderAndIsrRequest.Builder(0, 0, brokerEpoch,
- Seq(new LeaderAndIsrRequest.PartitionState()
- .setTopicName(topic)
- .setPartitionIndex(0)
- .setControllerEpoch(0)
- .setLeader(0)
- .setLeaderEpoch(epoch)
- .setIsr(brokerList)
- .setPartitionEpoch(0)
- .setReplicas(brokerList)
- .setIsNew(true)).asJava,
- topicIds,
- Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-
- val response = replicaManager.becomeLeaderOrFollower(0,
leaderAndIsrRequest(0), (_, _) => ())
- assertEquals(Errors.NONE,
response.partitionErrors(topicNames).get(topicPartition))
- assertFalse(replicaManager.localLog(topicPartition).isEmpty)
- val id = topicIds.get(topicPartition.topic())
- log = replicaManager.localLog(topicPartition).get
- assertTrue(log.partitionMetadataFile.get.exists())
- val partitionMetadata = log.partitionMetadataFile.get.read()
-
- // Current version of PartitionMetadataFile is 0.
- assertEquals(0, partitionMetadata.version)
- assertEquals(id, partitionMetadata.topicId)
- } finally {
- replicaManager.shutdown(checkpointHW = false)
- }
- }
-
@Test
def testInconsistentIdReturnsError(): Unit = {
val replicaManager = setupReplicaManagerWithMockedPurgatories(new
MockTimer(time))