This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 754ede8b72b KAFKA-18432 Remove unused code from 
AutoTopicCreationManager (#18438)
754ede8b72b is described below

commit 754ede8b72b8c720c7bf306d9404f863049a9ac4
Author: Logan Zhu <[email protected]>
AuthorDate: Thu Jan 9 02:52:28 2025 +0800

    KAFKA-18432 Remove unused code from AutoTopicCreationManager (#18438)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../kafka/server/AutoTopicCreationManager.scala    | 85 +---------------------
 .../src/main/scala/kafka/server/BrokerServer.scala |  4 +-
 2 files changed, 5 insertions(+), 84 deletions(-)

diff --git a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala 
b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
index 58b3035935c..e3abde0bda4 100644
--- a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
+++ b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
@@ -18,9 +18,7 @@
 package kafka.server
 
 import java.util.concurrent.ConcurrentHashMap
-import java.util.concurrent.atomic.AtomicReference
 import java.util.{Collections, Properties}
-import kafka.controller.KafkaController
 import kafka.coordinator.transaction.TransactionCoordinator
 import kafka.utils.Logging
 import org.apache.kafka.clients.ClientResponse
@@ -31,7 +29,7 @@ import org.apache.kafka.common.message.CreateTopicsRequestData
 import 
org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, 
CreatableTopicConfig, CreatableTopicConfigCollection}
 import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.{ApiError, CreateTopicsRequest, 
RequestContext, RequestHeader}
+import org.apache.kafka.common.requests.{CreateTopicsRequest, RequestContext, 
RequestHeader}
 import org.apache.kafka.coordinator.group.GroupCoordinator
 import org.apache.kafka.coordinator.share.ShareCoordinator
 import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, 
NodeToControllerChannelManager}
@@ -49,34 +47,13 @@ trait AutoTopicCreationManager {
   ): Seq[MetadataResponseTopic]
 }
 
-object AutoTopicCreationManager {
-
-  def apply(
-   config: KafkaConfig,
-   channelManager: Option[NodeToControllerChannelManager],
-   adminManager: Option[ZkAdminManager],
-   controller: Option[KafkaController],
-   groupCoordinator: GroupCoordinator,
-   txnCoordinator: TransactionCoordinator,
-   shareCoordinator: Option[ShareCoordinator],
- ): AutoTopicCreationManager = {
-    new DefaultAutoTopicCreationManager(config, channelManager, adminManager,
-      controller, groupCoordinator, txnCoordinator, shareCoordinator)
-  }
-}
-
 class DefaultAutoTopicCreationManager(
   config: KafkaConfig,
-  channelManager: Option[NodeToControllerChannelManager],
-  adminManager: Option[ZkAdminManager],
-  controller: Option[KafkaController],
+  channelManager: NodeToControllerChannelManager,
   groupCoordinator: GroupCoordinator,
   txnCoordinator: TransactionCoordinator,
   shareCoordinator: Option[ShareCoordinator]
 ) extends AutoTopicCreationManager with Logging {
-  if (controller.isEmpty && channelManager.isEmpty) {
-    throw new IllegalArgumentException("Must supply a channel manager if not 
supplying a controller")
-  }
 
   private val inflightTopics = Collections.newSetFromMap(new 
ConcurrentHashMap[String, java.lang.Boolean]())
 
@@ -99,65 +76,13 @@ class DefaultAutoTopicCreationManager(
 
     val creatableTopicResponses = if (creatableTopics.isEmpty) {
       Seq.empty
-    } else if (controller.isEmpty || !controller.get.isActive && 
channelManager.isDefined) {
-      sendCreateTopicRequest(creatableTopics, metadataRequestContext)
     } else {
-      createTopicsInZk(creatableTopics, controllerMutationQuota)
+      sendCreateTopicRequest(creatableTopics, metadataRequestContext)
     }
 
     uncreatableTopicResponses ++ creatableTopicResponses
   }
 
-  private def createTopicsInZk(
-    creatableTopics: Map[String, CreatableTopic],
-    controllerMutationQuota: ControllerMutationQuota
-  ): Seq[MetadataResponseTopic] = {
-    val topicErrors = new AtomicReference[Map[String, ApiError]]()
-    try {
-      // Note that we use timeout = 0 since we do not need to wait for 
metadata propagation
-      // and we want to get the response error immediately.
-      adminManager.get.createTopics(
-        timeout = 0,
-        validateOnly = false,
-        creatableTopics,
-        Map.empty,
-        controllerMutationQuota,
-        topicErrors.set
-      )
-
-      val creatableTopicResponses = Option(topicErrors.get) match {
-        case Some(errors) =>
-          errors.toSeq.map { case (topic, apiError) =>
-            val error = apiError.error match {
-              case Errors.TOPIC_ALREADY_EXISTS | Errors.REQUEST_TIMED_OUT =>
-                // The timeout error is expected because we set timeout=0. This
-                // nevertheless indicates that the topic metadata was created
-                // successfully, so we return LEADER_NOT_AVAILABLE.
-                Errors.LEADER_NOT_AVAILABLE
-              case error => error
-            }
-
-            new MetadataResponseTopic()
-              .setErrorCode(error.code)
-              .setName(topic)
-              .setIsInternal(Topic.isInternal(topic))
-          }
-
-        case None =>
-          creatableTopics.keySet.toSeq.map { topic =>
-            new MetadataResponseTopic()
-              .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
-              .setName(topic)
-              .setIsInternal(Topic.isInternal(topic))
-          }
-      }
-
-      creatableTopicResponses
-    } finally {
-      clearInflightRequests(creatableTopics)
-    }
-  }
-
   private def sendCreateTopicRequest(
     creatableTopics: Map[String, CreatableTopic],
     metadataRequestContext: Option[RequestContext]
@@ -189,10 +114,6 @@ class DefaultAutoTopicCreationManager(
       }
     }
 
-    val channelManager = this.channelManager.getOrElse {
-      throw new IllegalStateException("Channel manager must be defined in 
order to send CreateTopic requests.")
-    }
-
     val request = metadataRequestContext.map { context =>
       val requestVersion =
         channelManager.controllerApiVersions.toScala match {
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 7ab6219f159..3fd2f7789c9 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -385,8 +385,8 @@ class BrokerServer(
         producerIdManagerSupplier, metrics, metadataCache, Time.SYSTEM)
 
       autoTopicCreationManager = new DefaultAutoTopicCreationManager(
-        config, Some(clientToControllerChannelManager), None, None,
-        groupCoordinator, transactionCoordinator, shareCoordinator)
+        config, clientToControllerChannelManager, groupCoordinator,
+        transactionCoordinator, shareCoordinator)
 
       dynamicConfigHandlers = Map[String, ConfigHandler](
         ConfigType.TOPIC -> new TopicConfigHandler(replicaManager, config, 
quotaManagers, None),

Reply via email to