This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch 4.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 77018f0c9f13d15eb5da3a4497de28e32f08f580 Author: Ken Huang <s7133...@gmail.com> AuthorDate: Thu Jan 16 11:38:33 2025 +0800 KAFKA-18472: Remove MetadataSupport (#18483) Reviewers: Ismael Juma <ism...@juma.me.uk>, Chia-Ping Tsai <chia7...@gmail.com> --- .../kafka/server/builders/KafkaApisBuilder.java | 12 +-- .../src/main/scala/kafka/server/BrokerServer.scala | 4 +- core/src/main/scala/kafka/server/KafkaApis.scala | 57 +++-------- .../main/scala/kafka/server/MetadataSupport.scala | 106 --------------------- .../scala/unit/kafka/server/KafkaApisTest.scala | 8 +- .../metadata/KRaftMetadataRequestBenchmark.java | 3 +- 6 files changed, 21 insertions(+), 169 deletions(-) diff --git a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java index 81377a14cef..b4764f8d284 100644 --- a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java +++ b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java @@ -23,10 +23,10 @@ import kafka.server.ApiVersionManager; import kafka.server.AutoTopicCreationManager; import kafka.server.DelegationTokenManager; import kafka.server.FetchManager; +import kafka.server.ForwardingManager; import kafka.server.KafkaApis; import kafka.server.KafkaConfig; import kafka.server.MetadataCache; -import kafka.server.MetadataSupport; import kafka.server.QuotaFactory.QuotaManagers; import kafka.server.ReplicaManager; import kafka.server.metadata.ConfigRepository; @@ -47,7 +47,7 @@ import scala.jdk.javaapi.OptionConverters; public class KafkaApisBuilder { private RequestChannel requestChannel = null; - private MetadataSupport metadataSupport = null; + private ForwardingManager forwardingManager = null; private ReplicaManager replicaManager = null; private GroupCoordinator groupCoordinator = null; private TransactionCoordinator txnCoordinator = null; @@ -74,8 +74,8 @@ public class KafkaApisBuilder { return this; } - public KafkaApisBuilder setMetadataSupport(MetadataSupport metadataSupport) { - this.metadataSupport = metadataSupport; + public KafkaApisBuilder setForwardingManager(ForwardingManager forwardingManager) { + this.forwardingManager = forwardingManager; return this; } @@ -182,7 +182,7 @@ public class KafkaApisBuilder { @SuppressWarnings({"CyclomaticComplexity"}) public KafkaApis build() { if (requestChannel == null) throw new RuntimeException("you must set requestChannel"); - if (metadataSupport == null) throw new RuntimeException("you must set metadataSupport"); + if (forwardingManager == null) throw new RuntimeException("you must set forwardingManager"); if (replicaManager == null) throw new RuntimeException("You must set replicaManager"); if (groupCoordinator == null) throw new RuntimeException("You must set groupCoordinator"); if (txnCoordinator == null) throw new RuntimeException("You must set txnCoordinator"); @@ -200,7 +200,7 @@ public class KafkaApisBuilder { if (apiVersionManager == null) throw new RuntimeException("You must set apiVersionManager"); return new KafkaApis(requestChannel, - metadataSupport, + forwardingManager, replicaManager, groupCoordinator, txnCoordinator, diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 36f1232427e..40de268a25b 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -446,11 +446,9 @@ class BrokerServer( metrics ) - // Create the request processor objects. - val raftSupport = RaftSupport(forwardingManager, metadataCache) dataPlaneRequestProcessor = new KafkaApis( requestChannel = socketServer.dataPlaneRequestChannel, - metadataSupport = raftSupport, + forwardingManager = forwardingManager, replicaManager = replicaManager, groupCoordinator = groupCoordinator, txnCoordinator = transactionCoordinator, diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index d7701f3c614..804c6dbc471 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -83,7 +83,7 @@ import scala.jdk.CollectionConverters._ * Logic to handle the various Kafka requests */ class KafkaApis(val requestChannel: RequestChannel, - val metadataSupport: MetadataSupport, + val forwardingManager: ForwardingManager, val replicaManager: ReplicaManager, val groupCoordinator: GroupCoordinator, val txnCoordinator: TransactionCoordinator, @@ -132,7 +132,7 @@ class KafkaApis(val requestChannel: RequestChannel, } } - metadataSupport.forward(request, responseCallback) + forwardingManager.forwardRequest(request, responseCallback) } private def handleInvalidVersionsDuringForwarding(request: RequestChannel.Request): Unit = { @@ -2107,7 +2107,7 @@ class KafkaApis(val requestChannel: RequestChannel, if (remaining.resources().isEmpty) { sendResponse(Some(new AlterConfigsResponseData())) } else { - metadataSupport.forwardingManager.get.forwardRequest(request, + forwardingManager.forwardRequest(request, new AlterConfigsRequest(remaining, request.header.apiVersion()), response => sendResponse(response.map(_.data()))) } @@ -2134,7 +2134,7 @@ class KafkaApis(val requestChannel: RequestChannel, if (remaining.resources().isEmpty) { sendResponse(Some(new IncrementalAlterConfigsResponseData())) } else { - metadataSupport.forwardingManager.get.forwardRequest(request, + forwardingManager.forwardRequest(request, new IncrementalAlterConfigsRequest(remaining, request.header.apiVersion()), response => sendResponse(response.map(_.data()))) } @@ -2367,39 +2367,11 @@ class KafkaApis(val requestChannel: RequestChannel, requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => describeClientQuotasRequest.getErrorResponse(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED.exception)) } else { - metadataSupport match { - case ZkSupport(adminManager, controller, zkClient, forwardingManager, metadataCache, _) => - val result = adminManager.describeClientQuotas(describeClientQuotasRequest.filter) - - val entriesData = result.iterator.map { case (quotaEntity, quotaValues) => - val entityData = quotaEntity.entries.asScala.iterator.map { case (entityType, entityName) => - new DescribeClientQuotasResponseData.EntityData() - .setEntityType(entityType) - .setEntityName(entityName) - }.toBuffer - - val valueData = quotaValues.iterator.map { case (key, value) => - new DescribeClientQuotasResponseData.ValueData() - .setKey(key) - .setValue(value) - }.toBuffer - - new DescribeClientQuotasResponseData.EntryData() - .setEntity(entityData.asJava) - .setValues(valueData.asJava) - }.toBuffer - - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => - new DescribeClientQuotasResponse(new DescribeClientQuotasResponseData() - .setThrottleTimeMs(requestThrottleMs) - .setEntries(entriesData.asJava))) - case RaftSupport(_, metadataCache) => - val result = metadataCache.describeClientQuotas(describeClientQuotasRequest.data()) - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => { - result.setThrottleTimeMs(requestThrottleMs) - new DescribeClientQuotasResponse(result) - }) - } + val result = metadataCache.asInstanceOf[KRaftMetadataCache].describeClientQuotas(describeClientQuotasRequest.data()) + requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => { + result.setThrottleTimeMs(requestThrottleMs) + new DescribeClientQuotasResponse(result) + }) } } @@ -2410,14 +2382,9 @@ class KafkaApis(val requestChannel: RequestChannel, requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => describeUserScramCredentialsRequest.getErrorResponse(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED.exception)) } else { - metadataSupport match { - case RaftSupport(_, metadataCache) => - val result = metadataCache.describeScramCredentials(describeUserScramCredentialsRequest.data()) - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => - new DescribeUserScramCredentialsResponse(result.setThrottleTimeMs(requestThrottleMs))) - case _ => - throw KafkaApis.shouldNeverReceive(request) - } + val result = metadataCache.asInstanceOf[KRaftMetadataCache].describeScramCredentials(describeUserScramCredentialsRequest.data()) + requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => + new DescribeUserScramCredentialsResponse(result.setThrottleTimeMs(requestThrottleMs))) } } diff --git a/core/src/main/scala/kafka/server/MetadataSupport.scala b/core/src/main/scala/kafka/server/MetadataSupport.scala deleted file mode 100644 index 83a52e83f69..00000000000 --- a/core/src/main/scala/kafka/server/MetadataSupport.scala +++ /dev/null @@ -1,106 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.server - -import kafka.controller.KafkaController -import kafka.network.RequestChannel -import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache} -import kafka.zk.KafkaZkClient -import org.apache.kafka.common.requests.AbstractResponse - -sealed trait MetadataSupport { - /** - * Provide a uniform way of getting to the ForwardingManager, which is a shared concept - * despite being optional when using ZooKeeper and required when using Raft - */ - val forwardingManager: Option[ForwardingManager] - - /** - * Return this instance downcast for use with ZooKeeper - * - * @param createException function to create an exception to throw - * @return this instance downcast for use with ZooKeeper - * @throws Exception if this instance is not for ZooKeeper - */ - def requireZkOrThrow(createException: => Exception): ZkSupport - - /** - * Return this instance downcast for use with Raft - * - * @param createException function to create an exception to throw - * @return this instance downcast for use with Raft - * @throws Exception if this instance is not for Raft - */ - def requireRaftOrThrow(createException: => Exception): RaftSupport - - /** - * Confirm that this instance is consistent with the given config - * - * @param config the config to check for consistency with this instance - * @throws IllegalStateException if there is an inconsistency (Raft for a ZooKeeper config or vice-versa) - */ - def ensureConsistentWith(config: KafkaConfig): Unit - - def canForward(): Boolean - - def forward( - request: RequestChannel.Request, - responseCallback: Option[AbstractResponse] => Unit - ): Unit = { - forwardingManager.get.forwardRequest(request, responseCallback) - } -} - -case class ZkSupport(adminManager: ZkAdminManager, - controller: KafkaController, - zkClient: KafkaZkClient, - forwardingManager: Option[ForwardingManager], - metadataCache: ZkMetadataCache, - brokerEpochManager: ZkBrokerEpochManager) extends MetadataSupport { - override def requireZkOrThrow(createException: => Exception): ZkSupport = this - - override def requireRaftOrThrow(createException: => Exception): RaftSupport = throw createException - - override def ensureConsistentWith(config: KafkaConfig): Unit = { - if (!config.requiresZookeeper) { - throw new IllegalStateException("Config specifies Raft but metadata support instance is for ZooKeeper") - } - } - - override def canForward(): Boolean = forwardingManager.isDefined && (!controller.isActive) - - def isBrokerEpochStale(brokerEpochInRequest: Long, isKRaftControllerRequest: Boolean): Boolean = { - brokerEpochManager.isBrokerEpochStale(brokerEpochInRequest, isKRaftControllerRequest) - } -} - -case class RaftSupport(fwdMgr: ForwardingManager, - metadataCache: KRaftMetadataCache) - extends MetadataSupport { - override val forwardingManager: Option[ForwardingManager] = Some(fwdMgr) - override def requireZkOrThrow(createException: => Exception): ZkSupport = throw createException - override def requireRaftOrThrow(createException: => Exception): RaftSupport = this - - override def ensureConsistentWith(config: KafkaConfig): Unit = { - if (config.requiresZookeeper) { - throw new IllegalStateException("Config specifies ZooKeeper but metadata support instance is for Raft") - } - } - - override def canForward(): Boolean = true -} diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index b9c814acf2f..c679ef16c7e 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -169,12 +169,6 @@ class KafkaApisTest extends Logging { TestUtils.setIbpVersion(properties, interBrokerProtocolVersion) val config = new KafkaConfig(properties) - val metadataSupport = metadataCache match { - case cache: KRaftMetadataCache => RaftSupport(forwardingManager, cache) - case _ => throw new IllegalStateException("Test must set an instance of KRaftMetadataCache") - } - - val listenerType = ListenerType.BROKER val enabledApis = ApiKeys.apisForListener(listenerType).asScala @@ -190,7 +184,7 @@ class KafkaApisTest extends Logging { new KafkaApis( requestChannel = requestChannel, - metadataSupport = metadataSupport, + forwardingManager = forwardingManager, replicaManager = replicaManager, groupCoordinator = groupCoordinator, txnCoordinator = txnCoordinator, diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java index 5a72def807a..a46a4d94bbe 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java @@ -29,7 +29,6 @@ import kafka.server.KafkaApis; import kafka.server.KafkaConfig; import kafka.server.MetadataCache; import kafka.server.QuotaFactory; -import kafka.server.RaftSupport; import kafka.server.ReplicaManager; import kafka.server.ReplicationQuotaManager; import kafka.server.SimpleApiVersionManager; @@ -191,7 +190,7 @@ public class KRaftMetadataRequestBenchmark { KafkaConfig config = new KafkaConfig(kafkaProps); return new KafkaApisBuilder(). setRequestChannel(requestChannel). - setMetadataSupport(new RaftSupport(forwardingManager, metadataCache)). + setForwardingManager(forwardingManager). setReplicaManager(replicaManager). setGroupCoordinator(groupCoordinator). setTxnCoordinator(transactionCoordinator).