This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6a8ffe7f64c KAFKA-18399 Remove ZooKeeper from KafkaApis (5/N): 
ALTER_PARTITION_REASSIGNMENTS, LIST_PARTITION_REASSIGNMENTS (#18464)
6a8ffe7f64c is described below

commit 6a8ffe7f64ca81543cc660f7c064303ab72b747e
Author: Ken Huang <s7133...@gmail.com>
AuthorDate: Mon Jan 13 01:33:24 2025 +0800

    KAFKA-18399 Remove ZooKeeper from KafkaApis (5/N): 
ALTER_PARTITION_REASSIGNMENTS, LIST_PARTITION_REASSIGNMENTS (#18464)
    
    Reviewers: Chia-Ping Tsai <chia7...@gmail.com>
---
 core/src/main/scala/kafka/server/KafkaApis.scala   | 88 ----------------------
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 14 ----
 2 files changed, 102 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 35c872abe13..c084b637965 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -17,7 +17,6 @@
 
 package kafka.server
 
-import kafka.controller.ReplicaAssignment
 import kafka.coordinator.transaction.{InitProducerIdResult, 
TransactionCoordinator}
 import kafka.network.RequestChannel
 import kafka.server.QuotaFactory.{QuotaManagers, UNBOUNDED_QUOTA}
@@ -37,7 +36,6 @@ import 
org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, SHARE
 import org.apache.kafka.common.internals.{FatalExitError, Topic}
 import 
org.apache.kafka.common.message.AddPartitionsToTxnResponseData.{AddPartitionsToTxnResult,
 AddPartitionsToTxnResultCollection}
 import 
org.apache.kafka.common.message.AlterConfigsResponseData.AlterConfigsResourceResponse
-import 
org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.{ReassignablePartitionResponse,
 ReassignableTopicResponse}
 import 
org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
 import 
org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, 
CreatableTopicResultCollection}
@@ -2409,92 +2407,6 @@ class KafkaApis(val requestChannel: RequestChannel,
     response
   }
 
-  def handleAlterPartitionReassignmentsRequest(request: 
RequestChannel.Request): Unit = {
-    val zkSupport = 
metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
-    authHelper.authorizeClusterOperation(request, ALTER)
-    val alterPartitionReassignmentsRequest = 
request.body[AlterPartitionReassignmentsRequest]
-
-    def sendResponseCallback(result: Either[Map[TopicPartition, ApiError], 
ApiError]): Unit = {
-      val responseData = result match {
-        case Right(topLevelError) =>
-          new 
AlterPartitionReassignmentsResponseData().setErrorMessage(topLevelError.message).setErrorCode(topLevelError.error.code)
-
-        case Left(assignments) =>
-          val topicResponses = assignments.groupBy(_._1.topic).map {
-            case (topic, reassignmentsByTp) =>
-              val partitionResponses = reassignmentsByTp.map {
-                case (topicPartition, error) =>
-                  new 
ReassignablePartitionResponse().setPartitionIndex(topicPartition.partition)
-                    
.setErrorCode(error.error.code).setErrorMessage(error.message)
-              }
-              new 
ReassignableTopicResponse().setName(topic).setPartitions(partitionResponses.toList.asJava)
-          }
-          new 
AlterPartitionReassignmentsResponseData().setResponses(topicResponses.toList.asJava)
-      }
-
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-        new 
AlterPartitionReassignmentsResponse(responseData.setThrottleTimeMs(requestThrottleMs))
-      )
-    }
-
-    val reassignments = 
alterPartitionReassignmentsRequest.data.topics.asScala.flatMap {
-      reassignableTopic => reassignableTopic.partitions.asScala.map {
-        reassignablePartition =>
-          val tp = new TopicPartition(reassignableTopic.name, 
reassignablePartition.partitionIndex)
-          if (reassignablePartition.replicas == null)
-            tp -> None // revert call
-          else
-            tp -> Some(reassignablePartition.replicas.asScala.map(_.toInt))
-      }
-    }.toMap
-
-    zkSupport.controller.alterPartitionReassignments(reassignments, 
sendResponseCallback)
-  }
-
-  def handleListPartitionReassignmentsRequest(request: 
RequestChannel.Request): Unit = {
-    val zkSupport = 
metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
-    authHelper.authorizeClusterOperation(request, DESCRIBE)
-    val listPartitionReassignmentsRequest = 
request.body[ListPartitionReassignmentsRequest]
-
-    def sendResponseCallback(result: Either[Map[TopicPartition, 
ReplicaAssignment], ApiError]): Unit = {
-      val responseData = result match {
-        case Right(error) => new 
ListPartitionReassignmentsResponseData().setErrorMessage(error.message).setErrorCode(error.error.code)
-
-        case Left(assignments) =>
-          val topicReassignments = assignments.groupBy(_._1.topic).map {
-            case (topic, reassignmentsByTp) =>
-              val partitionReassignments = reassignmentsByTp.map {
-                case (topicPartition, assignment) =>
-                  new 
ListPartitionReassignmentsResponseData.OngoingPartitionReassignment()
-                    .setPartitionIndex(topicPartition.partition)
-                    
.setAddingReplicas(assignment.addingReplicas.toList.asJava.asInstanceOf[java.util.List[java.lang.Integer]])
-                    
.setRemovingReplicas(assignment.removingReplicas.toList.asJava.asInstanceOf[java.util.List[java.lang.Integer]])
-                    
.setReplicas(assignment.replicas.toList.asJava.asInstanceOf[java.util.List[java.lang.Integer]])
-              }.toList
-
-              new 
ListPartitionReassignmentsResponseData.OngoingTopicReassignment().setName(topic)
-                .setPartitions(partitionReassignments.asJava)
-          }.toList
-
-          new 
ListPartitionReassignmentsResponseData().setTopics(topicReassignments.asJava)
-      }
-
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-        new 
ListPartitionReassignmentsResponse(responseData.setThrottleTimeMs(requestThrottleMs))
-      )
-    }
-
-    val partitionsOpt = 
Option(listPartitionReassignmentsRequest.data.topics).map { topics =>
-      topics.iterator().asScala.flatMap { topic =>
-        topic.partitionIndexes.iterator().asScala.map { partitionIndex =>
-          new TopicPartition(topic.name(), partitionIndex)
-        }
-      }.toSet
-    }
-
-    zkSupport.controller.listPartitionReassignments(partitionsOpt, 
sendResponseCallback)
-  }
-
   private def configsAuthorizationApiError(resource: ConfigResource): ApiError 
= {
     val error = resource.`type` match {
       case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER => 
Errors.CLUSTER_AUTHORIZATION_FAILED
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 0abeb3192ed..d8532c6cd8b 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -10288,13 +10288,6 @@ class KafkaApisTest extends Logging {
       response.data())
   }
 
-  @Test
-  def testRaftShouldAlwaysForwardAlterPartitionReassignmentsRequest(): Unit = {
-    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
-    kafkaApis = createKafkaApis(raftSupport = true)
-    
verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleAlterPartitionReassignmentsRequest)
-  }
-
   @Test
   def testEmptyIncrementalAlterConfigsRequestWithKRaft(): Unit = {
     val request = buildRequest(new IncrementalAlterConfigsRequest(new 
IncrementalAlterConfigsRequestData(), 1.toShort))
@@ -10381,13 +10374,6 @@ class KafkaApisTest extends Logging {
     verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleElectLeaders)
   }
 
-  @Test
-  def testRaftShouldAlwaysForwardListPartitionReassignments(): Unit = {
-    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
-    kafkaApis = createKafkaApis(raftSupport = true)
-    
verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleListPartitionReassignmentsRequest)
-  }
-
   @Test
   def testConsumerGroupHeartbeatReturnsUnsupportedVersion(): Unit = {
     val consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequestData().setGroupId("group")

Reply via email to