This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 9f5d9f3cd42 KAFKA-18399 Remove ZooKeeper from KafkaApis (7/N): CREATE_TOPICS, DELETE_TOPICS, CREATE_PARTITIONS (#18433) 9f5d9f3cd42 is described below commit 9f5d9f3cd42370e7653882425369c2d4a71ab2be Author: TaiJuWu <tjwu1...@gmail.com> AuthorDate: Mon Jan 13 21:23:59 2025 +0800 KAFKA-18399 Remove ZooKeeper from KafkaApis (7/N): CREATE_TOPICS, DELETE_TOPICS, CREATE_PARTITIONS (#18433) Reviewers: Chia-Ping Tsai <chia7...@gmail.com> --- core/src/main/scala/kafka/server/KafkaApis.scala | 243 ----------------- .../scala/unit/kafka/server/KafkaApisTest.scala | 287 +-------------------- 2 files changed, 1 insertion(+), 529 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 9d257c39d59..085df39c886 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -36,11 +36,7 @@ import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, SHARE import org.apache.kafka.common.internals.{FatalExitError, Topic} import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.{AddPartitionsToTxnResult, AddPartitionsToTxnResultCollection} import org.apache.kafka.common.message.AlterConfigsResponseData.AlterConfigsResourceResponse -import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult -import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic -import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, CreatableTopicResultCollection} import org.apache.kafka.common.message.DeleteRecordsResponseData.{DeleteRecordsPartitionResult, DeleteRecordsTopicResult} -import org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicResult, DeletableTopicResultCollection} import org.apache.kafka.common.message.ElectLeadersResponseData.{PartitionResult, ReplicaElectionResult} import org.apache.kafka.common.message.ListClientMetricsResourcesResponseData.ClientMetricsResource import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition @@ -1466,245 +1462,6 @@ class KafkaApis(val requestChannel: RequestChannel, requestHelper.sendResponseMaybeThrottle(request, createResponseCallback) } - def handleCreateTopicsRequest(request: RequestChannel.Request): Unit = { - val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request)) - val controllerMutationQuota = quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 6) - - def sendResponseCallback(results: CreatableTopicResultCollection): Unit = { - val responseData = new CreateTopicsResponseData() - .setTopics(results) - val response = new CreateTopicsResponse(responseData) - trace(s"Sending create topics response $responseData for correlation id " + - s"${request.header.correlationId} to client ${request.header.clientId}.") - requestHelper.sendResponseMaybeThrottleWithControllerQuota(controllerMutationQuota, request, response) - } - - val createTopicsRequest = request.body[CreateTopicsRequest] - val results = new CreatableTopicResultCollection(createTopicsRequest.data.topics.size) - if (!zkSupport.controller.isActive) { - createTopicsRequest.data.topics.forEach { topic => - results.add(new CreatableTopicResult().setName(topic.name) - .setErrorCode(Errors.NOT_CONTROLLER.code)) - } - sendResponseCallback(results) - } else { - createTopicsRequest.data.topics.forEach { topic => - results.add(new CreatableTopicResult().setName(topic.name)) - } - val hasClusterAuthorization = authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME, - logIfDenied = false) - - val allowedTopicNames = { - val topicNames = createTopicsRequest - .data - .topics - .asScala - .map(_.name) - .toSet - - topicNames.diff(Set(Topic.CLUSTER_METADATA_TOPIC_NAME)) - } - - val authorizedTopics = if (hasClusterAuthorization) { - allowedTopicNames - } else { - authHelper.filterByAuthorized(request.context, CREATE, TOPIC, allowedTopicNames)(identity) - } - val authorizedForDescribeConfigs = authHelper.filterByAuthorized( - request.context, - DESCRIBE_CONFIGS, - TOPIC, - allowedTopicNames, - logIfDenied = false - )(identity).map(name => name -> results.find(name)).toMap - - results.forEach { topic => - if (topic.name() == Topic.CLUSTER_METADATA_TOPIC_NAME) { - topic.setErrorCode(Errors.INVALID_REQUEST.code) - topic.setErrorMessage(s"Creation of internal topic ${Topic.CLUSTER_METADATA_TOPIC_NAME} is prohibited.") - } else if (results.findAll(topic.name).size > 1) { - topic.setErrorCode(Errors.INVALID_REQUEST.code) - topic.setErrorMessage("Found multiple entries for this topic.") - } else if (!authorizedTopics.contains(topic.name)) { - topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) - topic.setErrorMessage("Authorization failed.") - } - if (!authorizedForDescribeConfigs.contains(topic.name) && topic.name() != Topic.CLUSTER_METADATA_TOPIC_NAME) { - topic.setTopicConfigErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) - } - } - val toCreate = mutable.Map[String, CreatableTopic]() - createTopicsRequest.data.topics.forEach { topic => - if (results.find(topic.name).errorCode == Errors.NONE.code) { - toCreate += topic.name -> topic - } - } - def handleCreateTopicsResults(errors: Map[String, ApiError]): Unit = { - errors.foreach { case (topicName, error) => - val result = results.find(topicName) - result.setErrorCode(error.error.code) - .setErrorMessage(error.message) - // Reset any configs in the response if Create failed - if (error != ApiError.NONE) { - result.setConfigs(List.empty.asJava) - .setNumPartitions(-1) - .setReplicationFactor(-1) - .setTopicConfigErrorCode(Errors.NONE.code) - } - } - sendResponseCallback(results) - } - zkSupport.adminManager.createTopics( - createTopicsRequest.data.timeoutMs, - createTopicsRequest.data.validateOnly, - toCreate, - authorizedForDescribeConfigs, - controllerMutationQuota, - handleCreateTopicsResults) - } - } - - def handleCreatePartitionsRequest(request: RequestChannel.Request): Unit = { - val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request)) - val createPartitionsRequest = request.body[CreatePartitionsRequest] - val controllerMutationQuota = quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 3) - - def sendResponseCallback(results: Map[String, ApiError]): Unit = { - val createPartitionsResults = results.map { - case (topic, error) => new CreatePartitionsTopicResult() - .setName(topic) - .setErrorCode(error.error.code) - .setErrorMessage(error.message) - }.toSeq - val response = new CreatePartitionsResponse(new CreatePartitionsResponseData() - .setResults(createPartitionsResults.asJava)) - trace(s"Sending create partitions response $response for correlation id ${request.header.correlationId} to " + - s"client ${request.header.clientId}.") - requestHelper.sendResponseMaybeThrottleWithControllerQuota(controllerMutationQuota, request, response) - } - - if (!zkSupport.controller.isActive) { - val result = createPartitionsRequest.data.topics.asScala.map { topic => - (topic.name, new ApiError(Errors.NOT_CONTROLLER, null)) - }.toMap - sendResponseCallback(result) - } else { - // Special handling to add duplicate topics to the response - val topics = createPartitionsRequest.data.topics.asScala.toSeq - val dupes = topics.groupBy(_.name) - .filter { _._2.size > 1 } - .keySet - val notDuped = topics.filterNot(topic => dupes.contains(topic.name)) - val (authorized, unauthorized) = authHelper.partitionSeqByAuthorized(request.context, ALTER, TOPIC, - notDuped)(_.name) - - val (queuedForDeletion, valid) = authorized.partition { topic => - zkSupport.controller.isTopicQueuedForDeletion(topic.name) - } - - val errors = dupes.map(_ -> new ApiError(Errors.INVALID_REQUEST, "Duplicate topic in request.")) ++ - unauthorized.map(_.name -> new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, "The topic authorization is failed.")) ++ - queuedForDeletion.map(_.name -> new ApiError(Errors.INVALID_TOPIC_EXCEPTION, "The topic is queued for deletion.")) - - zkSupport.adminManager.createPartitions( - createPartitionsRequest.data.timeoutMs, - valid, - createPartitionsRequest.data.validateOnly, - controllerMutationQuota, - result => sendResponseCallback(result ++ errors)) - } - } - - def handleDeleteTopicsRequest(request: RequestChannel.Request): Unit = { - val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request)) - val controllerMutationQuota = quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 5) - - def sendResponseCallback(results: DeletableTopicResultCollection): Unit = { - val responseData = new DeleteTopicsResponseData() - .setResponses(results) - val response = new DeleteTopicsResponse(responseData) - trace(s"Sending delete topics response $response for correlation id ${request.header.correlationId} to client ${request.header.clientId}.") - requestHelper.sendResponseMaybeThrottleWithControllerQuota(controllerMutationQuota, request, response) - } - - val deleteTopicRequest = request.body[DeleteTopicsRequest] - val results = new DeletableTopicResultCollection(deleteTopicRequest.numberOfTopics()) - val toDelete = mutable.Set[String]() - if (!zkSupport.controller.isActive) { - deleteTopicRequest.topics().forEach { topic => - results.add(new DeletableTopicResult() - .setName(topic.name()) - .setTopicId(topic.topicId()) - .setErrorCode(Errors.NOT_CONTROLLER.code)) - } - sendResponseCallback(results) - } else if (!config.deleteTopicEnable) { - val error = if (request.context.apiVersion < 3) Errors.INVALID_REQUEST else Errors.TOPIC_DELETION_DISABLED - deleteTopicRequest.topics().forEach { topic => - results.add(new DeletableTopicResult() - .setName(topic.name()) - .setTopicId(topic.topicId()) - .setErrorCode(error.code)) - } - sendResponseCallback(results) - } else { - val topicIdsFromRequest = deleteTopicRequest.topicIds().asScala.filter(topicId => topicId != Uuid.ZERO_UUID).toSet - deleteTopicRequest.topics().forEach { topic => - if (topic.name() != null && topic.topicId() != Uuid.ZERO_UUID) - throw new InvalidRequestException("Topic name and topic ID can not both be specified.") - val name = if (topic.topicId() == Uuid.ZERO_UUID) topic.name() - else zkSupport.controller.controllerContext.topicName(topic.topicId).orNull - results.add(new DeletableTopicResult() - .setName(name) - .setTopicId(topic.topicId())) - } - val authorizedDescribeTopics = authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, - results.asScala.filter(result => result.name() != null))(_.name) - val authorizedDeleteTopics = authHelper.filterByAuthorized(request.context, DELETE, TOPIC, - results.asScala.filter(result => result.name() != null))(_.name) - results.forEach { topic => - val unresolvedTopicId = topic.topicId() != Uuid.ZERO_UUID && topic.name() == null - if (unresolvedTopicId) { - topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code) - } else if (topicIdsFromRequest.contains(topic.topicId) && !authorizedDescribeTopics.contains(topic.name)) { - - // Because the client does not have Describe permission, the name should - // not be returned in the response. Note, however, that we do not consider - // the topicId itself to be sensitive, so there is no reason to obscure - // this case with `UNKNOWN_TOPIC_ID`. - topic.setName(null) - topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) - } else if (!authorizedDeleteTopics.contains(topic.name)) { - topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) - } else if (!metadataCache.contains(topic.name)) { - topic.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code) - } else { - toDelete += topic.name - } - } - // If no authorized topics return immediately - if (toDelete.isEmpty) - sendResponseCallback(results) - else { - def handleDeleteTopicsResults(errors: Map[String, Errors]): Unit = { - errors.foreach { - case (topicName, error) => - results.find(topicName) - .setErrorCode(error.code) - } - sendResponseCallback(results) - } - - zkSupport.adminManager.deleteTopics( - deleteTopicRequest.data.timeoutMs, - toDelete, - controllerMutationQuota, - handleDeleteTopicsResults - ) - } - } - } - def handleDeleteRecordsRequest(request: RequestChannel.Request): Unit = { val deleteRecordsRequest = request.body[DeleteRecordsRequest] diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index eb91e92d9fd..cdc77a8213e 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -18,7 +18,7 @@ package kafka.server import kafka.cluster.{Broker, Partition} -import kafka.controller.{ControllerContext, KafkaController} +import kafka.controller.KafkaController import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator} import kafka.log.UnifiedLog import kafka.network.RequestChannel @@ -772,77 +772,6 @@ class KafkaApisTest extends Logging { } } - @Test - def testCreateTopicsWithAuthorizer(): Unit = { - val authorizer: Authorizer = mock(classOf[Authorizer]) - - val authorizedTopic = "authorized-topic" - val unauthorizedTopic = "unauthorized-topic" - - authorizeResource(authorizer, AclOperation.CREATE, ResourceType.CLUSTER, - Resource.CLUSTER_NAME, AuthorizationResult.DENIED, logIfDenied = false) - - createCombinedTopicAuthorization(authorizer, AclOperation.CREATE, - authorizedTopic, unauthorizedTopic) - - createCombinedTopicAuthorization(authorizer, AclOperation.DESCRIBE_CONFIGS, - authorizedTopic, unauthorizedTopic, logIfDenied = false) - - val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS, ApiKeys.CREATE_TOPICS.latestVersion, clientId, 0) - - when(controller.isActive).thenReturn(true) - - val topics = new CreateTopicsRequestData.CreatableTopicCollection(3) - val topicToCreate = new CreateTopicsRequestData.CreatableTopic() - .setName(authorizedTopic) - topics.add(topicToCreate) - - val topicToFilter = new CreateTopicsRequestData.CreatableTopic() - .setName(unauthorizedTopic) - topics.add(topicToFilter) - - val topicToProhibited = new CreateTopicsRequestData.CreatableTopic() - .setName(Topic.CLUSTER_METADATA_TOPIC_NAME) - topics.add(topicToProhibited) - - val timeout = 10 - val createTopicsRequest = new CreateTopicsRequest.Builder( - new CreateTopicsRequestData() - .setTimeoutMs(timeout) - .setValidateOnly(false) - .setTopics(topics)) - .build(requestHeader.apiVersion) - val request = buildRequest(createTopicsRequest, - fromPrivilegedListener = true, requestHeader = Option(requestHeader)) - - when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](), - any[Long])).thenReturn(0) - when(clientControllerQuotaManager.newQuotaFor( - ArgumentMatchers.eq(request), ArgumentMatchers.eq(6))).thenReturn(UnboundedControllerMutationQuota) - kafkaApis = createKafkaApis(authorizer = Some(authorizer)) - kafkaApis.handleCreateTopicsRequest(request) - - val capturedCallback: ArgumentCaptor[Map[String, ApiError] => Unit] = ArgumentCaptor.forClass(classOf[Map[String, ApiError] => Unit]) - - verify(adminManager).createTopics( - ArgumentMatchers.eq(timeout), - ArgumentMatchers.eq(false), - ArgumentMatchers.eq(Map(authorizedTopic -> topicToCreate)), - any(), - ArgumentMatchers.eq(UnboundedControllerMutationQuota), - capturedCallback.capture()) - capturedCallback.getValue.apply(Map(authorizedTopic -> ApiError.NONE)) - - val capturedResponse = verifyNoThrottling[CreateTopicsResponse](request) - verifyCreateTopicsResult(capturedResponse, - Map(authorizedTopic -> Errors.NONE, - unauthorizedTopic -> Errors.TOPIC_AUTHORIZATION_FAILED, - Topic.CLUSTER_METADATA_TOPIC_NAME -> Errors.INVALID_REQUEST), - Map(authorizedTopic -> Errors.NONE, - unauthorizedTopic -> Errors.TOPIC_AUTHORIZATION_FAILED, - Topic.CLUSTER_METADATA_TOPIC_NAME -> Errors.NONE)) - } - @ParameterizedTest @CsvSource(value = Array("0,1500", "1500,0", "3000,1000")) def testKRaftControllerThrottleTimeEnforced( @@ -906,49 +835,6 @@ class KafkaApisTest extends Logging { unauthorizedTopic, AuthorizationResult.DENIED, logIfAllowed, logIfDenied) } - private def createCombinedTopicAuthorization(authorizer: Authorizer, - operation: AclOperation, - authorizedTopic: String, - unauthorizedTopic: String, - logIfAllowed: Boolean = true, - logIfDenied: Boolean = true): Unit = { - val expectedAuthorizedActions = Seq( - new Action(operation, - new ResourcePattern(ResourceType.TOPIC, authorizedTopic, PatternType.LITERAL), - 1, logIfAllowed, logIfDenied), - new Action(operation, - new ResourcePattern(ResourceType.TOPIC, unauthorizedTopic, PatternType.LITERAL), - 1, logIfAllowed, logIfDenied)) - - when(authorizer.authorize( - any[RequestContext], argThat((t: java.util.List[Action]) => t != null && t.containsAll(expectedAuthorizedActions.asJava)) - )).thenAnswer { invocation => - val actions = invocation.getArgument(1).asInstanceOf[util.List[Action]] - actions.asScala.map { action => - if (action.resourcePattern().name().equals(authorizedTopic)) - AuthorizationResult.ALLOWED - else - AuthorizationResult.DENIED - }.asJava - } - } - - private def verifyCreateTopicsResult(response: CreateTopicsResponse, - expectedErrorCodes: Map[String, Errors], - expectedTopicConfigErrorCodes: Map[String, Errors]): Unit = { - val actualErrorCodes = response.data.topics().asScala.map { topicResponse => - topicResponse.name() -> Errors.forCode(topicResponse.errorCode) - }.toMap - - assertEquals(expectedErrorCodes, actualErrorCodes) - - val actualTopicConfigErrorCodes = response.data.topics().asScala.map { topicResponse => - topicResponse.name() -> Errors.forCode(topicResponse.topicConfigErrorCode()) - }.toMap - - assertEquals(expectedTopicConfigErrorCodes, actualTopicConfigErrorCodes) - } - @Test def testFindCoordinatorAutoTopicCreationForOffsetTopic(): Unit = { testFindCoordinatorWithTopicCreation(CoordinatorType.GROUP) @@ -10038,156 +9924,6 @@ class KafkaApisTest extends Logging { assertEquals("Ongoing", transactionState.transactionState) } - @Test - def testDeleteTopicsByIdAuthorization(): Unit = { - val authorizer: Authorizer = mock(classOf[Authorizer]) - val controllerContext: ControllerContext = mock(classOf[ControllerContext]) - - when(clientControllerQuotaManager.newQuotaFor( - any[RequestChannel.Request], - anyShort - )).thenReturn(UnboundedControllerMutationQuota) - when(controller.isActive).thenReturn(true) - when(controller.controllerContext).thenReturn(controllerContext) - - val topicResults = Map( - AclOperation.DESCRIBE -> Map( - "foo" -> AuthorizationResult.DENIED, - "bar" -> AuthorizationResult.ALLOWED - ), - AclOperation.DELETE -> Map( - "foo" -> AuthorizationResult.DENIED, - "bar" -> AuthorizationResult.DENIED - ) - ) - when(authorizer.authorize(any[RequestContext], isNotNull[util.List[Action]])).thenAnswer(invocation => { - val actions = invocation.getArgument(1).asInstanceOf[util.List[Action]] - actions.asScala.map { action => - val topic = action.resourcePattern.name - val ops = action.operation() - topicResults(ops)(topic) - }.asJava - }) - - // Try to delete three topics: - // 1. One without describe permission - // 2. One without delete permission - // 3. One which is authorized, but doesn't exist - val topicIdsMap = Map( - Uuid.randomUuid() -> Some("foo"), - Uuid.randomUuid() -> Some("bar"), - Uuid.randomUuid() -> None - ) - - topicIdsMap.foreach { case (topicId, topicNameOpt) => - when(controllerContext.topicName(topicId)).thenReturn(topicNameOpt) - } - - val topicDatas = topicIdsMap.keys.map { topicId => - new DeleteTopicsRequestData.DeleteTopicState().setTopicId(topicId) - }.toList - val deleteRequest = new DeleteTopicsRequest.Builder(new DeleteTopicsRequestData() - .setTopics(topicDatas.asJava)) - .build(ApiKeys.DELETE_TOPICS.latestVersion) - - val request = buildRequest(deleteRequest) - when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](), - any[Long])).thenReturn(0) - kafkaApis = createKafkaApis(authorizer = Some(authorizer)) - kafkaApis.handleDeleteTopicsRequest(request) - verify(authorizer, times(2)).authorize(any(), any()) - - val deleteResponse = verifyNoThrottling[DeleteTopicsResponse](request) - - topicIdsMap.foreach { case (topicId, nameOpt) => - val response = deleteResponse.data.responses.asScala.find(_.topicId == topicId).get - nameOpt match { - case Some("foo") => - assertNull(response.name) - assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, Errors.forCode(response.errorCode)) - case Some("bar") => - assertEquals("bar", response.name) - assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, Errors.forCode(response.errorCode)) - case None => - assertNull(response.name) - assertEquals(Errors.UNKNOWN_TOPIC_ID, Errors.forCode(response.errorCode)) - case _ => - fail("Unexpected topic id/name mapping") - } - } - } - - @ParameterizedTest - @ValueSource(booleans = Array(true, false)) - def testDeleteTopicsByNameAuthorization(usePrimitiveTopicNameArray: Boolean): Unit = { - val authorizer: Authorizer = mock(classOf[Authorizer]) - - when(clientControllerQuotaManager.newQuotaFor( - any[RequestChannel.Request], - anyShort - )).thenReturn(UnboundedControllerMutationQuota) - when(controller.isActive).thenReturn(true) - - // Try to delete three topics: - // 1. One without describe permission - // 2. One without delete permission - // 3. One which is authorized, but doesn't exist - - val topicResults = Map( - AclOperation.DESCRIBE -> Map( - "foo" -> AuthorizationResult.DENIED, - "bar" -> AuthorizationResult.ALLOWED, - "baz" -> AuthorizationResult.ALLOWED - ), - AclOperation.DELETE -> Map( - "foo" -> AuthorizationResult.DENIED, - "bar" -> AuthorizationResult.DENIED, - "baz" -> AuthorizationResult.ALLOWED - ) - ) - when(authorizer.authorize(any[RequestContext], isNotNull[util.List[Action]])).thenAnswer(invocation => { - val actions = invocation.getArgument(1).asInstanceOf[util.List[Action]] - actions.asScala.map { action => - val topic = action.resourcePattern.name - val ops = action.operation() - topicResults(ops)(topic) - }.asJava - }) - - val deleteRequest = if (usePrimitiveTopicNameArray) { - new DeleteTopicsRequest.Builder(new DeleteTopicsRequestData() - .setTopicNames(List("foo", "bar", "baz").asJava)) - .build(5.toShort) - } else { - val topicDatas = List( - new DeleteTopicsRequestData.DeleteTopicState().setName("foo"), - new DeleteTopicsRequestData.DeleteTopicState().setName("bar"), - new DeleteTopicsRequestData.DeleteTopicState().setName("baz") - ) - new DeleteTopicsRequest.Builder(new DeleteTopicsRequestData() - .setTopics(topicDatas.asJava)) - .build(ApiKeys.DELETE_TOPICS.latestVersion) - } - - val request = buildRequest(deleteRequest) - when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](), - any[Long])).thenReturn(0) - kafkaApis = createKafkaApis(authorizer = Some(authorizer)) - kafkaApis.handleDeleteTopicsRequest(request) - verify(authorizer, times(2)).authorize(any(), any()) - - val deleteResponse = verifyNoThrottling[DeleteTopicsResponse](request) - - def lookupErrorCode(topic: String): Option[Errors] = { - Option(deleteResponse.data.responses().find(topic)) - .map(result => Errors.forCode(result.errorCode)) - } - - assertEquals(Some(Errors.TOPIC_AUTHORIZATION_FAILED), lookupErrorCode("foo")) - assertEquals(Some(Errors.TOPIC_AUTHORIZATION_FAILED), lookupErrorCode("bar")) - assertEquals(Some(Errors.UNKNOWN_TOPIC_OR_PARTITION), lookupErrorCode("baz")) - } - private def createMockRequest(): RequestChannel.Request = { val request: RequestChannel.Request = mock(classOf[RequestChannel.Request]) val requestHeader: RequestHeader = mock(classOf[RequestHeader]) @@ -10215,27 +9951,6 @@ class KafkaApisTest extends Logging { verifyShouldNeverHandleErrorMessage(kafkaApis.handleAlterPartitionRequest) } - @Test - def testRaftShouldAlwaysForwardCreateTopicsRequest(): Unit = { - metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) - kafkaApis = createKafkaApis(raftSupport = true) - verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleCreateTopicsRequest) - } - - @Test - def testRaftShouldAlwaysForwardCreatePartitionsRequest(): Unit = { - metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) - kafkaApis = createKafkaApis(raftSupport = true) - verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleCreatePartitionsRequest) - } - - @Test - def testRaftShouldAlwaysForwardDeleteTopicsRequest(): Unit = { - metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) - kafkaApis = createKafkaApis(raftSupport = true) - verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleDeleteTopicsRequest) - } - @Test def testRaftShouldAlwaysForwardCreateAcls(): Unit = { metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)