This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 8b7b5160f35 KAFKA-18539 Remove optional managers in KafkaApis (#18550)
8b7b5160f35 is described below

commit 8b7b5160f35ffe246b33dd07888a10402969276f
Author: Apoorv Mittal <apoorvmitta...@gmail.com>
AuthorDate: Wed Jan 15 20:46:05 2025 +0000

    KAFKA-18539 Remove optional managers in KafkaApis (#18550)
    
    Removed the Optional wrappers for SharePartitionManager and ClientMetricsManager, as the 
ZooKeeper code is being removed. Also removed the asScala and asJava conversions in 
KafkaApis.handleListClientMetricsResources, which now uses a Java stream instead.
    
    Reviewers: Mickael Maison <mickael.mai...@gmail.com>, Ismael Juma 
<ism...@juma.me.uk>, Chia-Ping Tsai <chia7...@gmail.com>
---
 .../kafka/server/builders/KafkaApisBuilder.java    | 17 +++--
 .../src/main/scala/kafka/server/BrokerServer.scala |  4 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   | 87 ++++++----------------
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 24 ++----
 .../metadata/KRaftMetadataRequestBenchmark.java    |  7 +-
 5 files changed, 49 insertions(+), 90 deletions(-)

diff --git a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java 
b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
index 75dc0d7dc9b..81377a14cef 100644
--- a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
+++ b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
@@ -45,8 +45,6 @@ import java.util.Optional;
 
 import scala.jdk.javaapi.OptionConverters;
 
-
-
 public class KafkaApisBuilder {
     private RequestChannel requestChannel = null;
     private MetadataSupport metadataSupport = null;
@@ -62,13 +60,13 @@ public class KafkaApisBuilder {
     private Optional<Authorizer> authorizer = Optional.empty();
     private QuotaManagers quotas = null;
     private FetchManager fetchManager = null;
-    private Optional<SharePartitionManager> sharePartitionManager = 
Optional.empty();
+    private SharePartitionManager sharePartitionManager = null;
     private BrokerTopicStats brokerTopicStats = null;
     private String clusterId = "clusterId";
     private Time time = Time.SYSTEM;
     private DelegationTokenManager tokenManager = null;
     private ApiVersionManager apiVersionManager = null;
-    private Optional<ClientMetricsManager> clientMetricsManager = 
Optional.empty();
+    private ClientMetricsManager clientMetricsManager = null;
     private Optional<ShareCoordinator> shareCoordinator = Optional.empty();
 
     public KafkaApisBuilder setRequestChannel(RequestChannel requestChannel) {
@@ -146,7 +144,7 @@ public class KafkaApisBuilder {
         return this;
     }
 
-    public KafkaApisBuilder 
setSharePartitionManager(Optional<SharePartitionManager> sharePartitionManager) 
{
+    public KafkaApisBuilder setSharePartitionManager(SharePartitionManager 
sharePartitionManager) {
         this.sharePartitionManager = sharePartitionManager;
         return this;
     }
@@ -176,11 +174,12 @@ public class KafkaApisBuilder {
         return this;
     }
 
-    public KafkaApisBuilder 
setClientMetricsManager(Optional<ClientMetricsManager> clientMetricsManager) {
+    public KafkaApisBuilder setClientMetricsManager(ClientMetricsManager 
clientMetricsManager) {
         this.clientMetricsManager = clientMetricsManager;
         return this;
     }
 
+    @SuppressWarnings({"CyclomaticComplexity"})
     public KafkaApis build() {
         if (requestChannel == null) throw new RuntimeException("you must set 
requestChannel");
         if (metadataSupport == null) throw new RuntimeException("you must set 
metadataSupport");
@@ -195,6 +194,8 @@ public class KafkaApisBuilder {
         if (metrics == null) throw new RuntimeException("You must set 
metrics");
         if (quotas == null) throw new RuntimeException("You must set quotas");
         if (fetchManager == null) throw new RuntimeException("You must set 
fetchManager");
+        if (sharePartitionManager == null) throw new RuntimeException("You 
must set sharePartitionManager");
+        if (clientMetricsManager == null) throw new RuntimeException("You must 
set clientMetricsManager");
         if (brokerTopicStats == null) brokerTopicStats = new 
BrokerTopicStats(config.remoteLogManagerConfig().isRemoteStorageSystemEnabled());
         if (apiVersionManager == null) throw new RuntimeException("You must 
set apiVersionManager");
 
@@ -213,12 +214,12 @@ public class KafkaApisBuilder {
                              OptionConverters.toScala(authorizer),
                              quotas,
                              fetchManager,
-                             OptionConverters.toScala(sharePartitionManager),
+                             sharePartitionManager,
                              brokerTopicStats,
                              clusterId,
                              time,
                              tokenManager,
                              apiVersionManager,
-                             OptionConverters.toScala(clientMetricsManager));
+                             clientMetricsManager);
     }
 }
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 600566f821b..36f1232427e 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -464,13 +464,13 @@ class BrokerServer(
         authorizer = authorizer,
         quotas = quotaManagers,
         fetchManager = fetchManager,
-        sharePartitionManager = Some(sharePartitionManager),
+        sharePartitionManager = sharePartitionManager,
         brokerTopicStats = brokerTopicStats,
         clusterId = clusterId,
         time = time,
         tokenManager = tokenManager,
         apiVersionManager = apiVersionManager,
-        clientMetricsManager = Some(clientMetricsManager))
+        clientMetricsManager = clientMetricsManager)
 
       dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
         socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index dcbee04281c..d7701f3c614 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -97,13 +97,13 @@ class KafkaApis(val requestChannel: RequestChannel,
                 val authorizer: Option[Authorizer],
                 val quotas: QuotaManagers,
                 val fetchManager: FetchManager,
-                val sharePartitionManager: Option[SharePartitionManager],
+                val sharePartitionManager: SharePartitionManager,
                 brokerTopicStats: BrokerTopicStats,
                 val clusterId: String,
                 time: Time,
                 val tokenManager: DelegationTokenManager,
                 val apiVersionManager: ApiVersionManager,
-                val clientMetricsManager: Option[ClientMetricsManager]
+                val clientMetricsManager: ClientMetricsManager
 ) extends ApiRequestHandler with Logging {
 
   type FetchResponseStats = Map[TopicPartition, RecordValidationStats]
@@ -2654,35 +2654,21 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleGetTelemetrySubscriptionsRequest(request: RequestChannel.Request): 
Unit = {
     val subscriptionRequest = request.body[GetTelemetrySubscriptionsRequest]
-
-    clientMetricsManager match {
-      case Some(metricsManager) =>
-        try {
-          requestHelper.sendMaybeThrottle(request, 
metricsManager.processGetTelemetrySubscriptionRequest(subscriptionRequest, 
request.context))
-        } catch {
-          case _: Exception =>
-            requestHelper.sendMaybeThrottle(request, 
subscriptionRequest.getErrorResponse(Errors.INVALID_REQUEST.exception))
-        }
-      case None =>
-        info("Received get telemetry client request for zookeeper based 
cluster")
-        requestHelper.sendMaybeThrottle(request, 
subscriptionRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+    try {
+      requestHelper.sendMaybeThrottle(request, 
clientMetricsManager.processGetTelemetrySubscriptionRequest(subscriptionRequest,
 request.context))
+    } catch {
+      case _: Exception =>
+        requestHelper.sendMaybeThrottle(request, 
subscriptionRequest.getErrorResponse(Errors.INVALID_REQUEST.exception))
     }
   }
 
-  def handlePushTelemetryRequest(request: RequestChannel.Request): Unit = {
+  private def handlePushTelemetryRequest(request: RequestChannel.Request): 
Unit = {
     val pushTelemetryRequest = request.body[PushTelemetryRequest]
-
-    clientMetricsManager match {
-      case Some(metricsManager) =>
-        try {
-          requestHelper.sendMaybeThrottle(request, 
metricsManager.processPushTelemetryRequest(pushTelemetryRequest, 
request.context))
-        } catch {
-          case _: Exception =>
-            requestHelper.sendMaybeThrottle(request, 
pushTelemetryRequest.getErrorResponse(Errors.INVALID_REQUEST.exception))
-        }
-      case None =>
-        info("Received push telemetry client request for zookeeper based 
cluster")
-        requestHelper.sendMaybeThrottle(request, 
pushTelemetryRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+    try {
+      requestHelper.sendMaybeThrottle(request, 
clientMetricsManager.processPushTelemetryRequest(pushTelemetryRequest, 
request.context))
+    } catch {
+      case _: Exception =>
+        requestHelper.sendMaybeThrottle(request, 
pushTelemetryRequest.getErrorResponse(Errors.INVALID_REQUEST.exception))
     }
   }
 
@@ -2692,18 +2678,10 @@ class KafkaApis(val requestChannel: RequestChannel,
     if (!authHelper.authorize(request.context, DESCRIBE_CONFIGS, CLUSTER, 
CLUSTER_NAME)) {
       requestHelper.sendMaybeThrottle(request, 
listClientMetricsResourcesRequest.getErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
     } else {
-      clientMetricsManager match {
-        case Some(metricsManager) =>
-          val data = new 
ListClientMetricsResourcesResponseData().setClientMetricsResources(
-            metricsManager.listClientMetricsResources.asScala.map(
-              name => new ClientMetricsResource().setName(name)).toList.asJava)
-          requestHelper.sendMaybeThrottle(request, new 
ListClientMetricsResourcesResponse(data))
-        case None =>
-          // This should never happen as ZK based cluster calls should get 
rejected earlier itself,
-          // but we should handle it gracefully.
-          info("Received list client metrics resources request for zookeeper 
based cluster")
-          requestHelper.sendMaybeThrottle(request, 
listClientMetricsResourcesRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-      }
+      val data = new 
ListClientMetricsResourcesResponseData().setClientMetricsResources(
+        clientMetricsManager.listClientMetricsResources.stream.map(
+          name => new ClientMetricsResource().setName(name)).toList)
+      requestHelper.sendMaybeThrottle(request, new 
ListClientMetricsResourcesResponse(data))
     }
   }
 
@@ -2795,14 +2773,6 @@ class KafkaApis(val requestChannel: RequestChannel,
       requestHelper.sendMaybeThrottle(request, 
shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, 
Errors.UNSUPPORTED_VERSION.exception))
       return
     }
-    val sharePartitionManagerInstance: SharePartitionManager = 
sharePartitionManager match {
-      case Some(manager) => manager
-      case None =>
-        // The API is not supported when the SharePartitionManager is not 
defined on the broker
-        info("Received share fetch request for zookeeper based cluster")
-        requestHelper.sendMaybeThrottle(request, 
shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, 
Errors.UNSUPPORTED_VERSION.exception))
-        return
-    }
 
     val groupId = shareFetchRequest.data.groupId
 
@@ -2832,7 +2802,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     try {
       // Creating the shareFetchContext for Share Session Handling. if context 
creation fails, the request is failed directly here.
-      shareFetchContext = sharePartitionManagerInstance.newContext(groupId, 
shareFetchData, forgottenTopics, newReqMetadata, isAcknowledgeDataPresent)
+      shareFetchContext = sharePartitionManager.newContext(groupId, 
shareFetchData, forgottenTopics, newReqMetadata, isAcknowledgeDataPresent)
     } catch {
       case e: Exception =>
         requestHelper.sendMaybeThrottle(request, 
shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, e))
@@ -2871,7 +2841,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       acknowledgeResult = handleAcknowledgements(
         acknowledgementDataFromRequest,
         erroneous,
-        sharePartitionManagerInstance,
+        sharePartitionManager,
         authorizedTopics,
         groupId,
         memberId,
@@ -2884,7 +2854,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       handleFetchFromShareFetchRequest(
       request,
       erroneousAndValidPartitionData,
-      sharePartitionManagerInstance,
+      sharePartitionManager,
       authorizedTopics
     )
 
@@ -2951,7 +2921,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           }
 
           if (shareSessionEpoch == ShareRequestMetadata.FINAL_EPOCH) {
-            sharePartitionManagerInstance.releaseSession(groupId, memberId).
+            sharePartitionManager.releaseSession(groupId, memberId).
               whenComplete((releaseAcquiredRecordsData, throwable) =>
                 if (throwable != null) {
                   error(s"Releasing share session close with correlation from 
client ${request.header.clientId}  " +
@@ -3115,15 +3085,6 @@ class KafkaApis(val requestChannel: RequestChannel,
       return
     }
 
-    val sharePartitionManagerInstance: SharePartitionManager = 
sharePartitionManager match {
-      case Some(manager) => manager
-      case None =>
-        // The API is not supported when the SharePartitionManager is not 
defined on the broker
-        info("Received share acknowledge request for zookeeper based cluster")
-        requestHelper.sendMaybeThrottle(request,
-          
shareAcknowledgeRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME,
 Errors.UNSUPPORTED_VERSION.exception))
-        return
-    }
     val groupId = shareAcknowledgeRequest.data.groupId
 
     // Share Acknowledge needs permission to perform READ action on the named 
group resource (groupId)
@@ -3139,7 +3100,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     try {
       // Updating the cache for Share Session Handling
-      sharePartitionManagerInstance.acknowledgeSessionUpdate(groupId, 
newReqMetadata)
+      sharePartitionManager.acknowledgeSessionUpdate(groupId, newReqMetadata)
     } catch {
       case e: Exception =>
         requestHelper.sendMaybeThrottle(request, 
shareAcknowledgeRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME,
 e))
@@ -3168,13 +3129,13 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     val erroneous = mutable.Map[TopicIdPartition, 
ShareAcknowledgeResponseData.PartitionData]()
     val acknowledgementDataFromRequest = 
getAcknowledgeBatchesFromShareAcknowledgeRequest(shareAcknowledgeRequest, 
topicIdNames, erroneous)
-    handleAcknowledgements(acknowledgementDataFromRequest, erroneous, 
sharePartitionManagerInstance, authorizedTopics, groupId, memberId)
+    handleAcknowledgements(acknowledgementDataFromRequest, erroneous, 
sharePartitionManager, authorizedTopics, groupId, memberId)
       .handle[Unit] {(result, exception) =>
         if (exception != null) {
           requestHelper.sendMaybeThrottle(request, 
shareAcknowledgeRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME,
 exception))
         } else {
           if (shareSessionEpoch == ShareRequestMetadata.FINAL_EPOCH) {
-            sharePartitionManagerInstance.releaseSession(groupId, memberId).
+            sharePartitionManager.releaseSession(groupId, memberId).
               whenComplete{ (releaseAcquiredRecordsData, throwable) =>
                 if (throwable != null) {
                   debug(s"Releasing share session close with correlation from 
client ${request.header.clientId}  " +
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index d778c785982..b9c814acf2f 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -185,8 +185,6 @@ class KafkaApisTest extends Logging {
       true,
       () => new FinalizedFeatures(MetadataVersion.latestTesting(), 
Collections.emptyMap[String, java.lang.Short], 0, true))
 
-    val clientMetricsManagerOpt = Some(clientMetricsManager)
-
     
when(groupCoordinator.isNewGroupCoordinator).thenReturn(config.isNewGroupCoordinatorEnabled)
     setupFeatures(featureVersions)
 
@@ -206,13 +204,13 @@ class KafkaApisTest extends Logging {
       authorizer = authorizer,
       quotas = quotas,
       fetchManager = fetchManager,
-      sharePartitionManager = Some(sharePartitionManager),
+      sharePartitionManager = sharePartitionManager,
       brokerTopicStats = brokerTopicStats,
       clusterId = clusterId,
       time = time,
       tokenManager = null,
       apiVersionManager = apiVersionManager,
-      clientMetricsManager = clientMetricsManagerOpt)
+      clientMetricsManager = clientMetricsManager)
   }
 
   private def setupFeatures(featureVersions: Seq[FeatureVersion]): Unit = {
@@ -9665,8 +9663,7 @@ class KafkaApisTest extends Logging {
       consumerGroupHeartbeatRequest
     )).thenReturn(future)
     kafkaApis = createKafkaApis(
-      featureVersions = Seq(GroupVersion.GV_1),
-      
+      featureVersions = Seq(GroupVersion.GV_1)
     )
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
@@ -9692,8 +9689,7 @@ class KafkaApisTest extends Logging {
       consumerGroupHeartbeatRequest
     )).thenReturn(future)
     kafkaApis = createKafkaApis(
-      featureVersions = Seq(GroupVersion.GV_1),
-      
+      featureVersions = Seq(GroupVersion.GV_1)
     )
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
@@ -9715,8 +9711,7 @@ class KafkaApisTest extends Logging {
       .thenReturn(Seq(AuthorizationResult.DENIED).asJava)
     kafkaApis = createKafkaApis(
       authorizer = Some(authorizer),
-      featureVersions = Seq(GroupVersion.GV_1),
-      
+      featureVersions = Seq(GroupVersion.GV_1)
     )
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
@@ -9741,8 +9736,7 @@ class KafkaApisTest extends Logging {
       any[util.List[String]]
     )).thenReturn(future)
     kafkaApis = createKafkaApis(
-      featureVersions = Seq(GroupVersion.GV_1),
-      
+      featureVersions = Seq(GroupVersion.GV_1)
     )
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
@@ -9812,8 +9806,7 @@ class KafkaApisTest extends Logging {
     future.complete(List().asJava)
     kafkaApis = createKafkaApis(
       authorizer = Some(authorizer),
-      featureVersions = Seq(GroupVersion.GV_1),
-      
+      featureVersions = Seq(GroupVersion.GV_1)
     )
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
@@ -9835,8 +9828,7 @@ class KafkaApisTest extends Logging {
       any[util.List[String]]
     )).thenReturn(future)
     kafkaApis = createKafkaApis(
-      featureVersions = Seq(GroupVersion.GV_1),
-      
+      featureVersions = Seq(GroupVersion.GV_1)
     )
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
index a6463cfd443..5a72def807a 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
@@ -36,6 +36,7 @@ import kafka.server.SimpleApiVersionManager;
 import kafka.server.builders.KafkaApisBuilder;
 import kafka.server.metadata.KRaftMetadataCache;
 import kafka.server.metadata.MockConfigRepository;
+import kafka.server.share.SharePartitionManager;
 
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.memory.MemoryPool;
@@ -60,6 +61,7 @@ import org.apache.kafka.image.MetadataProvenance;
 import org.apache.kafka.network.RequestConvertToJson;
 import org.apache.kafka.network.metrics.RequestChannelMetrics;
 import org.apache.kafka.raft.QuorumConfig;
+import org.apache.kafka.server.ClientMetricsManager;
 import org.apache.kafka.server.common.FinalizedFeatures;
 import org.apache.kafka.server.common.KRaftVersion;
 import org.apache.kafka.server.common.MetadataVersion;
@@ -118,6 +120,8 @@ public class KRaftMetadataRequestBenchmark {
             clientQuotaManager, clientRequestQuotaManager, 
controllerMutationQuotaManager, replicaQuotaManager,
             replicaQuotaManager, replicaQuotaManager, Optional.empty());
     private final FetchManager fetchManager = Mockito.mock(FetchManager.class);
+    private final SharePartitionManager sharePartitionManager = 
Mockito.mock(SharePartitionManager.class);
+    private final ClientMetricsManager clientMetricsManager = 
Mockito.mock(ClientMetricsManager.class);
     private final BrokerTopicStats brokerTopicStats = new 
BrokerTopicStats(false);
     private final KafkaPrincipal principal = new 
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test-user");
     @Param({"500", "1000",  "5000"})
@@ -200,7 +204,8 @@ public class KRaftMetadataRequestBenchmark {
                 setAuthorizer(Optional.empty()).
                 setQuotas(quotaManagers).
                 setFetchManager(fetchManager).
-                setSharePartitionManager(Optional.empty()).
+                setSharePartitionManager(sharePartitionManager).
+                setClientMetricsManager(clientMetricsManager).
                 setBrokerTopicStats(brokerTopicStats).
                 setClusterId("clusterId").
                 setTime(Time.SYSTEM).

Reply via email to