This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9f5d9f3cd42 KAFKA-18399 Remove ZooKeeper from KafkaApis (7/N): 
CREATE_TOPICS, DELETE_TOPICS, CREATE_PARTITIONS (#18433)
9f5d9f3cd42 is described below

commit 9f5d9f3cd42370e7653882425369c2d4a71ab2be
Author: TaiJuWu <tjwu1...@gmail.com>
AuthorDate: Mon Jan 13 21:23:59 2025 +0800

    KAFKA-18399 Remove ZooKeeper from KafkaApis (7/N): CREATE_TOPICS, 
DELETE_TOPICS, CREATE_PARTITIONS (#18433)
    
    Reviewers: Chia-Ping Tsai <chia7...@gmail.com>
---
 core/src/main/scala/kafka/server/KafkaApis.scala   | 243 -----------------
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 287 +--------------------
 2 files changed, 1 insertion(+), 529 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 9d257c39d59..085df39c886 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -36,11 +36,7 @@ import 
org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, SHARE
 import org.apache.kafka.common.internals.{FatalExitError, Topic}
 import 
org.apache.kafka.common.message.AddPartitionsToTxnResponseData.{AddPartitionsToTxnResult,
 AddPartitionsToTxnResultCollection}
 import 
org.apache.kafka.common.message.AlterConfigsResponseData.AlterConfigsResourceResponse
-import 
org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult
-import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
-import 
org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, 
CreatableTopicResultCollection}
 import 
org.apache.kafka.common.message.DeleteRecordsResponseData.{DeleteRecordsPartitionResult,
 DeleteRecordsTopicResult}
-import 
org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicResult, 
DeletableTopicResultCollection}
 import 
org.apache.kafka.common.message.ElectLeadersResponseData.{PartitionResult, 
ReplicaElectionResult}
 import 
org.apache.kafka.common.message.ListClientMetricsResourcesResponseData.ClientMetricsResource
 import 
org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition
@@ -1466,245 +1462,6 @@ class KafkaApis(val requestChannel: RequestChannel,
     requestHelper.sendResponseMaybeThrottle(request, createResponseCallback)
   }
 
-  def handleCreateTopicsRequest(request: RequestChannel.Request): Unit = {
-    val zkSupport = 
metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
-    val controllerMutationQuota = 
quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 6)
-
-    def sendResponseCallback(results: CreatableTopicResultCollection): Unit = {
-      val responseData = new CreateTopicsResponseData()
-        .setTopics(results)
-      val response = new CreateTopicsResponse(responseData)
-      trace(s"Sending create topics response $responseData for correlation id 
" +
-        s"${request.header.correlationId} to client 
${request.header.clientId}.")
-      
requestHelper.sendResponseMaybeThrottleWithControllerQuota(controllerMutationQuota,
 request, response)
-    }
-
-    val createTopicsRequest = request.body[CreateTopicsRequest]
-    val results = new 
CreatableTopicResultCollection(createTopicsRequest.data.topics.size)
-    if (!zkSupport.controller.isActive) {
-      createTopicsRequest.data.topics.forEach { topic =>
-        results.add(new CreatableTopicResult().setName(topic.name)
-          .setErrorCode(Errors.NOT_CONTROLLER.code))
-      }
-      sendResponseCallback(results)
-    } else {
-      createTopicsRequest.data.topics.forEach { topic =>
-        results.add(new CreatableTopicResult().setName(topic.name))
-      }
-      val hasClusterAuthorization = authHelper.authorize(request.context, 
CREATE, CLUSTER, CLUSTER_NAME,
-        logIfDenied = false)
-
-      val allowedTopicNames = {
-        val topicNames = createTopicsRequest
-          .data
-          .topics
-          .asScala
-          .map(_.name)
-          .toSet
-
-          topicNames.diff(Set(Topic.CLUSTER_METADATA_TOPIC_NAME))
-      }
-
-      val authorizedTopics = if (hasClusterAuthorization) {
-        allowedTopicNames
-      } else {
-        authHelper.filterByAuthorized(request.context, CREATE, TOPIC, 
allowedTopicNames)(identity)
-      }
-      val authorizedForDescribeConfigs = authHelper.filterByAuthorized(
-        request.context,
-        DESCRIBE_CONFIGS,
-        TOPIC,
-        allowedTopicNames,
-        logIfDenied = false
-      )(identity).map(name => name -> results.find(name)).toMap
-
-      results.forEach { topic =>
-        if (topic.name() == Topic.CLUSTER_METADATA_TOPIC_NAME) {
-          topic.setErrorCode(Errors.INVALID_REQUEST.code)
-          topic.setErrorMessage(s"Creation of internal topic 
${Topic.CLUSTER_METADATA_TOPIC_NAME} is prohibited.")
-        } else if (results.findAll(topic.name).size > 1) {
-          topic.setErrorCode(Errors.INVALID_REQUEST.code)
-          topic.setErrorMessage("Found multiple entries for this topic.")
-        } else if (!authorizedTopics.contains(topic.name)) {
-          topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
-          topic.setErrorMessage("Authorization failed.")
-        }
-        if (!authorizedForDescribeConfigs.contains(topic.name) && topic.name() 
!= Topic.CLUSTER_METADATA_TOPIC_NAME) {
-          topic.setTopicConfigErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
-        }
-      }
-      val toCreate = mutable.Map[String, CreatableTopic]()
-      createTopicsRequest.data.topics.forEach { topic =>
-        if (results.find(topic.name).errorCode == Errors.NONE.code) {
-          toCreate += topic.name -> topic
-        }
-      }
-      def handleCreateTopicsResults(errors: Map[String, ApiError]): Unit = {
-        errors.foreach { case (topicName, error) =>
-          val result = results.find(topicName)
-          result.setErrorCode(error.error.code)
-            .setErrorMessage(error.message)
-          // Reset any configs in the response if Create failed
-          if (error != ApiError.NONE) {
-            result.setConfigs(List.empty.asJava)
-              .setNumPartitions(-1)
-              .setReplicationFactor(-1)
-              .setTopicConfigErrorCode(Errors.NONE.code)
-          }
-        }
-        sendResponseCallback(results)
-      }
-      zkSupport.adminManager.createTopics(
-        createTopicsRequest.data.timeoutMs,
-        createTopicsRequest.data.validateOnly,
-        toCreate,
-        authorizedForDescribeConfigs,
-        controllerMutationQuota,
-        handleCreateTopicsResults)
-    }
-  }
-
-  def handleCreatePartitionsRequest(request: RequestChannel.Request): Unit = {
-    val zkSupport = 
metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
-    val createPartitionsRequest = request.body[CreatePartitionsRequest]
-    val controllerMutationQuota = 
quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 3)
-
-    def sendResponseCallback(results: Map[String, ApiError]): Unit = {
-      val createPartitionsResults = results.map {
-        case (topic, error) => new CreatePartitionsTopicResult()
-          .setName(topic)
-          .setErrorCode(error.error.code)
-          .setErrorMessage(error.message)
-      }.toSeq
-      val response = new CreatePartitionsResponse(new 
CreatePartitionsResponseData()
-        .setResults(createPartitionsResults.asJava))
-      trace(s"Sending create partitions response $response for correlation id 
${request.header.correlationId} to " +
-        s"client ${request.header.clientId}.")
-      
requestHelper.sendResponseMaybeThrottleWithControllerQuota(controllerMutationQuota,
 request, response)
-    }
-
-    if (!zkSupport.controller.isActive) {
-      val result = createPartitionsRequest.data.topics.asScala.map { topic =>
-        (topic.name, new ApiError(Errors.NOT_CONTROLLER, null))
-      }.toMap
-      sendResponseCallback(result)
-    } else {
-      // Special handling to add duplicate topics to the response
-      val topics = createPartitionsRequest.data.topics.asScala.toSeq
-      val dupes = topics.groupBy(_.name)
-        .filter { _._2.size > 1 }
-        .keySet
-      val notDuped = topics.filterNot(topic => dupes.contains(topic.name))
-      val (authorized, unauthorized) = 
authHelper.partitionSeqByAuthorized(request.context, ALTER, TOPIC,
-        notDuped)(_.name)
-
-      val (queuedForDeletion, valid) = authorized.partition { topic =>
-        zkSupport.controller.isTopicQueuedForDeletion(topic.name)
-      }
-
-      val errors = dupes.map(_ -> new ApiError(Errors.INVALID_REQUEST, 
"Duplicate topic in request.")) ++
-        unauthorized.map(_.name -> new 
ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, "The topic authorization is 
failed.")) ++
-        queuedForDeletion.map(_.name -> new 
ApiError(Errors.INVALID_TOPIC_EXCEPTION, "The topic is queued for deletion."))
-
-      zkSupport.adminManager.createPartitions(
-        createPartitionsRequest.data.timeoutMs,
-        valid,
-        createPartitionsRequest.data.validateOnly,
-        controllerMutationQuota,
-        result => sendResponseCallback(result ++ errors))
-    }
-  }
-
-  def handleDeleteTopicsRequest(request: RequestChannel.Request): Unit = {
-    val zkSupport = 
metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
-    val controllerMutationQuota = 
quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 5)
-
-    def sendResponseCallback(results: DeletableTopicResultCollection): Unit = {
-      val responseData = new DeleteTopicsResponseData()
-        .setResponses(results)
-      val response = new DeleteTopicsResponse(responseData)
-      trace(s"Sending delete topics response $response for correlation id 
${request.header.correlationId} to client ${request.header.clientId}.")
-      
requestHelper.sendResponseMaybeThrottleWithControllerQuota(controllerMutationQuota,
 request, response)
-    }
-
-    val deleteTopicRequest = request.body[DeleteTopicsRequest]
-    val results = new 
DeletableTopicResultCollection(deleteTopicRequest.numberOfTopics())
-    val toDelete = mutable.Set[String]()
-    if (!zkSupport.controller.isActive) {
-      deleteTopicRequest.topics().forEach { topic =>
-        results.add(new DeletableTopicResult()
-          .setName(topic.name())
-          .setTopicId(topic.topicId())
-          .setErrorCode(Errors.NOT_CONTROLLER.code))
-      }
-      sendResponseCallback(results)
-    } else if (!config.deleteTopicEnable) {
-      val error = if (request.context.apiVersion < 3) Errors.INVALID_REQUEST 
else Errors.TOPIC_DELETION_DISABLED
-      deleteTopicRequest.topics().forEach { topic =>
-        results.add(new DeletableTopicResult()
-          .setName(topic.name())
-          .setTopicId(topic.topicId())
-          .setErrorCode(error.code))
-      }
-      sendResponseCallback(results)
-    } else {
-      val topicIdsFromRequest = 
deleteTopicRequest.topicIds().asScala.filter(topicId => topicId != 
Uuid.ZERO_UUID).toSet
-      deleteTopicRequest.topics().forEach { topic =>
-        if (topic.name() != null && topic.topicId() != Uuid.ZERO_UUID)
-          throw new InvalidRequestException("Topic name and topic ID can not 
both be specified.")
-        val name = if (topic.topicId() == Uuid.ZERO_UUID) topic.name()
-        else 
zkSupport.controller.controllerContext.topicName(topic.topicId).orNull
-        results.add(new DeletableTopicResult()
-          .setName(name)
-          .setTopicId(topic.topicId()))
-      }
-      val authorizedDescribeTopics = 
authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC,
-        results.asScala.filter(result => result.name() != null))(_.name)
-      val authorizedDeleteTopics = 
authHelper.filterByAuthorized(request.context, DELETE, TOPIC,
-        results.asScala.filter(result => result.name() != null))(_.name)
-      results.forEach { topic =>
-        val unresolvedTopicId = topic.topicId() != Uuid.ZERO_UUID && 
topic.name() == null
-        if (unresolvedTopicId) {
-          topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
-        } else if (topicIdsFromRequest.contains(topic.topicId) && 
!authorizedDescribeTopics.contains(topic.name)) {
-
-          // Because the client does not have Describe permission, the name 
should
-          // not be returned in the response. Note, however, that we do not 
consider
-          // the topicId itself to be sensitive, so there is no reason to 
obscure
-          // this case with `UNKNOWN_TOPIC_ID`.
-          topic.setName(null)
-          topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
-        } else if (!authorizedDeleteTopics.contains(topic.name)) {
-          topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
-        } else if (!metadataCache.contains(topic.name)) {
-          topic.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
-        } else {
-          toDelete += topic.name
-        }
-      }
-      // If no authorized topics return immediately
-      if (toDelete.isEmpty)
-        sendResponseCallback(results)
-      else {
-        def handleDeleteTopicsResults(errors: Map[String, Errors]): Unit = {
-          errors.foreach {
-            case (topicName, error) =>
-              results.find(topicName)
-                .setErrorCode(error.code)
-          }
-          sendResponseCallback(results)
-        }
-
-        zkSupport.adminManager.deleteTopics(
-          deleteTopicRequest.data.timeoutMs,
-          toDelete,
-          controllerMutationQuota,
-          handleDeleteTopicsResults
-        )
-      }
-    }
-  }
-
   def handleDeleteRecordsRequest(request: RequestChannel.Request): Unit = {
     val deleteRecordsRequest = request.body[DeleteRecordsRequest]
 
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index eb91e92d9fd..cdc77a8213e 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -18,7 +18,7 @@
 package kafka.server
 
 import kafka.cluster.{Broker, Partition}
-import kafka.controller.{ControllerContext, KafkaController}
+import kafka.controller.KafkaController
 import kafka.coordinator.transaction.{InitProducerIdResult, 
TransactionCoordinator}
 import kafka.log.UnifiedLog
 import kafka.network.RequestChannel
@@ -772,77 +772,6 @@ class KafkaApisTest extends Logging {
     }
   }
 
-  @Test
-  def testCreateTopicsWithAuthorizer(): Unit = {
-    val authorizer: Authorizer = mock(classOf[Authorizer])
-
-    val authorizedTopic = "authorized-topic"
-    val unauthorizedTopic = "unauthorized-topic"
-
-    authorizeResource(authorizer, AclOperation.CREATE, ResourceType.CLUSTER,
-      Resource.CLUSTER_NAME, AuthorizationResult.DENIED, logIfDenied = false)
-
-    createCombinedTopicAuthorization(authorizer, AclOperation.CREATE,
-      authorizedTopic, unauthorizedTopic)
-
-    createCombinedTopicAuthorization(authorizer, AclOperation.DESCRIBE_CONFIGS,
-      authorizedTopic, unauthorizedTopic, logIfDenied = false)
-
-    val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS, 
ApiKeys.CREATE_TOPICS.latestVersion, clientId, 0)
-
-    when(controller.isActive).thenReturn(true)
-
-    val topics = new CreateTopicsRequestData.CreatableTopicCollection(3)
-    val topicToCreate = new CreateTopicsRequestData.CreatableTopic()
-      .setName(authorizedTopic)
-    topics.add(topicToCreate)
-
-    val topicToFilter = new CreateTopicsRequestData.CreatableTopic()
-      .setName(unauthorizedTopic)
-    topics.add(topicToFilter)
-
-    val topicToProhibited = new CreateTopicsRequestData.CreatableTopic()
-      .setName(Topic.CLUSTER_METADATA_TOPIC_NAME)
-    topics.add(topicToProhibited)
-
-    val timeout = 10
-    val createTopicsRequest = new CreateTopicsRequest.Builder(
-      new CreateTopicsRequestData()
-        .setTimeoutMs(timeout)
-        .setValidateOnly(false)
-        .setTopics(topics))
-      .build(requestHeader.apiVersion)
-    val request = buildRequest(createTopicsRequest,
-      fromPrivilegedListener = true, requestHeader = Option(requestHeader))
-
-    
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
-      any[Long])).thenReturn(0)
-    when(clientControllerQuotaManager.newQuotaFor(
-      ArgumentMatchers.eq(request), 
ArgumentMatchers.eq(6))).thenReturn(UnboundedControllerMutationQuota)
-    kafkaApis = createKafkaApis(authorizer = Some(authorizer))
-    kafkaApis.handleCreateTopicsRequest(request)
-
-    val capturedCallback: ArgumentCaptor[Map[String, ApiError] => Unit] = 
ArgumentCaptor.forClass(classOf[Map[String, ApiError] => Unit])
-
-    verify(adminManager).createTopics(
-      ArgumentMatchers.eq(timeout),
-      ArgumentMatchers.eq(false),
-      ArgumentMatchers.eq(Map(authorizedTopic -> topicToCreate)),
-      any(),
-      ArgumentMatchers.eq(UnboundedControllerMutationQuota),
-      capturedCallback.capture())
-    capturedCallback.getValue.apply(Map(authorizedTopic -> ApiError.NONE))
-
-    val capturedResponse = verifyNoThrottling[CreateTopicsResponse](request)
-    verifyCreateTopicsResult(capturedResponse,
-      Map(authorizedTopic -> Errors.NONE,
-        unauthorizedTopic -> Errors.TOPIC_AUTHORIZATION_FAILED,
-        Topic.CLUSTER_METADATA_TOPIC_NAME -> Errors.INVALID_REQUEST),
-      Map(authorizedTopic -> Errors.NONE,
-        unauthorizedTopic -> Errors.TOPIC_AUTHORIZATION_FAILED,
-        Topic.CLUSTER_METADATA_TOPIC_NAME -> Errors.NONE))
-  }
-
   @ParameterizedTest
   @CsvSource(value = Array("0,1500", "1500,0", "3000,1000"))
   def testKRaftControllerThrottleTimeEnforced(
@@ -906,49 +835,6 @@ class KafkaApisTest extends Logging {
       unauthorizedTopic, AuthorizationResult.DENIED, logIfAllowed, logIfDenied)
   }
 
-  private def createCombinedTopicAuthorization(authorizer: Authorizer,
-                                               operation: AclOperation,
-                                               authorizedTopic: String,
-                                               unauthorizedTopic: String,
-                                               logIfAllowed: Boolean = true,
-                                               logIfDenied: Boolean = true): 
Unit = {
-    val expectedAuthorizedActions = Seq(
-      new Action(operation,
-        new ResourcePattern(ResourceType.TOPIC, authorizedTopic, 
PatternType.LITERAL),
-        1, logIfAllowed, logIfDenied),
-      new Action(operation,
-        new ResourcePattern(ResourceType.TOPIC, unauthorizedTopic, 
PatternType.LITERAL),
-        1, logIfAllowed, logIfDenied))
-
-    when(authorizer.authorize(
-      any[RequestContext], argThat((t: java.util.List[Action]) => t != null && 
t.containsAll(expectedAuthorizedActions.asJava))
-    )).thenAnswer { invocation =>
-      val actions = invocation.getArgument(1).asInstanceOf[util.List[Action]]
-      actions.asScala.map { action =>
-        if (action.resourcePattern().name().equals(authorizedTopic))
-          AuthorizationResult.ALLOWED
-        else
-          AuthorizationResult.DENIED
-      }.asJava
-    }
-  }
-
-  private def verifyCreateTopicsResult(response: CreateTopicsResponse,
-                                       expectedErrorCodes: Map[String, Errors],
-                                       expectedTopicConfigErrorCodes: 
Map[String, Errors]): Unit = {
-    val actualErrorCodes = response.data.topics().asScala.map { topicResponse 
=>
-      topicResponse.name() -> Errors.forCode(topicResponse.errorCode)
-    }.toMap
-
-    assertEquals(expectedErrorCodes, actualErrorCodes)
-
-    val actualTopicConfigErrorCodes = response.data.topics().asScala.map { 
topicResponse =>
-      topicResponse.name() -> 
Errors.forCode(topicResponse.topicConfigErrorCode())
-    }.toMap
-
-    assertEquals(expectedTopicConfigErrorCodes, actualTopicConfigErrorCodes)
-  }
-
   @Test
   def testFindCoordinatorAutoTopicCreationForOffsetTopic(): Unit = {
     testFindCoordinatorWithTopicCreation(CoordinatorType.GROUP)
@@ -10038,156 +9924,6 @@ class KafkaApisTest extends Logging {
     assertEquals("Ongoing", transactionState.transactionState)
   }
 
-  @Test
-  def testDeleteTopicsByIdAuthorization(): Unit = {
-    val authorizer: Authorizer = mock(classOf[Authorizer])
-    val controllerContext: ControllerContext = mock(classOf[ControllerContext])
-
-    when(clientControllerQuotaManager.newQuotaFor(
-      any[RequestChannel.Request],
-      anyShort
-    )).thenReturn(UnboundedControllerMutationQuota)
-    when(controller.isActive).thenReturn(true)
-    when(controller.controllerContext).thenReturn(controllerContext)
-
-    val topicResults = Map(
-      AclOperation.DESCRIBE -> Map(
-        "foo" -> AuthorizationResult.DENIED,
-        "bar" -> AuthorizationResult.ALLOWED
-      ),
-      AclOperation.DELETE -> Map(
-        "foo" -> AuthorizationResult.DENIED,
-        "bar" -> AuthorizationResult.DENIED
-      )
-    )
-    when(authorizer.authorize(any[RequestContext], 
isNotNull[util.List[Action]])).thenAnswer(invocation => {
-      val actions = invocation.getArgument(1).asInstanceOf[util.List[Action]]
-      actions.asScala.map { action =>
-        val topic = action.resourcePattern.name
-        val ops = action.operation()
-        topicResults(ops)(topic)
-      }.asJava
-    })
-
-    // Try to delete three topics:
-    // 1. One without describe permission
-    // 2. One without delete permission
-    // 3. One which is authorized, but doesn't exist
-    val topicIdsMap = Map(
-      Uuid.randomUuid() -> Some("foo"),
-      Uuid.randomUuid() -> Some("bar"),
-      Uuid.randomUuid() -> None
-    )
-
-    topicIdsMap.foreach { case (topicId, topicNameOpt) =>
-      when(controllerContext.topicName(topicId)).thenReturn(topicNameOpt)
-    }
-
-    val topicDatas = topicIdsMap.keys.map { topicId =>
-      new DeleteTopicsRequestData.DeleteTopicState().setTopicId(topicId)
-    }.toList
-    val deleteRequest = new DeleteTopicsRequest.Builder(new 
DeleteTopicsRequestData()
-      .setTopics(topicDatas.asJava))
-      .build(ApiKeys.DELETE_TOPICS.latestVersion)
-
-    val request = buildRequest(deleteRequest)
-    
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
-      any[Long])).thenReturn(0)
-    kafkaApis = createKafkaApis(authorizer = Some(authorizer))
-    kafkaApis.handleDeleteTopicsRequest(request)
-    verify(authorizer, times(2)).authorize(any(), any())
-
-    val deleteResponse = verifyNoThrottling[DeleteTopicsResponse](request)
-
-    topicIdsMap.foreach { case (topicId, nameOpt) =>
-      val response = deleteResponse.data.responses.asScala.find(_.topicId == 
topicId).get
-      nameOpt match {
-        case Some("foo") =>
-          assertNull(response.name)
-          assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, 
Errors.forCode(response.errorCode))
-        case Some("bar") =>
-          assertEquals("bar", response.name)
-          assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, 
Errors.forCode(response.errorCode))
-        case None =>
-          assertNull(response.name)
-          assertEquals(Errors.UNKNOWN_TOPIC_ID, 
Errors.forCode(response.errorCode))
-        case _ =>
-          fail("Unexpected topic id/name mapping")
-      }
-    }
-  }
-
-  @ParameterizedTest
-  @ValueSource(booleans = Array(true, false))
-  def testDeleteTopicsByNameAuthorization(usePrimitiveTopicNameArray: 
Boolean): Unit = {
-    val authorizer: Authorizer = mock(classOf[Authorizer])
-
-    when(clientControllerQuotaManager.newQuotaFor(
-      any[RequestChannel.Request],
-      anyShort
-    )).thenReturn(UnboundedControllerMutationQuota)
-    when(controller.isActive).thenReturn(true)
-
-    // Try to delete three topics:
-    // 1. One without describe permission
-    // 2. One without delete permission
-    // 3. One which is authorized, but doesn't exist
-
-    val topicResults = Map(
-      AclOperation.DESCRIBE -> Map(
-        "foo" -> AuthorizationResult.DENIED,
-        "bar" -> AuthorizationResult.ALLOWED,
-        "baz" -> AuthorizationResult.ALLOWED
-      ),
-      AclOperation.DELETE -> Map(
-        "foo" -> AuthorizationResult.DENIED,
-        "bar" -> AuthorizationResult.DENIED,
-        "baz" -> AuthorizationResult.ALLOWED
-      )
-    )
-    when(authorizer.authorize(any[RequestContext], 
isNotNull[util.List[Action]])).thenAnswer(invocation => {
-      val actions = invocation.getArgument(1).asInstanceOf[util.List[Action]]
-      actions.asScala.map { action =>
-        val topic = action.resourcePattern.name
-        val ops = action.operation()
-        topicResults(ops)(topic)
-      }.asJava
-    })
-
-    val deleteRequest = if (usePrimitiveTopicNameArray) {
-      new DeleteTopicsRequest.Builder(new DeleteTopicsRequestData()
-        .setTopicNames(List("foo", "bar", "baz").asJava))
-        .build(5.toShort)
-    } else {
-      val topicDatas = List(
-        new DeleteTopicsRequestData.DeleteTopicState().setName("foo"),
-        new DeleteTopicsRequestData.DeleteTopicState().setName("bar"),
-        new DeleteTopicsRequestData.DeleteTopicState().setName("baz")
-      )
-      new DeleteTopicsRequest.Builder(new DeleteTopicsRequestData()
-        .setTopics(topicDatas.asJava))
-        .build(ApiKeys.DELETE_TOPICS.latestVersion)
-    }
-
-    val request = buildRequest(deleteRequest)
-    
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
-      any[Long])).thenReturn(0)
-    kafkaApis = createKafkaApis(authorizer = Some(authorizer))
-    kafkaApis.handleDeleteTopicsRequest(request)
-    verify(authorizer, times(2)).authorize(any(), any())
-
-    val deleteResponse = verifyNoThrottling[DeleteTopicsResponse](request)
-
-    def lookupErrorCode(topic: String): Option[Errors] = {
-      Option(deleteResponse.data.responses().find(topic))
-        .map(result => Errors.forCode(result.errorCode))
-    }
-
-    assertEquals(Some(Errors.TOPIC_AUTHORIZATION_FAILED), 
lookupErrorCode("foo"))
-    assertEquals(Some(Errors.TOPIC_AUTHORIZATION_FAILED), 
lookupErrorCode("bar"))
-    assertEquals(Some(Errors.UNKNOWN_TOPIC_OR_PARTITION), 
lookupErrorCode("baz"))
-  }
-
   private def createMockRequest(): RequestChannel.Request = {
     val request: RequestChannel.Request = mock(classOf[RequestChannel.Request])
     val requestHeader: RequestHeader = mock(classOf[RequestHeader])
@@ -10215,27 +9951,6 @@ class KafkaApisTest extends Logging {
     verifyShouldNeverHandleErrorMessage(kafkaApis.handleAlterPartitionRequest)
   }
 
-  @Test
-  def testRaftShouldAlwaysForwardCreateTopicsRequest(): Unit = {
-    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
-    kafkaApis = createKafkaApis(raftSupport = true)
-    verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleCreateTopicsRequest)
-  }
-
-  @Test
-  def testRaftShouldAlwaysForwardCreatePartitionsRequest(): Unit = {
-    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
-    kafkaApis = createKafkaApis(raftSupport = true)
-    
verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleCreatePartitionsRequest)
-  }
-
-  @Test
-  def testRaftShouldAlwaysForwardDeleteTopicsRequest(): Unit = {
-    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
-    kafkaApis = createKafkaApis(raftSupport = true)
-    verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleDeleteTopicsRequest)
-  }
-
   @Test
   def testRaftShouldAlwaysForwardCreateAcls(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)

Reply via email to