This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 754ede8b72b KAFKA-18432 Remove unused code from AutoTopicCreationManager (#18438)
754ede8b72b is described below
commit 754ede8b72b8c720c7bf306d9404f863049a9ac4
Author: Logan Zhu <[email protected]>
AuthorDate: Thu Jan 9 02:52:28 2025 +0800
KAFKA-18432 Remove unused code from AutoTopicCreationManager (#18438)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../kafka/server/AutoTopicCreationManager.scala | 85 +---------------------
.../src/main/scala/kafka/server/BrokerServer.scala | 4 +-
2 files changed, 5 insertions(+), 84 deletions(-)
diff --git a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
index 58b3035935c..e3abde0bda4 100644
--- a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
+++ b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
@@ -18,9 +18,7 @@
package kafka.server
import java.util.concurrent.ConcurrentHashMap
-import java.util.concurrent.atomic.AtomicReference
import java.util.{Collections, Properties}
-import kafka.controller.KafkaController
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.utils.Logging
import org.apache.kafka.clients.ClientResponse
@@ -31,7 +29,7 @@ import org.apache.kafka.common.message.CreateTopicsRequestData
import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicConfig, CreatableTopicConfigCollection}
import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.{ApiError, CreateTopicsRequest, RequestContext, RequestHeader}
+import org.apache.kafka.common.requests.{CreateTopicsRequest, RequestContext, RequestHeader}
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.coordinator.share.ShareCoordinator
import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
@@ -49,34 +47,13 @@ trait AutoTopicCreationManager {
): Seq[MetadataResponseTopic]
}
-object AutoTopicCreationManager {
-
- def apply(
- config: KafkaConfig,
- channelManager: Option[NodeToControllerChannelManager],
- adminManager: Option[ZkAdminManager],
- controller: Option[KafkaController],
- groupCoordinator: GroupCoordinator,
- txnCoordinator: TransactionCoordinator,
- shareCoordinator: Option[ShareCoordinator],
- ): AutoTopicCreationManager = {
- new DefaultAutoTopicCreationManager(config, channelManager, adminManager,
- controller, groupCoordinator, txnCoordinator, shareCoordinator)
- }
-}
-
class DefaultAutoTopicCreationManager(
config: KafkaConfig,
- channelManager: Option[NodeToControllerChannelManager],
- adminManager: Option[ZkAdminManager],
- controller: Option[KafkaController],
+ channelManager: NodeToControllerChannelManager,
groupCoordinator: GroupCoordinator,
txnCoordinator: TransactionCoordinator,
shareCoordinator: Option[ShareCoordinator]
) extends AutoTopicCreationManager with Logging {
- if (controller.isEmpty && channelManager.isEmpty) {
- throw new IllegalArgumentException("Must supply a channel manager if not supplying a controller")
- }
private val inflightTopics = Collections.newSetFromMap(new ConcurrentHashMap[String, java.lang.Boolean]())
@@ -99,65 +76,13 @@ class DefaultAutoTopicCreationManager(
val creatableTopicResponses = if (creatableTopics.isEmpty) {
Seq.empty
- } else if (controller.isEmpty || !controller.get.isActive && channelManager.isDefined) {
- sendCreateTopicRequest(creatableTopics, metadataRequestContext)
} else {
- createTopicsInZk(creatableTopics, controllerMutationQuota)
+ sendCreateTopicRequest(creatableTopics, metadataRequestContext)
}
uncreatableTopicResponses ++ creatableTopicResponses
}
- private def createTopicsInZk(
- creatableTopics: Map[String, CreatableTopic],
- controllerMutationQuota: ControllerMutationQuota
- ): Seq[MetadataResponseTopic] = {
- val topicErrors = new AtomicReference[Map[String, ApiError]]()
- try {
- // Note that we use timeout = 0 since we do not need to wait for metadata propagation
- // and we want to get the response error immediately.
- adminManager.get.createTopics(
- timeout = 0,
- validateOnly = false,
- creatableTopics,
- Map.empty,
- controllerMutationQuota,
- topicErrors.set
- )
-
- val creatableTopicResponses = Option(topicErrors.get) match {
- case Some(errors) =>
- errors.toSeq.map { case (topic, apiError) =>
- val error = apiError.error match {
- case Errors.TOPIC_ALREADY_EXISTS | Errors.REQUEST_TIMED_OUT =>
- // The timeout error is expected because we set timeout=0. This
- // nevertheless indicates that the topic metadata was created
- // successfully, so we return LEADER_NOT_AVAILABLE.
- Errors.LEADER_NOT_AVAILABLE
- case error => error
- }
-
- new MetadataResponseTopic()
- .setErrorCode(error.code)
- .setName(topic)
- .setIsInternal(Topic.isInternal(topic))
- }
-
- case None =>
- creatableTopics.keySet.toSeq.map { topic =>
- new MetadataResponseTopic()
- .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
- .setName(topic)
- .setIsInternal(Topic.isInternal(topic))
- }
- }
-
- creatableTopicResponses
- } finally {
- clearInflightRequests(creatableTopics)
- }
- }
-
private def sendCreateTopicRequest(
creatableTopics: Map[String, CreatableTopic],
metadataRequestContext: Option[RequestContext]
@@ -189,10 +114,6 @@ class DefaultAutoTopicCreationManager(
}
}
- val channelManager = this.channelManager.getOrElse {
- throw new IllegalStateException("Channel manager must be defined in order to send CreateTopic requests.")
- }
-
val request = metadataRequestContext.map { context =>
val requestVersion =
channelManager.controllerApiVersions.toScala match {
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 7ab6219f159..3fd2f7789c9 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -385,8 +385,8 @@ class BrokerServer(
producerIdManagerSupplier, metrics, metadataCache, Time.SYSTEM)
autoTopicCreationManager = new DefaultAutoTopicCreationManager(
- config, Some(clientToControllerChannelManager), None, None,
- groupCoordinator, transactionCoordinator, shareCoordinator)
+ config, clientToControllerChannelManager, groupCoordinator,
+ transactionCoordinator, shareCoordinator)
dynamicConfigHandlers = Map[String, ConfigHandler](
ConfigType.TOPIC -> new TopicConfigHandler(replicaManager, config,
quotaManagers, None),