This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new c7654f77312 KAFKA-18399 Remove ZooKeeper from KafkaApis (8/N): ELECT_LEADERS, ALTER_PARTITION, UPDATE_FEATURES (#18453)
c7654f77312 is described below

commit c7654f773121f7c9b5c5ca7ecc019ff30a2d3771
Author: TaiJuWu <tjwu1...@gmail.com>
AuthorDate: Wed Jan 15 04:53:03 2025 +0800

    KAFKA-18399 Remove ZooKeeper from KafkaApis (8/N): ELECT_LEADERS, ALTER_PARTITION, UPDATE_FEATURES (#18453)
    
    Reviewers: Chia-Ping Tsai <chia7...@gmail.com>
---
 core/src/main/scala/kafka/server/KafkaApis.scala   | 118 ---------------------
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  27 -----
 2 files changed, 145 deletions(-)
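
Context for these removals: in KRaft mode the broker no longer implements these cluster mutations itself. UPDATE_FEATURES already forwards to the controller (visible as unchanged context in the diff below), ELECT_LEADERS presumably takes the same route now that its ZK-era handler is gone, and ALTER_PARTITION is served on the controller side in KRaft. A toy Scala sketch of that routing, using only the public ApiKeys enum; the route function and its return strings are illustrative, not Kafka code:

    import org.apache.kafka.common.protocol.ApiKeys

    // Toy model of the post-removal routing for the three APIs touched here.
    def route(api: ApiKeys): String = api match {
      case ApiKeys.ELECT_LEADERS | ApiKeys.UPDATE_FEATURES =>
        "forward to the active KRaft controller"  // broker-side handlers removed
      case ApiKeys.ALTER_PARTITION =>
        "controller-side API"                     // answered by ControllerApis, not KafkaApis
      case _ =>
        "handled locally or forwarded as before"
    }

    // e.g. route(ApiKeys.ELECT_LEADERS) == "forward to the active KRaft controller"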

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 085df39c886..136abf1f425 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -37,7 +37,6 @@ import org.apache.kafka.common.internals.{FatalExitError, Topic}
 import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.{AddPartitionsToTxnResult, AddPartitionsToTxnResultCollection}
 import org.apache.kafka.common.message.AlterConfigsResponseData.AlterConfigsResourceResponse
 import org.apache.kafka.common.message.DeleteRecordsResponseData.{DeleteRecordsPartitionResult, DeleteRecordsTopicResult}
-import org.apache.kafka.common.message.ElectLeadersResponseData.{PartitionResult, ReplicaElectionResult}
 import org.apache.kafka.common.message.ListClientMetricsResourcesResponseData.ClientMetricsResource
 import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition
 import org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsPartitionResponse, ListOffsetsTopicResponse}
@@ -217,7 +216,6 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.ALTER_CLIENT_QUOTAS => forwardToController(request)
        case ApiKeys.DESCRIBE_USER_SCRAM_CREDENTIALS => handleDescribeUserScramCredentialsRequest(request)
        case ApiKeys.ALTER_USER_SCRAM_CREDENTIALS => forwardToController(request)
-        case ApiKeys.ALTER_PARTITION => handleAlterPartitionRequest(request)
         case ApiKeys.UPDATE_FEATURES => forwardToController(request)
         case ApiKeys.DESCRIBE_CLUSTER => handleDescribeCluster(request)
        case ApiKeys.DESCRIBE_PRODUCERS => handleDescribeProducersRequest(request)
@@ -2399,77 +2397,6 @@ class KafkaApis(val requestChannel: RequestChannel,
       true
   }
 
-  def handleElectLeaders(request: RequestChannel.Request): Unit = {
-    val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
-    val electionRequest = request.body[ElectLeadersRequest]
-
-    def sendResponseCallback(
-      error: ApiError
-    )(
-      results: Map[TopicPartition, ApiError]
-    ): Unit = {
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
-        val adjustedResults = if (electionRequest.data.topicPartitions == null) {
-          /* When performing elections across all of the partitions we should only return
-           * partitions for which there was an election or that resulted in an error. In other
-           * words, partitions that didn't need election because they already have the correct
-           * leader are not returned to the client.
-           */
-          results.filter { case (_, error) =>
-            error.error != Errors.ELECTION_NOT_NEEDED
-          }
-        } else results
-
-        val electionResults = new util.ArrayList[ReplicaElectionResult]()
-        adjustedResults
-          .groupBy { case (tp, _) => tp.topic }
-          .foreachEntry { (topic, ps) =>
-            val electionResult = new ReplicaElectionResult()
-
-            electionResult.setTopic(topic)
-            ps.foreachEntry { (topicPartition, error) =>
-              val partitionResult = new PartitionResult()
-              partitionResult.setPartitionId(topicPartition.partition)
-              partitionResult.setErrorCode(error.error.code)
-              partitionResult.setErrorMessage(error.message)
-              electionResult.partitionResult.add(partitionResult)
-            }
-
-            electionResults.add(electionResult)
-          }
-
-        new ElectLeadersResponse(
-          requestThrottleMs,
-          error.error.code,
-          electionResults,
-          electionRequest.version
-        )
-      })
-    }
-
-    if (!authHelper.authorize(request.context, ALTER, CLUSTER, CLUSTER_NAME)) {
-      val error = new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED, null)
-      val partitionErrors: Map[TopicPartition, ApiError] =
-        electionRequest.topicPartitions.asScala.iterator.map(partition => partition -> error).toMap
-
-      sendResponseCallback(error)(partitionErrors)
-    } else {
-      val partitions = if (electionRequest.data.topicPartitions == null) {
-        metadataCache.getAllTopics().flatMap(metadataCache.getTopicPartitions)
-      } else {
-        electionRequest.topicPartitions.asScala
-      }
-
-      replicaManager.electLeaders(
-        zkSupport.controller,
-        partitions,
-        electionRequest.electionType,
-        sendResponseCallback(ApiError.NONE),
-        electionRequest.data.timeoutMs
-      )
-    }
-  }
-
   def handleOffsetDeleteRequest(
     request: RequestChannel.Request,
     requestLocal: RequestLocal
@@ -2628,51 +2555,6 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleAlterPartitionRequest(request: RequestChannel.Request): Unit = {
-    val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request))
-    val alterPartitionRequest = request.body[AlterPartitionRequest]
-    authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
-
-    if (!zkSupport.controller.isActive)
-      requestHelper.sendResponseExemptThrottle(request, alterPartitionRequest.getErrorResponse(
-        AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.NOT_CONTROLLER.exception))
-    else
-      zkSupport.controller.alterPartitions(alterPartitionRequest.data, request.context.apiVersion, alterPartitionResp =>
-        requestHelper.sendResponseExemptThrottle(request, new AlterPartitionResponse(alterPartitionResp)))
-  }
-
-  def handleUpdateFeatures(request: RequestChannel.Request): Unit = {
-    val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
-    val updateFeaturesRequest = request.body[UpdateFeaturesRequest]
-
-    def sendResponseCallback(errors: Either[ApiError, Map[String, ApiError]]): Unit = {
-      def createResponse(throttleTimeMs: Int): UpdateFeaturesResponse = {
-        errors match {
-          case Left(topLevelError) =>
-            UpdateFeaturesResponse.createWithErrors(
-              topLevelError,
-              Collections.emptySet(),
-              throttleTimeMs)
-          case Right(featureUpdateErrors) =>
-            // This response is not correct, but since this is ZK-specific code it will be removed in 4.0
-            UpdateFeaturesResponse.createWithErrors(
-              ApiError.NONE,
-              featureUpdateErrors.asJava.keySet(),
-              throttleTimeMs)
-        }
-      }
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => createResponse(requestThrottleMs))
-    }
-
-    if (!authHelper.authorize(request.context, ALTER, CLUSTER, CLUSTER_NAME)) {
-      sendResponseCallback(Left(new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED)))
-    } else if (!zkSupport.controller.isActive) {
-      sendResponseCallback(Left(new ApiError(Errors.NOT_CONTROLLER)))
-    } else {
-      zkSupport.controller.updateFeatures(updateFeaturesRequest, sendResponseCallback)
-    }
-  }
-
   def handleDescribeCluster(request: RequestChannel.Request): Unit = {
     val response = authHelper.computeDescribeClusterResponse(
       request,
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index cdc77a8213e..8b91603dbc8 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -9932,25 +9932,12 @@ class KafkaApisTest extends Logging {
     request
   }
 
-  private def verifyShouldNeverHandleErrorMessage(handler: RequestChannel.Request => Unit): Unit = {
-    val request = createMockRequest()
-    val e = assertThrows(classOf[UnsupportedVersionException], () => handler(request))
-    assertEquals(KafkaApis.shouldNeverReceive(request).getMessage, e.getMessage)
-  }
-
  private def verifyShouldAlwaysForwardErrorMessage(handler: RequestChannel.Request => Unit): Unit = {
    val request = createMockRequest()
    val e = assertThrows(classOf[UnsupportedVersionException], () => handler(request))
    assertEquals(KafkaApis.shouldAlwaysForward(request).getMessage, e.getMessage)
  }
 
-  @Test
-  def testRaftShouldNeverHandleAlterPartitionRequest(): Unit = {
-    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
-    kafkaApis = createKafkaApis(raftSupport = true)
-    verifyShouldNeverHandleErrorMessage(kafkaApis.handleAlterPartitionRequest)
-  }
-
   @Test
   def testRaftShouldAlwaysForwardCreateAcls(): Unit = {
    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
@@ -10048,20 +10035,6 @@ class KafkaApisTest extends Logging {
    verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleAlterClientQuotasRequest)
   }
 
-  @Test
-  def testRaftShouldAlwaysForwardUpdateFeatures(): Unit = {
-    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
-    kafkaApis = createKafkaApis(raftSupport = true)
-    verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleUpdateFeatures)
-  }
-
-  @Test
-  def testRaftShouldAlwaysForwardElectLeaders(): Unit = {
-    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
-    kafkaApis = createKafkaApis(raftSupport = true)
-    verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleElectLeaders)
-  }
-
   @Test
   def testConsumerGroupHeartbeatReturnsUnsupportedVersion(): Unit = {
    val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequestData().setGroupId("group")
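
Nothing changes at the client boundary: leader election is still requested through the Admin API, and the receiving broker relays the request to the KRaft controller. A minimal usage sketch, assuming a locally reachable broker; the topic name and bootstrap address are placeholders, not taken from this commit:

    import java.util.Properties
    import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
    import org.apache.kafka.common.{ElectionType, TopicPartition}
    import scala.jdk.CollectionConverters._

    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    val admin = Admin.create(props)
    try {
      // Elect the preferred leader for one partition; passing null instead
      // of a set asks the controller to consider every partition.
      val partitions = Set(new TopicPartition("demo-topic", 0)).asJava
      admin.electLeaders(ElectionType.PREFERRED, partitions).partitions().get()
    } finally admin.close()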
