This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 4.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push: new 4334de222c5 KAFKA-18399 Remove ZooKeeper from KafkaApis (5/N): ALTER_PARTITION_REASSIGNMENTS, LIST_PARTITION_REASSIGNMENTS (#18464) 4334de222c5 is described below commit 4334de222c5f591578a20da6a1b223ac0fe73346 Author: Ken Huang <s7133...@gmail.com> AuthorDate: Mon Jan 13 01:33:24 2025 +0800 KAFKA-18399 Remove ZooKeeper from KafkaApis (5/N): ALTER_PARTITION_REASSIGNMENTS, LIST_PARTITION_REASSIGNMENTS (#18464) Reviewers: Chia-Ping Tsai <chia7...@gmail.com> --- core/src/main/scala/kafka/server/KafkaApis.scala | 88 ---------------------- .../scala/unit/kafka/server/KafkaApisTest.scala | 14 ---- 2 files changed, 102 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 35c872abe13..c084b637965 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -17,7 +17,6 @@ package kafka.server -import kafka.controller.ReplicaAssignment import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator} import kafka.network.RequestChannel import kafka.server.QuotaFactory.{QuotaManagers, UNBOUNDED_QUOTA} @@ -37,7 +36,6 @@ import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, SHARE import org.apache.kafka.common.internals.{FatalExitError, Topic} import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.{AddPartitionsToTxnResult, AddPartitionsToTxnResultCollection} import org.apache.kafka.common.message.AlterConfigsResponseData.AlterConfigsResourceResponse -import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.{ReassignablePartitionResponse, ReassignableTopicResponse} import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic import 
org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, CreatableTopicResultCollection} @@ -2409,92 +2407,6 @@ class KafkaApis(val requestChannel: RequestChannel, response } - def handleAlterPartitionReassignmentsRequest(request: RequestChannel.Request): Unit = { - val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request)) - authHelper.authorizeClusterOperation(request, ALTER) - val alterPartitionReassignmentsRequest = request.body[AlterPartitionReassignmentsRequest] - - def sendResponseCallback(result: Either[Map[TopicPartition, ApiError], ApiError]): Unit = { - val responseData = result match { - case Right(topLevelError) => - new AlterPartitionReassignmentsResponseData().setErrorMessage(topLevelError.message).setErrorCode(topLevelError.error.code) - - case Left(assignments) => - val topicResponses = assignments.groupBy(_._1.topic).map { - case (topic, reassignmentsByTp) => - val partitionResponses = reassignmentsByTp.map { - case (topicPartition, error) => - new ReassignablePartitionResponse().setPartitionIndex(topicPartition.partition) - .setErrorCode(error.error.code).setErrorMessage(error.message) - } - new ReassignableTopicResponse().setName(topic).setPartitions(partitionResponses.toList.asJava) - } - new AlterPartitionReassignmentsResponseData().setResponses(topicResponses.toList.asJava) - } - - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => - new AlterPartitionReassignmentsResponse(responseData.setThrottleTimeMs(requestThrottleMs)) - ) - } - - val reassignments = alterPartitionReassignmentsRequest.data.topics.asScala.flatMap { - reassignableTopic => reassignableTopic.partitions.asScala.map { - reassignablePartition => - val tp = new TopicPartition(reassignableTopic.name, reassignablePartition.partitionIndex) - if (reassignablePartition.replicas == null) - tp -> None // revert call - else - tp -> Some(reassignablePartition.replicas.asScala.map(_.toInt)) - } - }.toMap - - 
zkSupport.controller.alterPartitionReassignments(reassignments, sendResponseCallback) - } - - def handleListPartitionReassignmentsRequest(request: RequestChannel.Request): Unit = { - val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request)) - authHelper.authorizeClusterOperation(request, DESCRIBE) - val listPartitionReassignmentsRequest = request.body[ListPartitionReassignmentsRequest] - - def sendResponseCallback(result: Either[Map[TopicPartition, ReplicaAssignment], ApiError]): Unit = { - val responseData = result match { - case Right(error) => new ListPartitionReassignmentsResponseData().setErrorMessage(error.message).setErrorCode(error.error.code) - - case Left(assignments) => - val topicReassignments = assignments.groupBy(_._1.topic).map { - case (topic, reassignmentsByTp) => - val partitionReassignments = reassignmentsByTp.map { - case (topicPartition, assignment) => - new ListPartitionReassignmentsResponseData.OngoingPartitionReassignment() - .setPartitionIndex(topicPartition.partition) - .setAddingReplicas(assignment.addingReplicas.toList.asJava.asInstanceOf[java.util.List[java.lang.Integer]]) - .setRemovingReplicas(assignment.removingReplicas.toList.asJava.asInstanceOf[java.util.List[java.lang.Integer]]) - .setReplicas(assignment.replicas.toList.asJava.asInstanceOf[java.util.List[java.lang.Integer]]) - }.toList - - new ListPartitionReassignmentsResponseData.OngoingTopicReassignment().setName(topic) - .setPartitions(partitionReassignments.asJava) - }.toList - - new ListPartitionReassignmentsResponseData().setTopics(topicReassignments.asJava) - } - - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => - new ListPartitionReassignmentsResponse(responseData.setThrottleTimeMs(requestThrottleMs)) - ) - } - - val partitionsOpt = Option(listPartitionReassignmentsRequest.data.topics).map { topics => - topics.iterator().asScala.flatMap { topic => - topic.partitionIndexes.iterator().asScala.map { partitionIndex => - new 
TopicPartition(topic.name(), partitionIndex) - } - }.toSet - } - - zkSupport.controller.listPartitionReassignments(partitionsOpt, sendResponseCallback) - } - private def configsAuthorizationApiError(resource: ConfigResource): ApiError = { val error = resource.`type` match { case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER => Errors.CLUSTER_AUTHORIZATION_FAILED diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 0abeb3192ed..d8532c6cd8b 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -10288,13 +10288,6 @@ class KafkaApisTest extends Logging { response.data()) } - @Test - def testRaftShouldAlwaysForwardAlterPartitionReassignmentsRequest(): Unit = { - metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) - kafkaApis = createKafkaApis(raftSupport = true) - verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleAlterPartitionReassignmentsRequest) - } - @Test def testEmptyIncrementalAlterConfigsRequestWithKRaft(): Unit = { val request = buildRequest(new IncrementalAlterConfigsRequest(new IncrementalAlterConfigsRequestData(), 1.toShort)) @@ -10381,13 +10374,6 @@ class KafkaApisTest extends Logging { verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleElectLeaders) } - @Test - def testRaftShouldAlwaysForwardListPartitionReassignments(): Unit = { - metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) - kafkaApis = createKafkaApis(raftSupport = true) - verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleListPartitionReassignmentsRequest) - } - @Test def testConsumerGroupHeartbeatReturnsUnsupportedVersion(): Unit = { val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequestData().setGroupId("group")