This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 3fa998475b2 KAFKA-18539 Remove optional managers in KafkaApis (#18550) 3fa998475b2 is described below commit 3fa998475b263fd475730ef8e4ee04cac14d762f Author: Apoorv Mittal <apoorvmitta...@gmail.com> AuthorDate: Wed Jan 15 20:46:05 2025 +0000 KAFKA-18539 Remove optional managers in KafkaApis (#18550) Removed Optional for SharePartitionManager and ClientMetricsManager as ZooKeeper code is being removed. Also removed the asScala and asJava conversions in KafkaApis.handleListClientMetricsResources, which now uses a Java stream. Reviewers: Mickael Maison <mickael.mai...@gmail.com>, Ismael Juma <ism...@juma.me.uk>, Chia-Ping Tsai <chia7...@gmail.com> --- .../kafka/server/builders/KafkaApisBuilder.java | 17 +++-- .../src/main/scala/kafka/server/BrokerServer.scala | 4 +- core/src/main/scala/kafka/server/KafkaApis.scala | 87 ++++++---------------- .../scala/unit/kafka/server/KafkaApisTest.scala | 24 ++---- .../metadata/KRaftMetadataRequestBenchmark.java | 7 +- 5 files changed, 49 insertions(+), 90 deletions(-) diff --git a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java index 75dc0d7dc9b..81377a14cef 100644 --- a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java +++ b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java @@ -45,8 +45,6 @@ import java.util.Optional; import scala.jdk.javaapi.OptionConverters; - - public class KafkaApisBuilder { private RequestChannel requestChannel = null; private MetadataSupport metadataSupport = null; @@ -62,13 +60,13 @@ public class KafkaApisBuilder { private Optional<Authorizer> authorizer = Optional.empty(); private QuotaManagers quotas = null; private FetchManager fetchManager = null; - private Optional<SharePartitionManager> sharePartitionManager = Optional.empty(); + private SharePartitionManager sharePartitionManager = null; private BrokerTopicStats brokerTopicStats = null; 
private String clusterId = "clusterId"; private Time time = Time.SYSTEM; private DelegationTokenManager tokenManager = null; private ApiVersionManager apiVersionManager = null; - private Optional<ClientMetricsManager> clientMetricsManager = Optional.empty(); + private ClientMetricsManager clientMetricsManager = null; private Optional<ShareCoordinator> shareCoordinator = Optional.empty(); public KafkaApisBuilder setRequestChannel(RequestChannel requestChannel) { @@ -146,7 +144,7 @@ public class KafkaApisBuilder { return this; } - public KafkaApisBuilder setSharePartitionManager(Optional<SharePartitionManager> sharePartitionManager) { + public KafkaApisBuilder setSharePartitionManager(SharePartitionManager sharePartitionManager) { this.sharePartitionManager = sharePartitionManager; return this; } @@ -176,11 +174,12 @@ public class KafkaApisBuilder { return this; } - public KafkaApisBuilder setClientMetricsManager(Optional<ClientMetricsManager> clientMetricsManager) { + public KafkaApisBuilder setClientMetricsManager(ClientMetricsManager clientMetricsManager) { this.clientMetricsManager = clientMetricsManager; return this; } + @SuppressWarnings({"CyclomaticComplexity"}) public KafkaApis build() { if (requestChannel == null) throw new RuntimeException("you must set requestChannel"); if (metadataSupport == null) throw new RuntimeException("you must set metadataSupport"); @@ -195,6 +194,8 @@ public class KafkaApisBuilder { if (metrics == null) throw new RuntimeException("You must set metrics"); if (quotas == null) throw new RuntimeException("You must set quotas"); if (fetchManager == null) throw new RuntimeException("You must set fetchManager"); + if (sharePartitionManager == null) throw new RuntimeException("You must set sharePartitionManager"); + if (clientMetricsManager == null) throw new RuntimeException("You must set clientMetricsManager"); if (brokerTopicStats == null) brokerTopicStats = new 
BrokerTopicStats(config.remoteLogManagerConfig().isRemoteStorageSystemEnabled()); if (apiVersionManager == null) throw new RuntimeException("You must set apiVersionManager"); @@ -213,12 +214,12 @@ public class KafkaApisBuilder { OptionConverters.toScala(authorizer), quotas, fetchManager, - OptionConverters.toScala(sharePartitionManager), + sharePartitionManager, brokerTopicStats, clusterId, time, tokenManager, apiVersionManager, - OptionConverters.toScala(clientMetricsManager)); + clientMetricsManager); } } diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 600566f821b..36f1232427e 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -464,13 +464,13 @@ class BrokerServer( authorizer = authorizer, quotas = quotaManagers, fetchManager = fetchManager, - sharePartitionManager = Some(sharePartitionManager), + sharePartitionManager = sharePartitionManager, brokerTopicStats = brokerTopicStats, clusterId = clusterId, time = time, tokenManager = tokenManager, apiVersionManager = apiVersionManager, - clientMetricsManager = Some(clientMetricsManager)) + clientMetricsManager = clientMetricsManager) dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time, diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index f7ee031ff6b..33c625ca1c0 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -97,13 +97,13 @@ class KafkaApis(val requestChannel: RequestChannel, val authorizer: Option[Authorizer], val quotas: QuotaManagers, val fetchManager: FetchManager, - val sharePartitionManager: Option[SharePartitionManager], + val sharePartitionManager: SharePartitionManager, brokerTopicStats: BrokerTopicStats, val clusterId: String, time: 
Time, val tokenManager: DelegationTokenManager, val apiVersionManager: ApiVersionManager, - val clientMetricsManager: Option[ClientMetricsManager] + val clientMetricsManager: ClientMetricsManager ) extends ApiRequestHandler with Logging { type FetchResponseStats = Map[TopicPartition, RecordValidationStats] @@ -2655,35 +2655,21 @@ class KafkaApis(val requestChannel: RequestChannel, def handleGetTelemetrySubscriptionsRequest(request: RequestChannel.Request): Unit = { val subscriptionRequest = request.body[GetTelemetrySubscriptionsRequest] - - clientMetricsManager match { - case Some(metricsManager) => - try { - requestHelper.sendMaybeThrottle(request, metricsManager.processGetTelemetrySubscriptionRequest(subscriptionRequest, request.context)) - } catch { - case _: Exception => - requestHelper.sendMaybeThrottle(request, subscriptionRequest.getErrorResponse(Errors.INVALID_REQUEST.exception)) - } - case None => - info("Received get telemetry client request for zookeeper based cluster") - requestHelper.sendMaybeThrottle(request, subscriptionRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + try { + requestHelper.sendMaybeThrottle(request, clientMetricsManager.processGetTelemetrySubscriptionRequest(subscriptionRequest, request.context)) + } catch { + case _: Exception => + requestHelper.sendMaybeThrottle(request, subscriptionRequest.getErrorResponse(Errors.INVALID_REQUEST.exception)) } } - def handlePushTelemetryRequest(request: RequestChannel.Request): Unit = { + private def handlePushTelemetryRequest(request: RequestChannel.Request): Unit = { val pushTelemetryRequest = request.body[PushTelemetryRequest] - - clientMetricsManager match { - case Some(metricsManager) => - try { - requestHelper.sendMaybeThrottle(request, metricsManager.processPushTelemetryRequest(pushTelemetryRequest, request.context)) - } catch { - case _: Exception => - requestHelper.sendMaybeThrottle(request, pushTelemetryRequest.getErrorResponse(Errors.INVALID_REQUEST.exception)) - } - 
case None => - info("Received push telemetry client request for zookeeper based cluster") - requestHelper.sendMaybeThrottle(request, pushTelemetryRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + try { + requestHelper.sendMaybeThrottle(request, clientMetricsManager.processPushTelemetryRequest(pushTelemetryRequest, request.context)) + } catch { + case _: Exception => + requestHelper.sendMaybeThrottle(request, pushTelemetryRequest.getErrorResponse(Errors.INVALID_REQUEST.exception)) } } @@ -2693,18 +2679,10 @@ class KafkaApis(val requestChannel: RequestChannel, if (!authHelper.authorize(request.context, DESCRIBE_CONFIGS, CLUSTER, CLUSTER_NAME)) { requestHelper.sendMaybeThrottle(request, listClientMetricsResourcesRequest.getErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.exception)) } else { - clientMetricsManager match { - case Some(metricsManager) => - val data = new ListClientMetricsResourcesResponseData().setClientMetricsResources( - metricsManager.listClientMetricsResources.asScala.map( - name => new ClientMetricsResource().setName(name)).toList.asJava) - requestHelper.sendMaybeThrottle(request, new ListClientMetricsResourcesResponse(data)) - case None => - // This should never happen as ZK based cluster calls should get rejected earlier itself, - // but we should handle it gracefully. 
- info("Received list client metrics resources request for zookeeper based cluster") - requestHelper.sendMaybeThrottle(request, listClientMetricsResourcesRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) - } + val data = new ListClientMetricsResourcesResponseData().setClientMetricsResources( + clientMetricsManager.listClientMetricsResources.stream.map( + name => new ClientMetricsResource().setName(name)).toList) + requestHelper.sendMaybeThrottle(request, new ListClientMetricsResourcesResponse(data)) } } @@ -2796,14 +2774,6 @@ class KafkaApis(val requestChannel: RequestChannel, requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.UNSUPPORTED_VERSION.exception)) return } - val sharePartitionManagerInstance: SharePartitionManager = sharePartitionManager match { - case Some(manager) => manager - case None => - // The API is not supported when the SharePartitionManager is not defined on the broker - info("Received share fetch request for zookeeper based cluster") - requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.UNSUPPORTED_VERSION.exception)) - return - } val groupId = shareFetchRequest.data.groupId @@ -2833,7 +2803,7 @@ class KafkaApis(val requestChannel: RequestChannel, try { // Creating the shareFetchContext for Share Session Handling. if context creation fails, the request is failed directly here. 
- shareFetchContext = sharePartitionManagerInstance.newContext(groupId, shareFetchData, forgottenTopics, newReqMetadata, isAcknowledgeDataPresent) + shareFetchContext = sharePartitionManager.newContext(groupId, shareFetchData, forgottenTopics, newReqMetadata, isAcknowledgeDataPresent) } catch { case e: Exception => requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, e)) @@ -2872,7 +2842,7 @@ class KafkaApis(val requestChannel: RequestChannel, acknowledgeResult = handleAcknowledgements( acknowledgementDataFromRequest, erroneous, - sharePartitionManagerInstance, + sharePartitionManager, authorizedTopics, groupId, memberId, @@ -2885,7 +2855,7 @@ class KafkaApis(val requestChannel: RequestChannel, handleFetchFromShareFetchRequest( request, erroneousAndValidPartitionData, - sharePartitionManagerInstance, + sharePartitionManager, authorizedTopics ) @@ -2952,7 +2922,7 @@ class KafkaApis(val requestChannel: RequestChannel, } if (shareSessionEpoch == ShareRequestMetadata.FINAL_EPOCH) { - sharePartitionManagerInstance.releaseSession(groupId, memberId). + sharePartitionManager.releaseSession(groupId, memberId). 
whenComplete((releaseAcquiredRecordsData, throwable) => if (throwable != null) { error(s"Releasing share session close with correlation from client ${request.header.clientId} " + @@ -3117,15 +3087,6 @@ class KafkaApis(val requestChannel: RequestChannel, return } - val sharePartitionManagerInstance: SharePartitionManager = sharePartitionManager match { - case Some(manager) => manager - case None => - // The API is not supported when the SharePartitionManager is not defined on the broker - info("Received share acknowledge request for zookeeper based cluster") - requestHelper.sendMaybeThrottle(request, - shareAcknowledgeRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.UNSUPPORTED_VERSION.exception)) - return - } val groupId = shareAcknowledgeRequest.data.groupId // Share Acknowledge needs permission to perform READ action on the named group resource (groupId) @@ -3141,7 +3102,7 @@ class KafkaApis(val requestChannel: RequestChannel, try { // Updating the cache for Share Session Handling - sharePartitionManagerInstance.acknowledgeSessionUpdate(groupId, newReqMetadata) + sharePartitionManager.acknowledgeSessionUpdate(groupId, newReqMetadata) } catch { case e: Exception => requestHelper.sendMaybeThrottle(request, shareAcknowledgeRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, e)) @@ -3170,13 +3131,13 @@ class KafkaApis(val requestChannel: RequestChannel, val erroneous = mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData]() val acknowledgementDataFromRequest = getAcknowledgeBatchesFromShareAcknowledgeRequest(shareAcknowledgeRequest, topicIdNames, erroneous) - handleAcknowledgements(acknowledgementDataFromRequest, erroneous, sharePartitionManagerInstance, authorizedTopics, groupId, memberId) + handleAcknowledgements(acknowledgementDataFromRequest, erroneous, sharePartitionManager, authorizedTopics, groupId, memberId) .handle[Unit] {(result, exception) => if (exception != null) { 
requestHelper.sendMaybeThrottle(request, shareAcknowledgeRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, exception)) } else { if (shareSessionEpoch == ShareRequestMetadata.FINAL_EPOCH) { - sharePartitionManagerInstance.releaseSession(groupId, memberId). + sharePartitionManager.releaseSession(groupId, memberId). whenComplete{ (releaseAcquiredRecordsData, throwable) => if (throwable != null) { debug(s"Releasing share session close with correlation from client ${request.header.clientId} " + diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 348b6a8d646..a09728a1777 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -185,8 +185,6 @@ class KafkaApisTest extends Logging { true, () => new FinalizedFeatures(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0, true)) - val clientMetricsManagerOpt = Some(clientMetricsManager) - when(groupCoordinator.isNewGroupCoordinator).thenReturn(config.isNewGroupCoordinatorEnabled) setupFeatures(featureVersions) @@ -206,13 +204,13 @@ class KafkaApisTest extends Logging { authorizer = authorizer, quotas = quotas, fetchManager = fetchManager, - sharePartitionManager = Some(sharePartitionManager), + sharePartitionManager = sharePartitionManager, brokerTopicStats = brokerTopicStats, clusterId = clusterId, time = time, tokenManager = null, apiVersionManager = apiVersionManager, - clientMetricsManager = clientMetricsManagerOpt) + clientMetricsManager = clientMetricsManager) } private def setupFeatures(featureVersions: Seq[FeatureVersion]): Unit = { @@ -9665,8 +9663,7 @@ class KafkaApisTest extends Logging { consumerGroupHeartbeatRequest )).thenReturn(future) kafkaApis = createKafkaApis( - featureVersions = Seq(GroupVersion.GV_1), - + featureVersions = Seq(GroupVersion.GV_1) ) kafkaApis.handle(requestChannelRequest, 
RequestLocal.noCaching) @@ -9692,8 +9689,7 @@ class KafkaApisTest extends Logging { consumerGroupHeartbeatRequest )).thenReturn(future) kafkaApis = createKafkaApis( - featureVersions = Seq(GroupVersion.GV_1), - + featureVersions = Seq(GroupVersion.GV_1) ) kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) @@ -9715,8 +9711,7 @@ class KafkaApisTest extends Logging { .thenReturn(Seq(AuthorizationResult.DENIED).asJava) kafkaApis = createKafkaApis( authorizer = Some(authorizer), - featureVersions = Seq(GroupVersion.GV_1), - + featureVersions = Seq(GroupVersion.GV_1) ) kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) @@ -9741,8 +9736,7 @@ class KafkaApisTest extends Logging { any[util.List[String]] )).thenReturn(future) kafkaApis = createKafkaApis( - featureVersions = Seq(GroupVersion.GV_1), - + featureVersions = Seq(GroupVersion.GV_1) ) kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) @@ -9812,8 +9806,7 @@ class KafkaApisTest extends Logging { future.complete(List().asJava) kafkaApis = createKafkaApis( authorizer = Some(authorizer), - featureVersions = Seq(GroupVersion.GV_1), - + featureVersions = Seq(GroupVersion.GV_1) ) kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) @@ -9835,8 +9828,7 @@ class KafkaApisTest extends Logging { any[util.List[String]] )).thenReturn(future) kafkaApis = createKafkaApis( - featureVersions = Seq(GroupVersion.GV_1), - + featureVersions = Seq(GroupVersion.GV_1) ) kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java index a6463cfd443..5a72def807a 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java @@ -36,6 +36,7 @@ import 
kafka.server.SimpleApiVersionManager; import kafka.server.builders.KafkaApisBuilder; import kafka.server.metadata.KRaftMetadataCache; import kafka.server.metadata.MockConfigRepository; +import kafka.server.share.SharePartitionManager; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.memory.MemoryPool; @@ -60,6 +61,7 @@ import org.apache.kafka.image.MetadataProvenance; import org.apache.kafka.network.RequestConvertToJson; import org.apache.kafka.network.metrics.RequestChannelMetrics; import org.apache.kafka.raft.QuorumConfig; +import org.apache.kafka.server.ClientMetricsManager; import org.apache.kafka.server.common.FinalizedFeatures; import org.apache.kafka.server.common.KRaftVersion; import org.apache.kafka.server.common.MetadataVersion; @@ -118,6 +120,8 @@ public class KRaftMetadataRequestBenchmark { clientQuotaManager, clientRequestQuotaManager, controllerMutationQuotaManager, replicaQuotaManager, replicaQuotaManager, replicaQuotaManager, Optional.empty()); private final FetchManager fetchManager = Mockito.mock(FetchManager.class); + private final SharePartitionManager sharePartitionManager = Mockito.mock(SharePartitionManager.class); + private final ClientMetricsManager clientMetricsManager = Mockito.mock(ClientMetricsManager.class); private final BrokerTopicStats brokerTopicStats = new BrokerTopicStats(false); private final KafkaPrincipal principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test-user"); @Param({"500", "1000", "5000"}) @@ -200,7 +204,8 @@ public class KRaftMetadataRequestBenchmark { setAuthorizer(Optional.empty()). setQuotas(quotaManagers). setFetchManager(fetchManager). - setSharePartitionManager(Optional.empty()). + setSharePartitionManager(sharePartitionManager). + setClientMetricsManager(clientMetricsManager). setBrokerTopicStats(brokerTopicStats). setClusterId("clusterId"). setTime(Time.SYSTEM).