This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 3fcaec7f4e3 KAFKA-18399 Remove ZooKeeper from KafkaApis (10/N): ALTER_CONFIG and INCREMENETAL_ALTER_CONFIG (#18432) 3fcaec7f4e3 is described below commit 3fcaec7f4e34138af8e66dee094b7f5563ef082f Author: TaiJuWu <tjwu1...@gmail.com> AuthorDate: Wed Jan 15 18:25:45 2025 +0800 KAFKA-18399 Remove ZooKeeper from KafkaApis (10/N): ALTER_CONFIG and INCREMENETAL_ALTER_CONFIG (#18432) Reviewers: Christo Lolov <lol...@amazon.com>, Ismael Juma <ism...@juma.me.uk>, Chia-Ping Tsai <chia7...@gmail.com> --- core/src/main/scala/kafka/server/KafkaApis.scala | 98 +---------- .../scala/unit/kafka/server/KafkaApisTest.scala | 184 +++++---------------- 2 files changed, 41 insertions(+), 241 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 318ca8f4263..32362913b46 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -26,16 +26,13 @@ import kafka.server.share.SharePartitionManager import kafka.utils.Logging import org.apache.kafka.admin.AdminUtils import org.apache.kafka.clients.CommonClientConfigs -import org.apache.kafka.clients.admin.AlterConfigOp.OpType -import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry, EndpointType} +import org.apache.kafka.clients.admin.EndpointType import org.apache.kafka.common.acl.AclOperation import org.apache.kafka.common.acl.AclOperation._ -import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, SHARE_GROUP_STATE_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal} import org.apache.kafka.common.internals.{FatalExitError, Topic} import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.{AddPartitionsToTxnResult, AddPartitionsToTxnResultCollection} -import org.apache.kafka.common.message.AlterConfigsResponseData.AlterConfigsResourceResponse import org.apache.kafka.common.message.DeleteRecordsResponseData.{DeleteRecordsPartitionResult, DeleteRecordsTopicResult} import org.apache.kafka.common.message.ListClientMetricsResourcesResponseData.ClientMetricsResource import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition @@ -2120,55 +2117,11 @@ class KafkaApis(val requestChannel: RequestChannel, } if (remaining.resources().isEmpty) { sendResponse(Some(new AlterConfigsResponseData())) - } else if ((!request.isForwarded) && metadataSupport.canForward()) { + } else { metadataSupport.forwardingManager.get.forwardRequest(request, new AlterConfigsRequest(remaining, request.header.apiVersion()), response => sendResponse(response.map(_.data()))) - } else { - sendResponse(Some(processLegacyAlterConfigsRequest(request, remaining))) - } - } - - def processLegacyAlterConfigsRequest( - originalRequest: RequestChannel.Request, - data: AlterConfigsRequestData - ): AlterConfigsResponseData = { - val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(originalRequest)) - val alterConfigsRequest = new AlterConfigsRequest(data, originalRequest.header.apiVersion()) - val (authorizedResources, unauthorizedResources) = alterConfigsRequest.configs.asScala.toMap.partition { case (resource, _) => - resource.`type` match { - case ConfigResource.Type.BROKER_LOGGER => - throw new InvalidRequestException(s"AlterConfigs is deprecated and does not support the resource type ${ConfigResource.Type.BROKER_LOGGER}") - case ConfigResource.Type.BROKER | ConfigResource.Type.CLIENT_METRICS => - authHelper.authorize(originalRequest.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME) - case ConfigResource.Type.TOPIC => - authHelper.authorize(originalRequest.context, ALTER_CONFIGS, TOPIC, resource.name) - case rt => throw new InvalidRequestException(s"Unexpected resource type $rt") - } - } - val authorizedResult = zkSupport.adminManager.alterConfigs(authorizedResources, alterConfigsRequest.validateOnly) - val unauthorizedResult = unauthorizedResources.keys.map { resource => - resource -> configsAuthorizationApiError(resource) - } - val response = new AlterConfigsResponseData() - (authorizedResult ++ unauthorizedResult).foreach { case (resource, error) => - response.responses().add(new AlterConfigsResourceResponse() - .setErrorCode(error.error.code) - .setErrorMessage(error.message) - .setResourceName(resource.name) - .setResourceType(resource.`type`.id)) - } - response - } - - private def configsAuthorizationApiError(resource: ConfigResource): ApiError = { - val error = resource.`type` match { - case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER => Errors.CLUSTER_AUTHORIZATION_FAILED - case ConfigResource.Type.TOPIC => Errors.TOPIC_AUTHORIZATION_FAILED - case ConfigResource.Type.GROUP => Errors.GROUP_AUTHORIZATION_FAILED - case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.name}") } - new ApiError(error, null) } def handleIncrementalAlterConfigsRequest(request: RequestChannel.Request): Unit = { @@ -2177,15 +2130,6 @@ class KafkaApis(val requestChannel: RequestChannel, (rType, rName) => authHelper.authorize(request.context, ALTER_CONFIGS, rType, rName)) val remaining = ConfigAdminManager.copyWithoutPreprocessed(original.data(), preprocessingResponses) - // Before deciding whether to forward or handle locally, a ZK broker needs to check if - // the active controller is ZK or KRaft. If the controller is KRaft, we need to forward. - // If the controller is ZK, we need to process the request locally. - val isKRaftController = metadataSupport match { - case ZkSupport(_, _, _, _, metadataCache, _) => - metadataCache.getControllerId.exists(_.isInstanceOf[KRaftCachedControllerId]) - case RaftSupport(_, _) => true - } - def sendResponse(secondPart: Option[ApiMessage]): Unit = { secondPart match { case Some(result: IncrementalAlterConfigsResponseData) => @@ -2198,49 +2142,13 @@ class KafkaApis(val requestChannel: RequestChannel, } } - // Forwarding has not happened yet, so handle both ZK and KRaft cases here if (remaining.resources().isEmpty) { sendResponse(Some(new IncrementalAlterConfigsResponseData())) - } else if ((!request.isForwarded) && metadataSupport.canForward() && isKRaftController) { + } else { metadataSupport.forwardingManager.get.forwardRequest(request, new IncrementalAlterConfigsRequest(remaining, request.header.apiVersion()), response => sendResponse(response.map(_.data()))) - } else { - sendResponse(Some(processIncrementalAlterConfigsRequest(request, remaining))) - } - } - - def processIncrementalAlterConfigsRequest( - originalRequest: RequestChannel.Request, - data: IncrementalAlterConfigsRequestData - ): IncrementalAlterConfigsResponseData = { - val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(originalRequest)) - val configs = data.resources.iterator.asScala.map { alterConfigResource => - val configResource = new ConfigResource(ConfigResource.Type.forId(alterConfigResource.resourceType), - alterConfigResource.resourceName) - configResource -> alterConfigResource.configs.iterator.asScala.map { - alterConfig => new AlterConfigOp(new ConfigEntry(alterConfig.name, alterConfig.value), - OpType.forId(alterConfig.configOperation)) - }.toBuffer - }.toMap - - val (authorizedResources, unauthorizedResources) = configs.partition { case (resource, _) => - resource.`type` match { - case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER | ConfigResource.Type.CLIENT_METRICS => - authHelper.authorize(originalRequest.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME) - case ConfigResource.Type.TOPIC => - authHelper.authorize(originalRequest.context, ALTER_CONFIGS, TOPIC, resource.name) - case ConfigResource.Type.GROUP => - authHelper.authorize(originalRequest.context, ALTER_CONFIGS, GROUP, resource.name) - case rt => throw new InvalidRequestException(s"Unexpected resource type $rt") - } - } - - val authorizedResult = zkSupport.adminManager.incrementalAlterConfigs(authorizedResources, data.validateOnly) - val unauthorizedResult = unauthorizedResources.keys.map { resource => - resource -> configsAuthorizationApiError(resource) } - new IncrementalAlterConfigsResponse(0, (authorizedResult ++ unauthorizedResult).asJava).data() } def handleDescribeConfigsRequest(request: RequestChannel.Request): Unit = { diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 34b1d7cd274..d4cad343c04 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -320,44 +320,6 @@ class KafkaApisTest extends Logging { assertEquals(propValue, describeConfigsResponseData.value) } - @Test - def testAlterConfigsWithAuthorizer(): Unit = { - val authorizer: Authorizer = mock(classOf[Authorizer]) - - val authorizedTopic = "authorized-topic" - val unauthorizedTopic = "unauthorized-topic" - val (authorizedResource, unauthorizedResource) = - createConfigsWithAuthorization(authorizer, authorizedTopic, unauthorizedTopic) - - val configs = Map( - authorizedResource -> new AlterConfigsRequest.Config( - Seq(new AlterConfigsRequest.ConfigEntry("foo", "bar")).asJava), - unauthorizedResource -> new AlterConfigsRequest.Config( - Seq(new AlterConfigsRequest.ConfigEntry("foo-1", "bar-1")).asJava) - ) - - val topicHeader = new RequestHeader(ApiKeys.ALTER_CONFIGS, ApiKeys.ALTER_CONFIGS.latestVersion, - clientId, 0) - - val alterConfigsRequest = new AlterConfigsRequest.Builder(configs.asJava, false) - .build(topicHeader.apiVersion) - val request = buildRequest(alterConfigsRequest) - - when(controller.isActive).thenReturn(false) - when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](), - any[Long])).thenReturn(0) - when(adminManager.alterConfigs(any(), ArgumentMatchers.eq(false))) - .thenReturn(Map(authorizedResource -> ApiError.NONE)) - kafkaApis = createKafkaApis(authorizer = Some(authorizer)) - kafkaApis.handleAlterConfigsRequest(request) - - val response = verifyNoThrottling[AlterConfigsResponse](request) - verifyAlterConfigResult(response, Map(authorizedTopic -> Errors.NONE, - unauthorizedTopic -> Errors.TOPIC_AUTHORIZATION_FAILED)) - verify(authorizer, times(2)).authorize(any(), any()) - verify(adminManager).alterConfigs(any(), anyBoolean()) - } - @Test def testElectLeadersForwarding(): Unit = { val requestBuilder = new ElectLeadersRequest.Builder(ElectionType.PREFERRED, null, 30000) @@ -379,20 +341,15 @@ class KafkaApisTest extends Logging { val incrementalAlterConfigsRequest = getIncrementalAlterConfigRequestBuilder( Seq(resource), "consumer.session.timeout.ms", "45000").build(requestHeader.apiVersion) - val request = buildRequest(incrementalAlterConfigsRequest, - fromPrivilegedListener = true, requestHeader = Option(requestHeader)) + val request = buildRequest(incrementalAlterConfigsRequest, requestHeader = Option(requestHeader)) - when(controller.isActive).thenReturn(true) - when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](), - any[Long])).thenReturn(0) - when(adminManager.incrementalAlterConfigs(any(), ArgumentMatchers.eq(false))) - .thenReturn(Map(resource -> ApiError.NONE)) - - createKafkaApis(authorizer = Some(authorizer)).handleIncrementalAlterConfigsRequest(request) - val response = verifyNoThrottling[IncrementalAlterConfigsResponse](request) - verifyIncrementalAlterConfigResult(response, Map(consumerGroupId -> Errors.NONE)) - verify(authorizer, times(1)).authorize(any(), any()) - verify(adminManager).incrementalAlterConfigs(any(), anyBoolean()) + metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.LATEST_PRODUCTION) + createKafkaApis(authorizer = Some(authorizer), raftSupport = true).handleIncrementalAlterConfigsRequest(request) + verify(forwardingManager, times(1)).forwardRequest( + any(), + any(), + any() + ) } @Test @@ -453,10 +410,6 @@ class KafkaApisTest extends Logging { val subscriptionName = "client_metric_subscription_1" val authorizedResource = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, subscriptionName) - val authorizer: Authorizer = mock(classOf[Authorizer]) - authorizeResource(authorizer, AclOperation.ALTER_CONFIGS, ResourceType.CLUSTER, - Resource.CLUSTER_NAME, AuthorizationResult.ALLOWED) - val props = ClientMetricsTestUtils.defaultProperties val configEntries = new util.ArrayList[AlterConfigsRequest.ConfigEntry]() props.forEach((x, y) => @@ -465,51 +418,39 @@ class KafkaApisTest extends Logging { val configs = Map(authorizedResource -> new AlterConfigsRequest.Config(configEntries)) val requestHeader = new RequestHeader(ApiKeys.ALTER_CONFIGS, ApiKeys.ALTER_CONFIGS.latestVersion, clientId, 0) - val request = buildRequest( - new AlterConfigsRequest.Builder(configs.asJava, false).build(requestHeader.apiVersion)) + val apiRequest = new AlterConfigsRequest.Builder(configs.asJava, false).build(requestHeader.apiVersion) + val request = buildRequest(apiRequest) - when(controller.isActive).thenReturn(false) - when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](), - any[Long])).thenReturn(0) - when(adminManager.alterConfigs(any(), ArgumentMatchers.eq(false))) - .thenReturn(Map(authorizedResource -> ApiError.NONE)) - kafkaApis = createKafkaApis(authorizer = Some(authorizer)) + metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.LATEST_PRODUCTION) + kafkaApis = createKafkaApis(raftSupport = true) kafkaApis.handleAlterConfigsRequest(request) - val response = verifyNoThrottling[AlterConfigsResponse](request) - verifyAlterConfigResult(response, Map(subscriptionName -> Errors.NONE)) - verify(authorizer, times(1)).authorize(any(), any()) - verify(adminManager).alterConfigs(any(), anyBoolean()) + verify(forwardingManager, times(1)).forwardRequest( + any(), + any(), + any() + ) } @Test def testIncrementalClientMetricAlterConfigs(): Unit = { - val authorizer: Authorizer = mock(classOf[Authorizer]) - val subscriptionName = "client_metric_subscription_1" val resource = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, subscriptionName) - authorizeResource(authorizer, AclOperation.ALTER_CONFIGS, ResourceType.CLUSTER, - Resource.CLUSTER_NAME, AuthorizationResult.ALLOWED) - val requestHeader = new RequestHeader(ApiKeys.INCREMENTAL_ALTER_CONFIGS, ApiKeys.INCREMENTAL_ALTER_CONFIGS.latestVersion, clientId, 0) val incrementalAlterConfigsRequest = getIncrementalAlterConfigRequestBuilder( Seq(resource), "metrics", "foo.bar").build(requestHeader.apiVersion) - val request = buildRequest(incrementalAlterConfigsRequest, - fromPrivilegedListener = true, requestHeader = Option(requestHeader)) + val request = buildRequest(incrementalAlterConfigsRequest, requestHeader = Option(requestHeader)) - when(controller.isActive).thenReturn(true) - when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](), - any[Long])).thenReturn(0) - when(adminManager.incrementalAlterConfigs(any(), ArgumentMatchers.eq(false))) - .thenReturn(Map(resource -> ApiError.NONE)) - kafkaApis = createKafkaApis(authorizer = Some(authorizer)) + metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.LATEST_PRODUCTION) + kafkaApis = createKafkaApis(raftSupport = true) kafkaApis.handleIncrementalAlterConfigsRequest(request) - val response = verifyNoThrottling[IncrementalAlterConfigsResponse](request) - verifyIncrementalAlterConfigResult(response, Map(subscriptionName -> Errors.NONE )) - verify(authorizer, times(1)).authorize(any(), any()) - verify(adminManager).incrementalAlterConfigs(any(), anyBoolean()) + verify(forwardingManager, times(1)).forwardRequest( + any(), + any(), + any() + ) } private def getIncrementalAlterConfigRequestBuilder(configResources: Seq[ConfigResource], @@ -657,78 +598,41 @@ class KafkaApisTest extends Logging { .thenReturn(Seq(result).asJava) } - private def verifyAlterConfigResult(response: AlterConfigsResponse, - expectedResults: Map[String, Errors]): Unit = { - val responseMap = response.data.responses().asScala.map { resourceResponse => - resourceResponse.resourceName -> Errors.forCode(resourceResponse.errorCode) - }.toMap - - assertEquals(expectedResults, responseMap) - } - - private def createConfigsWithAuthorization(authorizer: Authorizer, - authorizedTopic: String, - unauthorizedTopic: String): (ConfigResource, ConfigResource) = { - val authorizedResource = new ConfigResource(ConfigResource.Type.TOPIC, authorizedTopic) - - val unauthorizedResource = new ConfigResource(ConfigResource.Type.TOPIC, unauthorizedTopic) - - createTopicAuthorization(authorizer, AclOperation.ALTER_CONFIGS, authorizedTopic, unauthorizedTopic) - (authorizedResource, unauthorizedResource) - } - @Test def testIncrementalAlterConfigsWithAuthorizer(): Unit = { val authorizer: Authorizer = mock(classOf[Authorizer]) - val authorizedTopic = "authorized-topic" - val unauthorizedTopic = "unauthorized-topic" - val (authorizedResource, unauthorizedResource) = - createConfigsWithAuthorization(authorizer, authorizedTopic, unauthorizedTopic) + val localResource = new ConfigResource(ConfigResource.Type.BROKER_LOGGER, "localResource") + val forwardedResource = new ConfigResource(ConfigResource.Type.GROUP, "forwardedResource") val requestHeader = new RequestHeader(ApiKeys.INCREMENTAL_ALTER_CONFIGS, ApiKeys.INCREMENTAL_ALTER_CONFIGS.latestVersion, clientId, 0) - val incrementalAlterConfigsRequest = getIncrementalAlterConfigRequestBuilder(Seq(authorizedResource, unauthorizedResource)) + val incrementalAlterConfigsRequest = getIncrementalAlterConfigRequestBuilder(Seq(localResource, forwardedResource)) .build(requestHeader.apiVersion) - val request = buildRequest(incrementalAlterConfigsRequest, - fromPrivilegedListener = true, requestHeader = Option(requestHeader)) + val request = buildRequest(incrementalAlterConfigsRequest, requestHeader = Option(requestHeader)) - when(controller.isActive).thenReturn(true) - when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](), - any[Long])).thenReturn(0) - when(adminManager.incrementalAlterConfigs(any(), ArgumentMatchers.eq(false))) - .thenReturn(Map(authorizedResource -> ApiError.NONE)) - kafkaApis = createKafkaApis(authorizer = Some(authorizer)) + metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.LATEST_PRODUCTION) + kafkaApis = createKafkaApis(authorizer = Some(authorizer), raftSupport = true) kafkaApis.handleIncrementalAlterConfigsRequest(request) - val capturedResponse = verifyNoThrottling[IncrementalAlterConfigsResponse](request) - verifyIncrementalAlterConfigResult(capturedResponse, Map( - authorizedTopic -> Errors.NONE, - unauthorizedTopic -> Errors.TOPIC_AUTHORIZATION_FAILED - )) - - verify(authorizer, times(2)).authorize(any(), any()) - verify(adminManager).incrementalAlterConfigs(any(), anyBoolean()) + verify(authorizer, times(1)).authorize(any(), any()) + verify(forwardingManager, times(1)).forwardRequest( + any(), + any(), + any() + ) } private def getIncrementalAlterConfigRequestBuilder(configResources: Seq[ConfigResource]): IncrementalAlterConfigsRequest.Builder = { val resourceMap = configResources.map(configResource => { configResource -> Set( new AlterConfigOp(new ConfigEntry("foo", "bar"), - OpType.forId(configResource.`type`.id))).asJavaCollection + OpType.SET)).asJavaCollection }).toMap.asJava new IncrementalAlterConfigsRequest.Builder(resourceMap, false) } - private def verifyIncrementalAlterConfigResult(response: IncrementalAlterConfigsResponse, - expectedResults: Map[String, Errors]): Unit = { - val responseMap = response.data.responses.asScala.map { resourceResponse => - resourceResponse.resourceName -> Errors.forCode(resourceResponse.errorCode) - }.toMap - assertEquals(expectedResults, responseMap) - } - @ParameterizedTest @CsvSource(value = Array("0,1500", "1500,0", "3000,1000")) def testKRaftControllerThrottleTimeEnforced( @@ -780,18 +684,6 @@ class KafkaApisTest extends Logging { assertEquals(expectedThrottleTimeMs, responseData.throttleTimeMs) } - private def createTopicAuthorization(authorizer: Authorizer, - operation: AclOperation, - authorizedTopic: String, - unauthorizedTopic: String, - logIfAllowed: Boolean = true, - logIfDenied: Boolean = true): Unit = { - authorizeResource(authorizer, operation, ResourceType.TOPIC, - authorizedTopic, AuthorizationResult.ALLOWED, logIfAllowed, logIfDenied) - authorizeResource(authorizer, operation, ResourceType.TOPIC, - unauthorizedTopic, AuthorizationResult.DENIED, logIfAllowed, logIfDenied) - } - @Test def testFindCoordinatorAutoTopicCreationForOffsetTopic(): Unit = { testFindCoordinatorWithTopicCreation(CoordinatorType.GROUP)