This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3fcaec7f4e3 KAFKA-18399 Remove ZooKeeper from KafkaApis (10/N): ALTER_CONFIG and INCREMENTAL_ALTER_CONFIG (#18432)
3fcaec7f4e3 is described below

commit 3fcaec7f4e34138af8e66dee094b7f5563ef082f
Author: TaiJuWu <tjwu1...@gmail.com>
AuthorDate: Wed Jan 15 18:25:45 2025 +0800

    KAFKA-18399 Remove ZooKeeper from KafkaApis (10/N): ALTER_CONFIG and INCREMENTAL_ALTER_CONFIG (#18432)
    
    Reviewers: Christo Lolov <lol...@amazon.com>, Ismael Juma <ism...@juma.me.uk>, Chia-Ping Tsai <chia7...@gmail.com>
---
 core/src/main/scala/kafka/server/KafkaApis.scala   |  98 +----------
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 184 +++++----------------
 2 files changed, 41 insertions(+), 241 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 318ca8f4263..32362913b46 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -26,16 +26,13 @@ import kafka.server.share.SharePartitionManager
 import kafka.utils.Logging
 import org.apache.kafka.admin.AdminUtils
 import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.admin.AlterConfigOp.OpType
-import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry, EndpointType}
+import org.apache.kafka.clients.admin.EndpointType
 import org.apache.kafka.common.acl.AclOperation
 import org.apache.kafka.common.acl.AclOperation._
-import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, SHARE_GROUP_STATE_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
 import org.apache.kafka.common.internals.{FatalExitError, Topic}
 import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.{AddPartitionsToTxnResult, AddPartitionsToTxnResultCollection}
-import org.apache.kafka.common.message.AlterConfigsResponseData.AlterConfigsResourceResponse
 import org.apache.kafka.common.message.DeleteRecordsResponseData.{DeleteRecordsPartitionResult, DeleteRecordsTopicResult}
 import org.apache.kafka.common.message.ListClientMetricsResourcesResponseData.ClientMetricsResource
 import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition
@@ -2120,55 +2117,11 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
     if (remaining.resources().isEmpty) {
       sendResponse(Some(new AlterConfigsResponseData()))
-    } else if ((!request.isForwarded) && metadataSupport.canForward()) {
+    } else {
       metadataSupport.forwardingManager.get.forwardRequest(request,
         new AlterConfigsRequest(remaining, request.header.apiVersion()),
         response => sendResponse(response.map(_.data())))
-    } else {
-      sendResponse(Some(processLegacyAlterConfigsRequest(request, remaining)))
-    }
-  }
-
-  def processLegacyAlterConfigsRequest(
-    originalRequest: RequestChannel.Request,
-    data: AlterConfigsRequestData
-  ): AlterConfigsResponseData = {
-    val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(originalRequest))
-    val alterConfigsRequest = new AlterConfigsRequest(data, originalRequest.header.apiVersion())
-    val (authorizedResources, unauthorizedResources) = alterConfigsRequest.configs.asScala.toMap.partition { case (resource, _) =>
-      resource.`type` match {
-        case ConfigResource.Type.BROKER_LOGGER =>
-          throw new InvalidRequestException(s"AlterConfigs is deprecated and does not support the resource type ${ConfigResource.Type.BROKER_LOGGER}")
-        case ConfigResource.Type.BROKER | ConfigResource.Type.CLIENT_METRICS =>
-          authHelper.authorize(originalRequest.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)
-        case ConfigResource.Type.TOPIC =>
-          authHelper.authorize(originalRequest.context, ALTER_CONFIGS, TOPIC, resource.name)
-        case rt => throw new InvalidRequestException(s"Unexpected resource type $rt")
-      }
-    }
-    val authorizedResult = zkSupport.adminManager.alterConfigs(authorizedResources, alterConfigsRequest.validateOnly)
-    val unauthorizedResult = unauthorizedResources.keys.map { resource =>
-      resource -> configsAuthorizationApiError(resource)
-    }
-    val response = new AlterConfigsResponseData()
-    (authorizedResult ++ unauthorizedResult).foreach { case (resource, error) =>
-      response.responses().add(new AlterConfigsResourceResponse()
-        .setErrorCode(error.error.code)
-        .setErrorMessage(error.message)
-        .setResourceName(resource.name)
-        .setResourceType(resource.`type`.id))
-    }
-    response
-  }
-
-  private def configsAuthorizationApiError(resource: ConfigResource): ApiError = {
-    val error = resource.`type` match {
-      case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER => Errors.CLUSTER_AUTHORIZATION_FAILED
-      case ConfigResource.Type.TOPIC => Errors.TOPIC_AUTHORIZATION_FAILED
-      case ConfigResource.Type.GROUP => Errors.GROUP_AUTHORIZATION_FAILED
-      case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.name}")
     }
-    new ApiError(error, null)
   }
 
   def handleIncrementalAlterConfigsRequest(request: RequestChannel.Request): Unit = {
@@ -2177,15 +2130,6 @@ class KafkaApis(val requestChannel: RequestChannel,
       (rType, rName) => authHelper.authorize(request.context, ALTER_CONFIGS, rType, rName))
     val remaining = ConfigAdminManager.copyWithoutPreprocessed(original.data(), preprocessingResponses)
 
-    // Before deciding whether to forward or handle locally, a ZK broker needs to check if
-    // the active controller is ZK or KRaft. If the controller is KRaft, we need to forward.
-    // If the controller is ZK, we need to process the request locally.
-    val isKRaftController = metadataSupport match {
-      case ZkSupport(_, _, _, _, metadataCache, _) =>
-        metadataCache.getControllerId.exists(_.isInstanceOf[KRaftCachedControllerId])
-      case RaftSupport(_, _) => true
-    }
-
     def sendResponse(secondPart: Option[ApiMessage]): Unit = {
       secondPart match {
         case Some(result: IncrementalAlterConfigsResponseData) =>
@@ -2198,49 +2142,13 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     }
 
-    // Forwarding has not happened yet, so handle both ZK and KRaft cases here
     if (remaining.resources().isEmpty) {
       sendResponse(Some(new IncrementalAlterConfigsResponseData()))
-    } else if ((!request.isForwarded) && metadataSupport.canForward() && isKRaftController) {
+    } else {
       metadataSupport.forwardingManager.get.forwardRequest(request,
         new IncrementalAlterConfigsRequest(remaining, request.header.apiVersion()),
         response => sendResponse(response.map(_.data())))
-    } else {
-      sendResponse(Some(processIncrementalAlterConfigsRequest(request, remaining)))
-    }
-  }
-
-  def processIncrementalAlterConfigsRequest(
-    originalRequest: RequestChannel.Request,
-    data: IncrementalAlterConfigsRequestData
-  ): IncrementalAlterConfigsResponseData = {
-    val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(originalRequest))
-    val configs = data.resources.iterator.asScala.map { alterConfigResource =>
-      val configResource = new ConfigResource(ConfigResource.Type.forId(alterConfigResource.resourceType),
-        alterConfigResource.resourceName)
-      configResource -> alterConfigResource.configs.iterator.asScala.map {
-        alterConfig => new AlterConfigOp(new ConfigEntry(alterConfig.name, alterConfig.value),
-          OpType.forId(alterConfig.configOperation))
-      }.toBuffer
-    }.toMap
-
-    val (authorizedResources, unauthorizedResources) = configs.partition { case (resource, _) =>
-      resource.`type` match {
-        case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER | ConfigResource.Type.CLIENT_METRICS =>
-          authHelper.authorize(originalRequest.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)
-        case ConfigResource.Type.TOPIC =>
-          authHelper.authorize(originalRequest.context, ALTER_CONFIGS, TOPIC, resource.name)
-        case ConfigResource.Type.GROUP =>
-          authHelper.authorize(originalRequest.context, ALTER_CONFIGS, GROUP, resource.name)
-        case rt => throw new InvalidRequestException(s"Unexpected resource type $rt")
-      }
-    }
-
-    val authorizedResult = zkSupport.adminManager.incrementalAlterConfigs(authorizedResources, data.validateOnly)
-    val unauthorizedResult = unauthorizedResources.keys.map { resource =>
-      resource -> configsAuthorizationApiError(resource)
     }
-    new IncrementalAlterConfigsResponse(0, (authorizedResult ++ unauthorizedResult).asJava).data()
   }
 
   def handleDescribeConfigsRequest(request: RequestChannel.Request): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 34b1d7cd274..d4cad343c04 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -320,44 +320,6 @@ class KafkaApisTest extends Logging {
     assertEquals(propValue, describeConfigsResponseData.value)
   }
 
-  @Test
-  def testAlterConfigsWithAuthorizer(): Unit = {
-    val authorizer: Authorizer = mock(classOf[Authorizer])
-
-    val authorizedTopic = "authorized-topic"
-    val unauthorizedTopic = "unauthorized-topic"
-    val (authorizedResource, unauthorizedResource) =
-      createConfigsWithAuthorization(authorizer, authorizedTopic, unauthorizedTopic)
-
-    val configs = Map(
-      authorizedResource -> new AlterConfigsRequest.Config(
-        Seq(new AlterConfigsRequest.ConfigEntry("foo", "bar")).asJava),
-      unauthorizedResource -> new AlterConfigsRequest.Config(
-        Seq(new AlterConfigsRequest.ConfigEntry("foo-1", "bar-1")).asJava)
-    )
-
-    val topicHeader = new RequestHeader(ApiKeys.ALTER_CONFIGS, ApiKeys.ALTER_CONFIGS.latestVersion,
-      clientId, 0)
-
-    val alterConfigsRequest = new AlterConfigsRequest.Builder(configs.asJava, false)
-      .build(topicHeader.apiVersion)
-    val request = buildRequest(alterConfigsRequest)
-
-    when(controller.isActive).thenReturn(false)
-    when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
-      any[Long])).thenReturn(0)
-    when(adminManager.alterConfigs(any(), ArgumentMatchers.eq(false)))
-      .thenReturn(Map(authorizedResource -> ApiError.NONE))
-    kafkaApis = createKafkaApis(authorizer = Some(authorizer))
-    kafkaApis.handleAlterConfigsRequest(request)
-
-    val response = verifyNoThrottling[AlterConfigsResponse](request)
-    verifyAlterConfigResult(response, Map(authorizedTopic -> Errors.NONE,
-        unauthorizedTopic -> Errors.TOPIC_AUTHORIZATION_FAILED))
-    verify(authorizer, times(2)).authorize(any(), any())
-    verify(adminManager).alterConfigs(any(), anyBoolean())
-  }
-
   @Test
   def testElectLeadersForwarding(): Unit = {
     val requestBuilder = new ElectLeadersRequest.Builder(ElectionType.PREFERRED, null, 30000)
@@ -379,20 +341,15 @@ class KafkaApisTest extends Logging {
 
     val incrementalAlterConfigsRequest = getIncrementalAlterConfigRequestBuilder(
       Seq(resource), "consumer.session.timeout.ms", "45000").build(requestHeader.apiVersion)
-    val request = buildRequest(incrementalAlterConfigsRequest,
-      fromPrivilegedListener = true, requestHeader = Option(requestHeader))
+    val request = buildRequest(incrementalAlterConfigsRequest, requestHeader = Option(requestHeader))
 
-    when(controller.isActive).thenReturn(true)
-    when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
-      any[Long])).thenReturn(0)
-    when(adminManager.incrementalAlterConfigs(any(), ArgumentMatchers.eq(false)))
-      .thenReturn(Map(resource -> ApiError.NONE))
-
-    createKafkaApis(authorizer = Some(authorizer)).handleIncrementalAlterConfigsRequest(request)
-    val response = verifyNoThrottling[IncrementalAlterConfigsResponse](request)
-    verifyIncrementalAlterConfigResult(response, Map(consumerGroupId -> Errors.NONE))
-    verify(authorizer, times(1)).authorize(any(), any())
-    verify(adminManager).incrementalAlterConfigs(any(), anyBoolean())
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.LATEST_PRODUCTION)
+    createKafkaApis(authorizer = Some(authorizer), raftSupport = true).handleIncrementalAlterConfigsRequest(request)
+    verify(forwardingManager, times(1)).forwardRequest(
+      any(),
+      any(),
+      any()
+    )
   }
 
   @Test
@@ -453,10 +410,6 @@ class KafkaApisTest extends Logging {
     val subscriptionName = "client_metric_subscription_1"
     val authorizedResource = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, subscriptionName)
 
-    val authorizer: Authorizer = mock(classOf[Authorizer])
-    authorizeResource(authorizer, AclOperation.ALTER_CONFIGS, ResourceType.CLUSTER,
-      Resource.CLUSTER_NAME, AuthorizationResult.ALLOWED)
-
     val props = ClientMetricsTestUtils.defaultProperties
     val configEntries = new util.ArrayList[AlterConfigsRequest.ConfigEntry]()
     props.forEach((x, y) =>
@@ -465,51 +418,39 @@ class KafkaApisTest extends Logging {
     val configs = Map(authorizedResource -> new AlterConfigsRequest.Config(configEntries))
 
     val requestHeader = new RequestHeader(ApiKeys.ALTER_CONFIGS, ApiKeys.ALTER_CONFIGS.latestVersion, clientId, 0)
-    val request = buildRequest(
-      new AlterConfigsRequest.Builder(configs.asJava, false).build(requestHeader.apiVersion))
+    val apiRequest = new AlterConfigsRequest.Builder(configs.asJava, false).build(requestHeader.apiVersion)
+    val request = buildRequest(apiRequest)
 
-    when(controller.isActive).thenReturn(false)
-    when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
-      any[Long])).thenReturn(0)
-    when(adminManager.alterConfigs(any(), ArgumentMatchers.eq(false)))
-      .thenReturn(Map(authorizedResource -> ApiError.NONE))
-    kafkaApis = createKafkaApis(authorizer = Some(authorizer))
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.LATEST_PRODUCTION)
+    kafkaApis = createKafkaApis(raftSupport = true)
     kafkaApis.handleAlterConfigsRequest(request)
-    val response = verifyNoThrottling[AlterConfigsResponse](request)
-    verifyAlterConfigResult(response, Map(subscriptionName -> Errors.NONE))
-    verify(authorizer, times(1)).authorize(any(), any())
-    verify(adminManager).alterConfigs(any(), anyBoolean())
+    verify(forwardingManager, times(1)).forwardRequest(
+      any(),
+      any(),
+      any()
+    )
   }
 
   @Test
   def testIncrementalClientMetricAlterConfigs(): Unit = {
-    val authorizer: Authorizer = mock(classOf[Authorizer])
-
     val subscriptionName = "client_metric_subscription_1"
     val resource = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, subscriptionName)
 
-    authorizeResource(authorizer, AclOperation.ALTER_CONFIGS, ResourceType.CLUSTER,
-      Resource.CLUSTER_NAME, AuthorizationResult.ALLOWED)
-
     val requestHeader = new RequestHeader(ApiKeys.INCREMENTAL_ALTER_CONFIGS,
       ApiKeys.INCREMENTAL_ALTER_CONFIGS.latestVersion, clientId, 0)
 
     val incrementalAlterConfigsRequest = getIncrementalAlterConfigRequestBuilder(
       Seq(resource), "metrics", "foo.bar").build(requestHeader.apiVersion)
-    val request = buildRequest(incrementalAlterConfigsRequest,
-      fromPrivilegedListener = true, requestHeader = Option(requestHeader))
+    val request = buildRequest(incrementalAlterConfigsRequest, requestHeader = Option(requestHeader))
 
-    when(controller.isActive).thenReturn(true)
-    when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
-      any[Long])).thenReturn(0)
-    when(adminManager.incrementalAlterConfigs(any(), ArgumentMatchers.eq(false)))
-      .thenReturn(Map(resource -> ApiError.NONE))
-    kafkaApis = createKafkaApis(authorizer = Some(authorizer))
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.LATEST_PRODUCTION)
+    kafkaApis = createKafkaApis(raftSupport = true)
     kafkaApis.handleIncrementalAlterConfigsRequest(request)
-    val response = verifyNoThrottling[IncrementalAlterConfigsResponse](request)
-    verifyIncrementalAlterConfigResult(response, Map(subscriptionName -> Errors.NONE ))
-    verify(authorizer, times(1)).authorize(any(), any())
-    verify(adminManager).incrementalAlterConfigs(any(), anyBoolean())
+    verify(forwardingManager, times(1)).forwardRequest(
+      any(),
+      any(),
+      any()
+    )
   }
 
   private def getIncrementalAlterConfigRequestBuilder(configResources: Seq[ConfigResource],
@@ -657,78 +598,41 @@ class KafkaApisTest extends Logging {
       .thenReturn(Seq(result).asJava)
   }
 
-  private def verifyAlterConfigResult(response: AlterConfigsResponse,
-                                      expectedResults: Map[String, Errors]): Unit = {
-    val responseMap = response.data.responses().asScala.map { resourceResponse =>
-      resourceResponse.resourceName -> Errors.forCode(resourceResponse.errorCode)
-    }.toMap
-
-    assertEquals(expectedResults, responseMap)
-  }
-
-  private def createConfigsWithAuthorization(authorizer: Authorizer,
-                                             authorizedTopic: String,
-                                             unauthorizedTopic: String): (ConfigResource, ConfigResource) = {
-    val authorizedResource = new ConfigResource(ConfigResource.Type.TOPIC, authorizedTopic)
-
-    val unauthorizedResource = new ConfigResource(ConfigResource.Type.TOPIC, unauthorizedTopic)
-
-    createTopicAuthorization(authorizer, AclOperation.ALTER_CONFIGS, authorizedTopic, unauthorizedTopic)
-    (authorizedResource, unauthorizedResource)
-  }
-
   @Test
   def testIncrementalAlterConfigsWithAuthorizer(): Unit = {
     val authorizer: Authorizer = mock(classOf[Authorizer])
 
-    val authorizedTopic = "authorized-topic"
-    val unauthorizedTopic = "unauthorized-topic"
-    val (authorizedResource, unauthorizedResource) =
-      createConfigsWithAuthorization(authorizer, authorizedTopic, unauthorizedTopic)
+    val localResource = new ConfigResource(ConfigResource.Type.BROKER_LOGGER, "localResource")
+    val forwardedResource = new ConfigResource(ConfigResource.Type.GROUP, "forwardedResource")
 
     val requestHeader = new RequestHeader(ApiKeys.INCREMENTAL_ALTER_CONFIGS, ApiKeys.INCREMENTAL_ALTER_CONFIGS.latestVersion, clientId, 0)
 
-    val incrementalAlterConfigsRequest = getIncrementalAlterConfigRequestBuilder(Seq(authorizedResource, unauthorizedResource))
+    val incrementalAlterConfigsRequest = getIncrementalAlterConfigRequestBuilder(Seq(localResource, forwardedResource))
       .build(requestHeader.apiVersion)
-    val request = buildRequest(incrementalAlterConfigsRequest,
-      fromPrivilegedListener = true, requestHeader = Option(requestHeader))
+    val request = buildRequest(incrementalAlterConfigsRequest, requestHeader = Option(requestHeader))
 
-    when(controller.isActive).thenReturn(true)
-    when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
-      any[Long])).thenReturn(0)
-    when(adminManager.incrementalAlterConfigs(any(), ArgumentMatchers.eq(false)))
-      .thenReturn(Map(authorizedResource -> ApiError.NONE))
-    kafkaApis = createKafkaApis(authorizer = Some(authorizer))
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.LATEST_PRODUCTION)
+    kafkaApis = createKafkaApis(authorizer = Some(authorizer), raftSupport = true)
     kafkaApis.handleIncrementalAlterConfigsRequest(request)
 
-    val capturedResponse = verifyNoThrottling[IncrementalAlterConfigsResponse](request)
-    verifyIncrementalAlterConfigResult(capturedResponse, Map(
-      authorizedTopic -> Errors.NONE,
-      unauthorizedTopic -> Errors.TOPIC_AUTHORIZATION_FAILED
-    ))
-
-    verify(authorizer, times(2)).authorize(any(), any())
-    verify(adminManager).incrementalAlterConfigs(any(), anyBoolean())
+    verify(authorizer, times(1)).authorize(any(), any())
+    verify(forwardingManager, times(1)).forwardRequest(
+      any(),
+      any(),
+      any()
+    )
   }
 
   private def getIncrementalAlterConfigRequestBuilder(configResources: Seq[ConfigResource]): IncrementalAlterConfigsRequest.Builder = {
     val resourceMap = configResources.map(configResource => {
       configResource -> Set(
         new AlterConfigOp(new ConfigEntry("foo", "bar"),
-        OpType.forId(configResource.`type`.id))).asJavaCollection
+        OpType.SET)).asJavaCollection
     }).toMap.asJava
 
     new IncrementalAlterConfigsRequest.Builder(resourceMap, false)
   }
 
-  private def verifyIncrementalAlterConfigResult(response: IncrementalAlterConfigsResponse,
-                                                 expectedResults: Map[String, Errors]): Unit = {
-    val responseMap = response.data.responses.asScala.map { resourceResponse =>
-      resourceResponse.resourceName -> Errors.forCode(resourceResponse.errorCode)
-    }.toMap
-    assertEquals(expectedResults, responseMap)
-  }
-
   @ParameterizedTest
   @CsvSource(value = Array("0,1500", "1500,0", "3000,1000"))
   def testKRaftControllerThrottleTimeEnforced(
@@ -780,18 +684,6 @@ class KafkaApisTest extends Logging {
     assertEquals(expectedThrottleTimeMs, responseData.throttleTimeMs)
   }
 
-  private def createTopicAuthorization(authorizer: Authorizer,
-                                       operation: AclOperation,
-                                       authorizedTopic: String,
-                                       unauthorizedTopic: String,
-                                       logIfAllowed: Boolean = true,
-                                       logIfDenied: Boolean = true): Unit = {
-    authorizeResource(authorizer, operation, ResourceType.TOPIC,
-      authorizedTopic, AuthorizationResult.ALLOWED, logIfAllowed, logIfDenied)
-    authorizeResource(authorizer, operation, ResourceType.TOPIC,
-      unauthorizedTopic, AuthorizationResult.DENIED, logIfAllowed, logIfDenied)
-  }
-
   @Test
   def testFindCoordinatorAutoTopicCreationForOffsetTopic(): Unit = {
     testFindCoordinatorWithTopicCreation(CoordinatorType.GROUP)
