This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 927008e5628 KAFKA-18399 Remove ZooKeeper from KafkaApis (3/N): USER_SCRAM_CREDENTIALS (#18456)
927008e5628 is described below

commit 927008e56288467cb25b545110c1ec7ad56392f3
Author: TengYao Chi <kiting...@gmail.com>
AuthorDate: Sun Jan 12 20:39:49 2025 +0800

    KAFKA-18399 Remove ZooKeeper from KafkaApis (3/N): USER_SCRAM_CREDENTIALS (#18456)
    
    Reviewers: Chia-Ping Tsai <chia7...@gmail.com>
---
 core/src/main/scala/kafka/server/KafkaApis.scala   |  79 ++++-------
 .../main/scala/kafka/server/MetadataSupport.scala  |   9 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 153 +--------------------
 3 files changed, 26 insertions(+), 215 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index d80a3803337..88e032e7cc8 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -135,17 +135,15 @@ class KafkaApis(val requestChannel: RequestChannel,
     info("Shutdown complete.")
   }
 
-  private def maybeForwardToController(
-    request: RequestChannel.Request,
-    handler: RequestChannel.Request => Unit
-  ): Unit = {
+  private def forwardToController(request: RequestChannel.Request): Unit = {
     def responseCallback(responseOpt: Option[AbstractResponse]): Unit = {
       responseOpt match {
         case Some(response) => requestHelper.sendForwardedResponse(request, 
response)
         case None => handleInvalidVersionsDuringForwarding(request)
       }
     }
-    metadataSupport.maybeForward(request, handler, responseCallback)
+
+    metadataSupport.forward(request, responseCallback)
   }
 
   private def handleInvalidVersionsDuringForwarding(request: 
RequestChannel.Request): Unit = {
@@ -155,16 +153,6 @@ class KafkaApis(val requestChannel: RequestChannel,
     requestChannel.closeConnection(request, Collections.emptyMap())
   }
 
-  private def forwardToControllerOrFail(
-    request: RequestChannel.Request
-  ): Unit = {
-    def errorHandler(request: RequestChannel.Request): Unit = {
-      throw new IllegalStateException(s"Unable to forward $request to the 
controller")
-    }
-
-    maybeForwardToController(request, errorHandler)
-  }
-
   /**
    * Top-level method that handles all requests and multiplexes to the right 
api
    */
@@ -201,8 +189,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.LIST_GROUPS => 
handleListGroupsRequest(request).exceptionally(handleError)
         case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
         case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
-        case ApiKeys.CREATE_TOPICS => maybeForwardToController(request, 
handleCreateTopicsRequest)
-        case ApiKeys.DELETE_TOPICS => maybeForwardToController(request, 
handleDeleteTopicsRequest)
+        case ApiKeys.CREATE_TOPICS => forwardToController(request)
+        case ApiKeys.DELETE_TOPICS => forwardToController(request)
         case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
         case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request, 
requestLocal)
         case ApiKeys.OFFSET_FOR_LEADER_EPOCH => 
handleOffsetForLeaderEpochRequest(request)
@@ -212,14 +200,14 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.WRITE_TXN_MARKERS => 
handleWriteTxnMarkersRequest(request, requestLocal)
         case ApiKeys.TXN_OFFSET_COMMIT => 
handleTxnOffsetCommitRequest(request, requestLocal).exceptionally(handleError)
         case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request)
-        case ApiKeys.CREATE_ACLS => maybeForwardToController(request, 
handleCreateAcls)
-        case ApiKeys.DELETE_ACLS => maybeForwardToController(request, 
handleDeleteAcls)
+        case ApiKeys.CREATE_ACLS => forwardToController(request)
+        case ApiKeys.DELETE_ACLS => forwardToController(request)
         case ApiKeys.ALTER_CONFIGS => handleAlterConfigsRequest(request)
         case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request)
         case ApiKeys.ALTER_REPLICA_LOG_DIRS => 
handleAlterReplicaLogDirsRequest(request)
         case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request)
         case ApiKeys.SASL_AUTHENTICATE => 
handleSaslAuthenticateRequest(request)
-        case ApiKeys.CREATE_PARTITIONS => maybeForwardToController(request, 
handleCreatePartitionsRequest)
+        case ApiKeys.CREATE_PARTITIONS => forwardToController(request)
         // Create, renew and expire DelegationTokens must first validate that 
the connection
         // itself is not authenticated with a delegation token before 
maybeForwardToController.
         case ApiKeys.CREATE_DELEGATION_TOKEN => 
handleCreateTokenRequest(request)
@@ -227,32 +215,32 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.EXPIRE_DELEGATION_TOKEN => 
handleExpireTokenRequest(request)
         case ApiKeys.DESCRIBE_DELEGATION_TOKEN => 
handleDescribeTokensRequest(request)
         case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request, 
requestLocal).exceptionally(handleError)
-        case ApiKeys.ELECT_LEADERS => maybeForwardToController(request, 
handleElectLeaders)
+        case ApiKeys.ELECT_LEADERS => forwardToController(request)
         case ApiKeys.INCREMENTAL_ALTER_CONFIGS => 
handleIncrementalAlterConfigsRequest(request)
-        case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => 
maybeForwardToController(request, handleAlterPartitionReassignmentsRequest)
-        case ApiKeys.LIST_PARTITION_REASSIGNMENTS => 
maybeForwardToController(request, handleListPartitionReassignmentsRequest)
+        case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => 
forwardToController(request)
+        case ApiKeys.LIST_PARTITION_REASSIGNMENTS => 
forwardToController(request)
         case ApiKeys.OFFSET_DELETE => handleOffsetDeleteRequest(request, 
requestLocal).exceptionally(handleError)
         case ApiKeys.DESCRIBE_CLIENT_QUOTAS => 
handleDescribeClientQuotasRequest(request)
-        case ApiKeys.ALTER_CLIENT_QUOTAS => maybeForwardToController(request, 
handleAlterClientQuotasRequest)
+        case ApiKeys.ALTER_CLIENT_QUOTAS => forwardToController(request)
         case ApiKeys.DESCRIBE_USER_SCRAM_CREDENTIALS => 
handleDescribeUserScramCredentialsRequest(request)
-        case ApiKeys.ALTER_USER_SCRAM_CREDENTIALS => 
maybeForwardToController(request, handleAlterUserScramCredentialsRequest)
+        case ApiKeys.ALTER_USER_SCRAM_CREDENTIALS => 
forwardToController(request)
         case ApiKeys.ALTER_PARTITION => handleAlterPartitionRequest(request)
-        case ApiKeys.UPDATE_FEATURES => maybeForwardToController(request, 
handleUpdateFeatures)
+        case ApiKeys.UPDATE_FEATURES => forwardToController(request)
         case ApiKeys.DESCRIBE_CLUSTER => handleDescribeCluster(request)
         case ApiKeys.DESCRIBE_PRODUCERS => 
handleDescribeProducersRequest(request)
-        case ApiKeys.UNREGISTER_BROKER => forwardToControllerOrFail(request)
+        case ApiKeys.UNREGISTER_BROKER => forwardToController(request)
         case ApiKeys.DESCRIBE_TRANSACTIONS => 
handleDescribeTransactionsRequest(request)
         case ApiKeys.LIST_TRANSACTIONS => 
handleListTransactionsRequest(request)
         case ApiKeys.ALLOCATE_PRODUCER_IDS => 
handleAllocateProducerIdsRequest(request)
-        case ApiKeys.DESCRIBE_QUORUM => forwardToControllerOrFail(request)
+        case ApiKeys.DESCRIBE_QUORUM => forwardToController(request)
         case ApiKeys.CONSUMER_GROUP_HEARTBEAT => 
handleConsumerGroupHeartbeat(request).exceptionally(handleError)
         case ApiKeys.CONSUMER_GROUP_DESCRIBE => 
handleConsumerGroupDescribe(request).exceptionally(handleError)
         case ApiKeys.DESCRIBE_TOPIC_PARTITIONS => 
handleDescribeTopicPartitionsRequest(request)
         case ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS => 
handleGetTelemetrySubscriptionsRequest(request)
         case ApiKeys.PUSH_TELEMETRY => handlePushTelemetryRequest(request)
         case ApiKeys.LIST_CLIENT_METRICS_RESOURCES => 
handleListClientMetricsResources(request)
-        case ApiKeys.ADD_RAFT_VOTER => forwardToControllerOrFail(request)
-        case ApiKeys.REMOVE_RAFT_VOTER => forwardToControllerOrFail(request)
+        case ApiKeys.ADD_RAFT_VOTER => forwardToController(request)
+        case ApiKeys.REMOVE_RAFT_VOTER => forwardToController(request)
         case ApiKeys.SHARE_GROUP_HEARTBEAT => 
handleShareGroupHeartbeat(request).exceptionally(handleError)
         case ApiKeys.SHARE_GROUP_DESCRIBE => 
handleShareGroupDescribe(request).exceptionally(handleError)
         case ApiKeys.SHARE_FETCH => handleShareFetchRequest(request)
@@ -2771,7 +2759,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         
CreateDelegationTokenResponse.prepareResponse(request.context.requestVersion, 
requestThrottleMs,
           Errors.INVALID_PRINCIPAL_TYPE, owner, requester))
     } else {
-      maybeForwardToController(request, handleCreateTokenRequestZk)
+      forwardToController(request)
     }
   }
 
@@ -2824,7 +2812,7 @@ class KafkaApis(val requestChannel: RequestChannel,
               .setErrorCode(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED.code)
               .setExpiryTimestampMs(DelegationTokenManager.ErrorTimestamp)))
     } else {
-      maybeForwardToController(request, handleRenewTokenRequestZk)
+      forwardToController(request)
     }
   }
 
@@ -2870,7 +2858,7 @@ class KafkaApis(val requestChannel: RequestChannel,
               .setErrorCode(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED.code)
               .setExpiryTimestampMs(DelegationTokenManager.ErrorTimestamp)))
     } else {
-      maybeForwardToController(request, handleExpireTokenRequestZk)
+      forwardToController(request)
     }
   }
 
@@ -3173,37 +3161,16 @@ class KafkaApis(val requestChannel: RequestChannel,
         
describeUserScramCredentialsRequest.getErrorResponse(requestThrottleMs, 
Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
     } else {
       metadataSupport match {
-        case ZkSupport(adminManager, controller, zkClient, forwardingManager, 
metadataCache, _) =>
-          val result = adminManager.describeUserScramCredentials(
-            
Option(describeUserScramCredentialsRequest.data.users).map(_.asScala.map(_.name).toList))
-          requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-            new 
DescribeUserScramCredentialsResponse(result.setThrottleTimeMs(requestThrottleMs)))
         case RaftSupport(_, metadataCache) =>
           val result = 
metadataCache.describeScramCredentials(describeUserScramCredentialsRequest.data())
           requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
             new 
DescribeUserScramCredentialsResponse(result.setThrottleTimeMs(requestThrottleMs)))
+        case _ =>
+         throw KafkaApis.shouldNeverReceive(request)
       }
     }
   }
 
-  def handleAlterUserScramCredentialsRequest(request: RequestChannel.Request): 
Unit = {
-    val zkSupport = 
metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
-    val alterUserScramCredentialsRequest = 
request.body[AlterUserScramCredentialsRequest]
-
-    if (!zkSupport.controller.isActive) {
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-        alterUserScramCredentialsRequest.getErrorResponse(requestThrottleMs, 
Errors.NOT_CONTROLLER.exception))
-    } else if (authHelper.authorize(request.context, ALTER, CLUSTER, 
CLUSTER_NAME)) {
-      val result = zkSupport.adminManager.alterUserScramCredentials(
-        alterUserScramCredentialsRequest.data.upsertions().asScala, 
alterUserScramCredentialsRequest.data.deletions().asScala)
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-        new 
AlterUserScramCredentialsResponse(result.setThrottleTimeMs(requestThrottleMs)))
-    } else {
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-        alterUserScramCredentialsRequest.getErrorResponse(requestThrottleMs, 
Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
-    }
-  }
-
   def handleAlterPartitionRequest(request: RequestChannel.Request): Unit = {
     val zkSupport = 
metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request))
     val alterPartitionRequest = request.body[AlterPartitionRequest]
diff --git a/core/src/main/scala/kafka/server/MetadataSupport.scala b/core/src/main/scala/kafka/server/MetadataSupport.scala
index 335df7c42d7..83a52e83f69 100644
--- a/core/src/main/scala/kafka/server/MetadataSupport.scala
+++ b/core/src/main/scala/kafka/server/MetadataSupport.scala
@@ -58,16 +58,11 @@ sealed trait MetadataSupport {
 
   def canForward(): Boolean
 
-  def maybeForward(
+  def forward(
     request: RequestChannel.Request,
-    handler: RequestChannel.Request => Unit,
     responseCallback: Option[AbstractResponse] => Unit
   ): Unit = {
-    if (!request.isForwarded && canForward()) {
-      forwardingManager.get.forwardRequest(request, responseCallback)
-    } else {
-      handler(request)
-    }
+    forwardingManager.get.forwardRequest(request, responseCallback)
   }
 }
 
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index e65d4011fb0..0abeb3192ed 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -43,8 +43,7 @@ import 
org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsReso
 import 
org.apache.kafka.common.message.AlterConfigsResponseData.{AlterConfigsResourceResponse
 => LAlterConfigsResourceResponse}
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import 
org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.DescribedGroup
-import 
org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic
-import 
org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, 
CreatableTopicCollection}
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
 import 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
 import 
org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource
 => IAlterConfigsResource, AlterConfigsResourceCollection => 
IAlterConfigsResourceCollection, AlterableConfig => IAlterableConfig, 
AlterableConfigCollection => IAlterableConfigCollection}
 import 
org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.{AlterConfigsResourceResponse
 => IAlterConfigsResourceResponse}
@@ -598,14 +597,6 @@ class KafkaApisTest extends Logging {
     )
   }
 
-  private def testForwardableApi(apiKey: ApiKeys, requestBuilder: 
AbstractRequest.Builder[_ <: AbstractRequest]): Unit = {
-    kafkaApis = createKafkaApis(enableForwarding = true)
-    testForwardableApi(kafkaApis = kafkaApis,
-      apiKey,
-      requestBuilder
-    )
-  }
-
   private def testForwardableApi(
     kafkaApis: KafkaApis,
     apiKey: ApiKeys,
@@ -769,12 +760,6 @@ class KafkaApisTest extends Logging {
     verify(clientRequestQuotaManager).maybeRecordAndGetThrottleTimeMs(any(), 
anyLong)
   }
 
-  @Test
-  def testAlterClientQuotasWithForwarding(): Unit = {
-    val requestBuilder = new 
AlterClientQuotasRequest.Builder(List.empty.asJava, false)
-    testForwardableApi(ApiKeys.ALTER_CLIENT_QUOTAS, requestBuilder)
-  }
-
   private def verifyAlterClientQuotaResult(response: AlterClientQuotasResponse,
                                            expected: Map[ClientQuotaEntity, 
Errors]): Unit = {
     val futures = expected.keys.map(quotaEntity => quotaEntity -> new 
KafkaFutureImpl[Void]()).toMap
@@ -858,16 +843,6 @@ class KafkaApisTest extends Logging {
         Topic.CLUSTER_METADATA_TOPIC_NAME -> Errors.NONE))
   }
 
-  @Test
-  def testCreateTopicsWithForwarding(): Unit = {
-    val requestBuilder = new CreateTopicsRequest.Builder(
-      new CreateTopicsRequestData().setTopics(
-        new CreatableTopicCollection(Collections.singleton(
-          new CreatableTopic().setName("topic").setNumPartitions(1).
-            setReplicationFactor(1.toShort)).iterator())))
-    testForwardableApi(ApiKeys.CREATE_TOPICS, requestBuilder)
-  }
-
   @ParameterizedTest
   @CsvSource(value = Array("0,1500", "1500,0", "3000,1000"))
   def testKRaftControllerThrottleTimeEnforced(
@@ -919,65 +894,6 @@ class KafkaApisTest extends Logging {
     assertEquals(expectedThrottleTimeMs, responseData.throttleTimeMs)
   }
 
-  @Test
-  def testCreatePartitionsAuthorization(): Unit = {
-    val authorizer: Authorizer = mock(classOf[Authorizer])
-    kafkaApis = createKafkaApis(authorizer = Some(authorizer))
-
-    val timeoutMs = 35000
-    val requestData = new CreatePartitionsRequestData()
-      .setTimeoutMs(timeoutMs)
-      .setValidateOnly(false)
-    val fooCreatePartitionsData = new 
CreatePartitionsTopic().setName("foo").setAssignments(null).setCount(2)
-    val barCreatePartitionsData = new 
CreatePartitionsTopic().setName("bar").setAssignments(null).setCount(10)
-    requestData.topics().add(fooCreatePartitionsData)
-    requestData.topics().add(barCreatePartitionsData)
-
-    val fooResource = new ResourcePattern(ResourceType.TOPIC, "foo", 
PatternType.LITERAL)
-    val fooAction = new Action(AclOperation.ALTER, fooResource, 1, true, true)
-
-    val barResource = new ResourcePattern(ResourceType.TOPIC, "bar", 
PatternType.LITERAL)
-    val barAction = new Action(AclOperation.ALTER, barResource, 1, true, true)
-
-    when(authorizer.authorize(
-      any[RequestContext](),
-      any[util.List[Action]]()
-    )).thenAnswer { invocation =>
-      val actions = invocation.getArgument[util.List[Action]](1).asScala
-      val results = actions.map { action =>
-        if (action == fooAction) AuthorizationResult.ALLOWED
-        else if (action == barAction) AuthorizationResult.DENIED
-        else throw new AssertionError(s"Unexpected action $action")
-      }
-      new util.ArrayList[AuthorizationResult](results.asJava)
-    }
-
-    val request = buildRequest(new 
CreatePartitionsRequest.Builder(requestData).build())
-
-    when(controller.isActive).thenReturn(true)
-    when(controller.isTopicQueuedForDeletion("foo")).thenReturn(false)
-    when(clientControllerQuotaManager.newQuotaFor(
-      ArgumentMatchers.eq(request), ArgumentMatchers.anyShort())
-    ).thenReturn(UnboundedControllerMutationQuota)
-    when(adminManager.createPartitions(
-      timeoutMs = ArgumentMatchers.eq(timeoutMs),
-      newPartitions = ArgumentMatchers.eq(Seq(fooCreatePartitionsData)),
-      validateOnly = ArgumentMatchers.eq(false),
-      controllerMutationQuota = 
ArgumentMatchers.eq(UnboundedControllerMutationQuota),
-      callback = ArgumentMatchers.any[Map[String, ApiError] => Unit]()
-    )).thenAnswer { invocation =>
-      val callback = invocation.getArgument[Map[String, ApiError] => Unit](4)
-      callback.apply(Map("foo" -> ApiError.NONE))
-    }
-
-    kafkaApis.handle(request, RequestLocal.withThreadConfinedCaching)
-
-    val response = verifyNoThrottling[CreatePartitionsResponse](request)
-    val results = response.data.results.asScala
-    assertEquals(Some(Errors.NONE), results.find(_.name == "foo").map(result 
=> Errors.forCode(result.errorCode)))
-    assertEquals(Some(Errors.TOPIC_AUTHORIZATION_FAILED), results.find(_.name 
== "bar").map(result => Errors.forCode(result.errorCode)))
-  }
-
   private def createTopicAuthorization(authorizer: Authorizer,
                                        operation: AclOperation,
                                        authorizedTopic: String,
@@ -1033,66 +949,6 @@ class KafkaApisTest extends Logging {
     assertEquals(expectedTopicConfigErrorCodes, actualTopicConfigErrorCodes)
   }
 
-  @Test
-  def testCreateAclWithForwarding(): Unit = {
-    val requestBuilder = new CreateAclsRequest.Builder(new 
CreateAclsRequestData())
-    testForwardableApi(ApiKeys.CREATE_ACLS, requestBuilder)
-  }
-
-  @Test
-  def testDeleteAclWithForwarding(): Unit = {
-    val requestBuilder = new DeleteAclsRequest.Builder(new 
DeleteAclsRequestData())
-    testForwardableApi(ApiKeys.DELETE_ACLS, requestBuilder)
-  }
-
-  @Test
-  def testCreateDelegationTokenWithForwarding(): Unit = {
-    val requestBuilder = new CreateDelegationTokenRequest.Builder(new 
CreateDelegationTokenRequestData())
-    testForwardableApi(ApiKeys.CREATE_DELEGATION_TOKEN, requestBuilder)
-  }
-
-  @Test
-  def testRenewDelegationTokenWithForwarding(): Unit = {
-    val requestBuilder = new RenewDelegationTokenRequest.Builder(new 
RenewDelegationTokenRequestData())
-    testForwardableApi(ApiKeys.RENEW_DELEGATION_TOKEN, requestBuilder)
-  }
-
-  @Test
-  def testExpireDelegationTokenWithForwarding(): Unit = {
-    val requestBuilder = new ExpireDelegationTokenRequest.Builder(new 
ExpireDelegationTokenRequestData())
-    testForwardableApi(ApiKeys.EXPIRE_DELEGATION_TOKEN, requestBuilder)
-  }
-
-  @Test
-  def testAlterPartitionReassignmentsWithForwarding(): Unit = {
-    val requestBuilder = new AlterPartitionReassignmentsRequest.Builder(new 
AlterPartitionReassignmentsRequestData())
-    testForwardableApi(ApiKeys.ALTER_PARTITION_REASSIGNMENTS, requestBuilder)
-  }
-
-  @Test
-  def testCreatePartitionsWithForwarding(): Unit = {
-    val requestBuilder = new CreatePartitionsRequest.Builder(new 
CreatePartitionsRequestData())
-    testForwardableApi(ApiKeys.CREATE_PARTITIONS, requestBuilder)
-  }
-
-  @Test
-  def testUpdateFeaturesWithForwarding(): Unit = {
-    val requestBuilder = new UpdateFeaturesRequest.Builder(new 
UpdateFeaturesRequestData())
-    testForwardableApi(ApiKeys.UPDATE_FEATURES, requestBuilder)
-  }
-
-  @Test
-  def testDeleteTopicsWithForwarding(): Unit = {
-    val requestBuilder = new DeleteTopicsRequest.Builder(new 
DeleteTopicsRequestData())
-    testForwardableApi(ApiKeys.DELETE_TOPICS, requestBuilder)
-  }
-
-  @Test
-  def testAlterScramWithForwarding(): Unit = {
-    val requestBuilder = new AlterUserScramCredentialsRequest.Builder(new 
AlterUserScramCredentialsRequestData())
-    testForwardableApi(ApiKeys.ALTER_USER_SCRAM_CREDENTIALS, requestBuilder)
-  }
-
   @Test
   def testFindCoordinatorAutoTopicCreationForOffsetTopic(): Unit = {
     testFindCoordinatorWithTopicCreation(CoordinatorType.GROUP)
@@ -10511,13 +10367,6 @@ class KafkaApisTest extends Logging {
     
verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleAlterClientQuotasRequest)
   }
 
-  @Test
-  def testRaftShouldAlwaysForwardAlterUserScramCredentialsRequest(): Unit = {
-    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
-    kafkaApis = createKafkaApis(raftSupport = true)
-    
verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleAlterUserScramCredentialsRequest)
-  }
-
   @Test
   def testRaftShouldAlwaysForwardUpdateFeatures(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)

Reply via email to