This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 77018f0c9f13d15eb5da3a4497de28e32f08f580
Author: Ken Huang <s7133...@gmail.com>
AuthorDate: Thu Jan 16 11:38:33 2025 +0800

    KAFKA-18472: Remove MetadataSupport (#18483)
    
    Reviewers: Ismael Juma <ism...@juma.me.uk>, Chia-Ping Tsai 
<chia7...@gmail.com>
---
 .../kafka/server/builders/KafkaApisBuilder.java    |  12 +--
 .../src/main/scala/kafka/server/BrokerServer.scala |   4 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  57 +++--------
 .../main/scala/kafka/server/MetadataSupport.scala  | 106 ---------------------
 .../scala/unit/kafka/server/KafkaApisTest.scala    |   8 +-
 .../metadata/KRaftMetadataRequestBenchmark.java    |   3 +-
 6 files changed, 21 insertions(+), 169 deletions(-)

diff --git a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java 
b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
index 81377a14cef..b4764f8d284 100644
--- a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
+++ b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
@@ -23,10 +23,10 @@ import kafka.server.ApiVersionManager;
 import kafka.server.AutoTopicCreationManager;
 import kafka.server.DelegationTokenManager;
 import kafka.server.FetchManager;
+import kafka.server.ForwardingManager;
 import kafka.server.KafkaApis;
 import kafka.server.KafkaConfig;
 import kafka.server.MetadataCache;
-import kafka.server.MetadataSupport;
 import kafka.server.QuotaFactory.QuotaManagers;
 import kafka.server.ReplicaManager;
 import kafka.server.metadata.ConfigRepository;
@@ -47,7 +47,7 @@ import scala.jdk.javaapi.OptionConverters;
 
 public class KafkaApisBuilder {
     private RequestChannel requestChannel = null;
-    private MetadataSupport metadataSupport = null;
+    private ForwardingManager forwardingManager = null;
     private ReplicaManager replicaManager = null;
     private GroupCoordinator groupCoordinator = null;
     private TransactionCoordinator txnCoordinator = null;
@@ -74,8 +74,8 @@ public class KafkaApisBuilder {
         return this;
     }
 
-    public KafkaApisBuilder setMetadataSupport(MetadataSupport 
metadataSupport) {
-        this.metadataSupport = metadataSupport;
+    public KafkaApisBuilder setForwardingManager(ForwardingManager 
forwardingManager) {
+        this.forwardingManager = forwardingManager;
         return this;
     }
 
@@ -182,7 +182,7 @@ public class KafkaApisBuilder {
     @SuppressWarnings({"CyclomaticComplexity"})
     public KafkaApis build() {
         if (requestChannel == null) throw new RuntimeException("you must set 
requestChannel");
-        if (metadataSupport == null) throw new RuntimeException("you must set 
metadataSupport");
+        if (forwardingManager == null) throw new RuntimeException("you must 
set forwardingManager");
         if (replicaManager == null) throw new RuntimeException("You must set 
replicaManager");
         if (groupCoordinator == null) throw new RuntimeException("You must set 
groupCoordinator");
         if (txnCoordinator == null) throw new RuntimeException("You must set 
txnCoordinator");
@@ -200,7 +200,7 @@ public class KafkaApisBuilder {
         if (apiVersionManager == null) throw new RuntimeException("You must 
set apiVersionManager");
 
         return new KafkaApis(requestChannel,
-                             metadataSupport,
+                             forwardingManager,
                              replicaManager,
                              groupCoordinator,
                              txnCoordinator,
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 36f1232427e..40de268a25b 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -446,11 +446,9 @@ class BrokerServer(
         metrics
       )
 
-      // Create the request processor objects.
-      val raftSupport = RaftSupport(forwardingManager, metadataCache)
       dataPlaneRequestProcessor = new KafkaApis(
         requestChannel = socketServer.dataPlaneRequestChannel,
-        metadataSupport = raftSupport,
+        forwardingManager = forwardingManager,
         replicaManager = replicaManager,
         groupCoordinator = groupCoordinator,
         txnCoordinator = transactionCoordinator,
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index d7701f3c614..804c6dbc471 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -83,7 +83,7 @@ import scala.jdk.CollectionConverters._
  * Logic to handle the various Kafka requests
  */
 class KafkaApis(val requestChannel: RequestChannel,
-                val metadataSupport: MetadataSupport,
+                val forwardingManager: ForwardingManager,
                 val replicaManager: ReplicaManager,
                 val groupCoordinator: GroupCoordinator,
                 val txnCoordinator: TransactionCoordinator,
@@ -132,7 +132,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     }
 
-    metadataSupport.forward(request, responseCallback)
+    forwardingManager.forwardRequest(request, responseCallback)
   }
 
   private def handleInvalidVersionsDuringForwarding(request: 
RequestChannel.Request): Unit = {
@@ -2107,7 +2107,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     if (remaining.resources().isEmpty) {
       sendResponse(Some(new AlterConfigsResponseData()))
     } else {
-      metadataSupport.forwardingManager.get.forwardRequest(request,
+      forwardingManager.forwardRequest(request,
         new AlterConfigsRequest(remaining, request.header.apiVersion()),
         response => sendResponse(response.map(_.data())))
     }
@@ -2134,7 +2134,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     if (remaining.resources().isEmpty) {
       sendResponse(Some(new IncrementalAlterConfigsResponseData()))
     } else {
-      metadataSupport.forwardingManager.get.forwardRequest(request,
+      forwardingManager.forwardRequest(request,
         new IncrementalAlterConfigsRequest(remaining, 
request.header.apiVersion()),
         response => sendResponse(response.map(_.data())))
     }
@@ -2367,39 +2367,11 @@ class KafkaApis(val requestChannel: RequestChannel,
       requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
         describeClientQuotasRequest.getErrorResponse(requestThrottleMs, 
Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
     } else {
-      metadataSupport match {
-        case ZkSupport(adminManager, controller, zkClient, forwardingManager, 
metadataCache, _) =>
-          val result = 
adminManager.describeClientQuotas(describeClientQuotasRequest.filter)
-
-          val entriesData = result.iterator.map { case (quotaEntity, 
quotaValues) =>
-            val entityData = quotaEntity.entries.asScala.iterator.map { case 
(entityType, entityName) =>
-              new DescribeClientQuotasResponseData.EntityData()
-                .setEntityType(entityType)
-                .setEntityName(entityName)
-            }.toBuffer
-
-            val valueData = quotaValues.iterator.map { case (key, value) =>
-              new DescribeClientQuotasResponseData.ValueData()
-                .setKey(key)
-                .setValue(value)
-            }.toBuffer
-
-            new DescribeClientQuotasResponseData.EntryData()
-              .setEntity(entityData.asJava)
-              .setValues(valueData.asJava)
-          }.toBuffer
-
-          requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-            new DescribeClientQuotasResponse(new 
DescribeClientQuotasResponseData()
-              .setThrottleTimeMs(requestThrottleMs)
-              .setEntries(entriesData.asJava)))
-        case RaftSupport(_, metadataCache) =>
-          val result = 
metadataCache.describeClientQuotas(describeClientQuotasRequest.data())
-          requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs 
=> {
-            result.setThrottleTimeMs(requestThrottleMs)
-            new DescribeClientQuotasResponse(result)
-          })
-      }
+      val result = 
metadataCache.asInstanceOf[KRaftMetadataCache].describeClientQuotas(describeClientQuotasRequest.data())
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
+        result.setThrottleTimeMs(requestThrottleMs)
+        new DescribeClientQuotasResponse(result)
+      })
     }
   }
 
@@ -2410,14 +2382,9 @@ class KafkaApis(val requestChannel: RequestChannel,
       requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
         
describeUserScramCredentialsRequest.getErrorResponse(requestThrottleMs, 
Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
     } else {
-      metadataSupport match {
-        case RaftSupport(_, metadataCache) =>
-          val result = 
metadataCache.describeScramCredentials(describeUserScramCredentialsRequest.data())
-          requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-            new 
DescribeUserScramCredentialsResponse(result.setThrottleTimeMs(requestThrottleMs)))
-        case _ =>
-         throw KafkaApis.shouldNeverReceive(request)
-      }
+      val result = 
metadataCache.asInstanceOf[KRaftMetadataCache].describeScramCredentials(describeUserScramCredentialsRequest.data())
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+        new 
DescribeUserScramCredentialsResponse(result.setThrottleTimeMs(requestThrottleMs)))
     }
   }
 
diff --git a/core/src/main/scala/kafka/server/MetadataSupport.scala 
b/core/src/main/scala/kafka/server/MetadataSupport.scala
deleted file mode 100644
index 83a52e83f69..00000000000
--- a/core/src/main/scala/kafka/server/MetadataSupport.scala
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import kafka.controller.KafkaController
-import kafka.network.RequestChannel
-import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
-import kafka.zk.KafkaZkClient
-import org.apache.kafka.common.requests.AbstractResponse
-
-sealed trait MetadataSupport {
-  /**
-   * Provide a uniform way of getting to the ForwardingManager, which is a 
shared concept
-   * despite being optional when using ZooKeeper and required when using Raft
-   */
-  val forwardingManager: Option[ForwardingManager]
-
-  /**
-   * Return this instance downcast for use with ZooKeeper
-   *
-   * @param createException function to create an exception to throw
-   * @return this instance downcast for use with ZooKeeper
-   * @throws Exception if this instance is not for ZooKeeper
-   */
-  def requireZkOrThrow(createException: => Exception): ZkSupport
-
-  /**
-   * Return this instance downcast for use with Raft
-   *
-   * @param createException function to create an exception to throw
-   * @return this instance downcast for use with Raft
-   * @throws Exception if this instance is not for Raft
-   */
-  def requireRaftOrThrow(createException: => Exception): RaftSupport
-
-  /**
-   * Confirm that this instance is consistent with the given config
-   *
-   * @param config the config to check for consistency with this instance
-   * @throws IllegalStateException if there is an inconsistency (Raft for a 
ZooKeeper config or vice-versa)
-   */
-  def ensureConsistentWith(config: KafkaConfig): Unit
-
-  def canForward(): Boolean
-
-  def forward(
-    request: RequestChannel.Request,
-    responseCallback: Option[AbstractResponse] => Unit
-  ): Unit = {
-    forwardingManager.get.forwardRequest(request, responseCallback)
-  }
-}
-
-case class ZkSupport(adminManager: ZkAdminManager,
-                     controller: KafkaController,
-                     zkClient: KafkaZkClient,
-                     forwardingManager: Option[ForwardingManager],
-                     metadataCache: ZkMetadataCache,
-                     brokerEpochManager: ZkBrokerEpochManager) extends 
MetadataSupport {
-  override def requireZkOrThrow(createException: => Exception): ZkSupport = 
this
-
-  override def requireRaftOrThrow(createException: => Exception): RaftSupport 
= throw createException
-
-  override def ensureConsistentWith(config: KafkaConfig): Unit = {
-    if (!config.requiresZookeeper) {
-      throw new IllegalStateException("Config specifies Raft but metadata 
support instance is for ZooKeeper")
-    }
-  }
-
-  override def canForward(): Boolean = forwardingManager.isDefined && 
(!controller.isActive)
-
-  def isBrokerEpochStale(brokerEpochInRequest: Long, isKRaftControllerRequest: 
Boolean): Boolean = {
-    brokerEpochManager.isBrokerEpochStale(brokerEpochInRequest, 
isKRaftControllerRequest)
-  }
-}
-
-case class RaftSupport(fwdMgr: ForwardingManager,
-                       metadataCache: KRaftMetadataCache)
-    extends MetadataSupport {
-  override val forwardingManager: Option[ForwardingManager] = Some(fwdMgr)
-  override def requireZkOrThrow(createException: => Exception): ZkSupport = 
throw createException
-  override def requireRaftOrThrow(createException: => Exception): RaftSupport 
= this
-
-  override def ensureConsistentWith(config: KafkaConfig): Unit = {
-    if (config.requiresZookeeper) {
-      throw new IllegalStateException("Config specifies ZooKeeper but metadata 
support instance is for Raft")
-    }
-  }
-
-  override def canForward(): Boolean = true
-}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index b9c814acf2f..c679ef16c7e 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -169,12 +169,6 @@ class KafkaApisTest extends Logging {
     TestUtils.setIbpVersion(properties, interBrokerProtocolVersion)
     val config = new KafkaConfig(properties)
 
-    val metadataSupport = metadataCache match {
-        case cache: KRaftMetadataCache => RaftSupport(forwardingManager, cache)
-        case _ => throw new IllegalStateException("Test must set an instance 
of KRaftMetadataCache")
-      }
-
-
     val listenerType = ListenerType.BROKER
     val enabledApis = ApiKeys.apisForListener(listenerType).asScala
 
@@ -190,7 +184,7 @@ class KafkaApisTest extends Logging {
 
     new KafkaApis(
       requestChannel = requestChannel,
-      metadataSupport = metadataSupport,
+      forwardingManager = forwardingManager,
       replicaManager = replicaManager,
       groupCoordinator = groupCoordinator,
       txnCoordinator = txnCoordinator,
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
index 5a72def807a..a46a4d94bbe 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
@@ -29,7 +29,6 @@ import kafka.server.KafkaApis;
 import kafka.server.KafkaConfig;
 import kafka.server.MetadataCache;
 import kafka.server.QuotaFactory;
-import kafka.server.RaftSupport;
 import kafka.server.ReplicaManager;
 import kafka.server.ReplicationQuotaManager;
 import kafka.server.SimpleApiVersionManager;
@@ -191,7 +190,7 @@ public class KRaftMetadataRequestBenchmark {
         KafkaConfig config = new KafkaConfig(kafkaProps);
         return new KafkaApisBuilder().
                 setRequestChannel(requestChannel).
-                setMetadataSupport(new RaftSupport(forwardingManager, 
metadataCache)).
+                setForwardingManager(forwardingManager).
                 setReplicaManager(replicaManager).
                 setGroupCoordinator(groupCoordinator).
                 setTxnCoordinator(transactionCoordinator).

Reply via email to