This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 408241adccc KAFKA-18399 Remove ZooKeeper from KafkaApis (2/N): 
CONTROLLED_SHUTDOWN and ENVELOPE (#18422)
408241adccc is described below

commit 408241adccc9046e53363b0c9346b92c2caa6fee
Author: TaiJuWu <[email protected]>
AuthorDate: Fri Jan 10 04:12:12 2025 +0800

    KAFKA-18399 Remove ZooKeeper from KafkaApis (2/N): CONTROLLED_SHUTDOWN and 
ENVELOPE (#18422)
    
    Reviewers: Ken Huang <[email protected]>, Chia-Ping Tsai 
<[email protected]>
---
 core/src/main/scala/kafka/server/KafkaApis.scala   |  57 -------
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 175 ---------------------
 2 files changed, 232 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index b11df5e16f7..f88ed3777a5 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -90,7 +90,6 @@ import scala.annotation.nowarn
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.{Map, Seq, Set, mutable}
 import scala.jdk.CollectionConverters._
-import scala.util.{Failure, Success, Try}
 
 /**
  * Logic to handle the various Kafka requests
@@ -137,10 +136,6 @@ class KafkaApis(val requestChannel: RequestChannel,
     info("Shutdown complete.")
   }
 
-  private def isForwardingEnabled(request: RequestChannel.Request): Boolean = {
-    metadataSupport.forwardingManager.isDefined && 
request.context.principalSerde.isPresent
-  }
-
   private def maybeForwardToController(
     request: RequestChannel.Request,
     handler: RequestChannel.Request => Unit
@@ -196,7 +191,6 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.FETCH => handleFetchRequest(request)
         case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
         case ApiKeys.METADATA => handleTopicMetadataRequest(request)
-        case ApiKeys.CONTROLLED_SHUTDOWN => 
handleControlledShutdownRequest(request)
         case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request, 
requestLocal).exceptionally(handleError)
         case ApiKeys.OFFSET_FETCH => 
handleOffsetFetchRequest(request).exceptionally(handleError)
         case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
@@ -245,7 +239,6 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.ALTER_USER_SCRAM_CREDENTIALS => 
maybeForwardToController(request, handleAlterUserScramCredentialsRequest)
         case ApiKeys.ALTER_PARTITION => handleAlterPartitionRequest(request)
         case ApiKeys.UPDATE_FEATURES => maybeForwardToController(request, 
handleUpdateFeatures)
-        case ApiKeys.ENVELOPE => handleEnvelope(request, requestLocal)
         case ApiKeys.DESCRIBE_CLUSTER => handleDescribeCluster(request)
         case ApiKeys.DESCRIBE_PRODUCERS => 
handleDescribeProducersRequest(request)
         case ApiKeys.UNREGISTER_BROKER => forwardToControllerOrFail(request)
@@ -291,27 +284,6 @@ class KafkaApis(val requestChannel: RequestChannel,
     replicaManager.tryCompleteActions()
   }
 
-  def handleControlledShutdownRequest(request: RequestChannel.Request): Unit = 
{
-    val zkSupport = 
metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request))
-    // ensureTopicExists is only for client facing requests
-    // We can't have the ensureTopicExists check here since the controller 
sends it as an advisory to all brokers so they
-    // stop serving data to clients for the topic being deleted
-    val controlledShutdownRequest = request.body[ControlledShutdownRequest]
-    authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
-
-    def controlledShutdownCallback(controlledShutdownResult: 
Try[Set[TopicPartition]]): Unit = {
-      val response = controlledShutdownResult match {
-        case Success(partitionsRemaining) =>
-         ControlledShutdownResponse.prepareResponse(Errors.NONE, 
partitionsRemaining.asJava)
-
-        case Failure(throwable) =>
-          controlledShutdownRequest.getErrorResponse(throwable)
-      }
-      requestHelper.sendResponseExemptThrottle(request, response)
-    }
-    
zkSupport.controller.controlledShutdown(controlledShutdownRequest.data.brokerId,
 controlledShutdownRequest.data.brokerEpoch, controlledShutdownCallback)
-  }
-
   /**
    * Handle an offset commit request
    */
@@ -3349,35 +3321,6 @@ class KafkaApis(val requestChannel: RequestChannel,
       new 
DescribeClusterResponse(response.setThrottleTimeMs(requestThrottleMs)))
   }
 
-  def handleEnvelope(request: RequestChannel.Request, requestLocal: 
RequestLocal): Unit = {
-    val zkSupport = 
metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request))
-
-    // If forwarding is not yet enabled or this request has been received on 
an invalid endpoint,
-    // then we treat the request as unparsable and close the connection.
-    if (!isForwardingEnabled(request)) {
-      info(s"Closing connection ${request.context.connectionId} because it 
sent an `Envelope` " +
-        "request even though forwarding has not been enabled")
-      requestChannel.closeConnection(request, Collections.emptyMap())
-      return
-    } else if (!request.context.fromPrivilegedListener) {
-      info(s"Closing connection ${request.context.connectionId} from listener 
${request.context.listenerName} " +
-        s"because it sent an `Envelope` request, which is only accepted on the 
inter-broker listener " +
-        s"${config.interBrokerListenerName}.")
-      requestChannel.closeConnection(request, Collections.emptyMap())
-      return
-    } else if (!authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, 
CLUSTER_NAME)) {
-      requestHelper.sendErrorResponseMaybeThrottle(request, new 
ClusterAuthorizationException(
-        s"Principal ${request.context.principal} does not have required 
CLUSTER_ACTION for envelope"))
-      return
-    } else if (!zkSupport.controller.isActive) {
-      requestHelper.sendErrorResponseMaybeThrottle(request, new 
NotControllerException(
-        s"Broker $brokerId is not the active controller"))
-      return
-    }
-
-    EnvelopeUtils.handleEnvelopeRequest(request, requestChannel.metrics, 
handle(_, requestLocal))
-  }
-
   def handleDescribeProducersRequest(request: RequestChannel.Request): Unit = {
     val describeProducersRequest = request.body[DescribeProducersRequest]
 
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 6542749beeb..08a40e634a0 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -322,167 +322,6 @@ class KafkaApisTest extends Logging {
     assertEquals(propValue, describeConfigsResponseData.value)
   }
 
-  @Test
-  def testEnvelopeRequestHandlingAsController(): Unit = {
-    testEnvelopeRequestWithAlterConfig(
-      alterConfigHandler = () => ApiError.NONE,
-      expectedError = Errors.NONE
-    )
-  }
-
-  @Test
-  def testEnvelopeRequestWithAlterConfigUnhandledError(): Unit = {
-    testEnvelopeRequestWithAlterConfig(
-      alterConfigHandler = () => throw new IllegalStateException(),
-      expectedError = Errors.UNKNOWN_SERVER_ERROR
-    )
-  }
-
-  private def testEnvelopeRequestWithAlterConfig(
-    alterConfigHandler: () => ApiError,
-    expectedError: Errors
-  ): Unit = {
-    val authorizer: Authorizer = mock(classOf[Authorizer])
-
-    authorizeResource(authorizer, AclOperation.CLUSTER_ACTION, 
ResourceType.CLUSTER, Resource.CLUSTER_NAME, AuthorizationResult.ALLOWED)
-
-    val operation = AclOperation.ALTER_CONFIGS
-    val resourceName = "topic-1"
-    val requestHeader = new RequestHeader(ApiKeys.ALTER_CONFIGS, 
ApiKeys.ALTER_CONFIGS.latestVersion,
-      clientId, 0)
-
-    when(controller.isActive).thenReturn(true)
-
-    authorizeResource(authorizer, operation, ResourceType.TOPIC, resourceName, 
AuthorizationResult.ALLOWED)
-
-    val configResource = new ConfigResource(ConfigResource.Type.TOPIC, 
resourceName)
-    when(adminManager.alterConfigs(any(), ArgumentMatchers.eq(false)))
-      .thenAnswer(_ => {
-        Map(configResource -> alterConfigHandler.apply())
-      })
-
-    val configs = Map(
-      configResource -> new AlterConfigsRequest.Config(
-        Seq(new AlterConfigsRequest.ConfigEntry("foo", "bar")).asJava))
-    val alterConfigsRequest = new AlterConfigsRequest.Builder(configs.asJava, 
false).build(requestHeader.apiVersion)
-
-    val startTimeNanos = time.nanoseconds()
-    val queueDurationNanos = 5 * 1000 * 1000
-    val request = TestUtils.buildEnvelopeRequest(
-      alterConfigsRequest, kafkaPrincipalSerde, requestChannelMetrics, 
startTimeNanos, startTimeNanos + queueDurationNanos)
-
-    val capturedResponse: ArgumentCaptor[AlterConfigsResponse] = 
ArgumentCaptor.forClass(classOf[AlterConfigsResponse])
-    val capturedRequest: ArgumentCaptor[RequestChannel.Request] = 
ArgumentCaptor.forClass(classOf[RequestChannel.Request])
-    kafkaApis = createKafkaApis(authorizer = Some(authorizer), 
enableForwarding = true)
-    kafkaApis.handle(request, RequestLocal.withThreadConfinedCaching)
-
-    verify(requestChannel).sendResponse(
-      capturedRequest.capture(),
-      capturedResponse.capture(),
-      any()
-    )
-    assertEquals(Some(request), capturedRequest.getValue.envelope)
-    // the dequeue time of forwarded request should equals to envelop request
-    assertEquals(request.requestDequeueTimeNanos, 
capturedRequest.getValue.requestDequeueTimeNanos)
-    val innerResponse = capturedResponse.getValue
-    val responseMap = innerResponse.data.responses().asScala.map { 
resourceResponse =>
-      resourceResponse.resourceName -> 
Errors.forCode(resourceResponse.errorCode)
-    }.toMap
-
-    assertEquals(Map(resourceName -> expectedError), responseMap)
-
-    verify(controller).isActive
-    verify(adminManager).alterConfigs(any(), ArgumentMatchers.eq(false))
-  }
-
-  @Test
-  def testInvalidEnvelopeRequestWithNonForwardableAPI(): Unit = {
-    val requestHeader = new RequestHeader(ApiKeys.LEAVE_GROUP, 
ApiKeys.LEAVE_GROUP.latestVersion,
-      clientId, 0)
-    val leaveGroupRequest = new LeaveGroupRequest.Builder("group",
-      Collections.singletonList(new 
MemberIdentity())).build(requestHeader.apiVersion)
-
-    when(controller.isActive).thenReturn(true)
-
-    val request = TestUtils.buildEnvelopeRequest(
-      leaveGroupRequest, kafkaPrincipalSerde, requestChannelMetrics, 
time.nanoseconds())
-    
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
-      any[Long])).thenReturn(0)
-    kafkaApis = createKafkaApis(enableForwarding = true)
-    kafkaApis.handle(request, RequestLocal.withThreadConfinedCaching)
-
-    val response = verifyNoThrottling[EnvelopeResponse](request)
-    assertEquals(Errors.INVALID_REQUEST, response.error())
-  }
-
-  @Test
-  def testEnvelopeRequestWithNotFromPrivilegedListener(): Unit = {
-    testInvalidEnvelopeRequest(Errors.NONE, fromPrivilegedListener = false,
-      shouldCloseConnection = true)
-  }
-
-  @Test
-  def testEnvelopeRequestNotAuthorized(): Unit = {
-    testInvalidEnvelopeRequest(Errors.CLUSTER_AUTHORIZATION_FAILED,
-      performAuthorize = true, authorizeResult = AuthorizationResult.DENIED)
-  }
-
-  @Test
-  def testEnvelopeRequestNotControllerHandling(): Unit = {
-    testInvalidEnvelopeRequest(Errors.NOT_CONTROLLER, performAuthorize = true, 
isActiveController = false)
-  }
-
-  private def testInvalidEnvelopeRequest(expectedError: Errors,
-                                         fromPrivilegedListener: Boolean = 
true,
-                                         shouldCloseConnection: Boolean = 
false,
-                                         performAuthorize: Boolean = false,
-                                         authorizeResult: AuthorizationResult 
= AuthorizationResult.ALLOWED,
-                                         isActiveController: Boolean = true): 
Unit = {
-    val authorizer: Authorizer = mock(classOf[Authorizer])
-
-    if (performAuthorize) {
-      authorizeResource(authorizer, AclOperation.CLUSTER_ACTION, 
ResourceType.CLUSTER, Resource.CLUSTER_NAME, authorizeResult)
-    }
-
-    val resourceName = "topic-1"
-    val requestHeader = new RequestHeader(ApiKeys.ALTER_CONFIGS, 
ApiKeys.ALTER_CONFIGS.latestVersion,
-      clientId, 0)
-
-    when(controller.isActive).thenReturn(isActiveController)
-
-    val configResource = new ConfigResource(ConfigResource.Type.TOPIC, 
resourceName)
-
-    val configs = Map(
-      configResource -> new AlterConfigsRequest.Config(
-        Seq(new AlterConfigsRequest.ConfigEntry("foo", "bar")).asJava))
-    val alterConfigsRequest = new AlterConfigsRequest.Builder(configs.asJava, 
false)
-      .build(requestHeader.apiVersion)
-
-    val request = TestUtils.buildEnvelopeRequest(
-      alterConfigsRequest, kafkaPrincipalSerde, requestChannelMetrics, 
time.nanoseconds(), fromPrivilegedListener = fromPrivilegedListener)
-
-    val capturedResponse: ArgumentCaptor[AbstractResponse] = 
ArgumentCaptor.forClass(classOf[AbstractResponse])
-    kafkaApis = createKafkaApis(authorizer = Some(authorizer), 
enableForwarding = true)
-    kafkaApis.handle(request, RequestLocal.withThreadConfinedCaching)
-
-    if (shouldCloseConnection) {
-      verify(requestChannel).closeConnection(
-        ArgumentMatchers.eq(request),
-        ArgumentMatchers.eq(java.util.Collections.emptyMap())
-      )
-    } else {
-      verify(requestChannel).sendResponse(
-        ArgumentMatchers.eq(request),
-        capturedResponse.capture(),
-        ArgumentMatchers.eq(None))
-      val response = capturedResponse.getValue.asInstanceOf[EnvelopeResponse]
-      assertEquals(expectedError, response.error)
-    }
-    if (performAuthorize) {
-      verify(authorizer).authorize(any(), any())
-    }
-  }
-
   @Test
   def testAlterConfigsWithAuthorizer(): Unit = {
     val authorizer: Authorizer = mock(classOf[Authorizer])
@@ -10516,13 +10355,6 @@ class KafkaApisTest extends Logging {
     assertEquals(KafkaApis.shouldAlwaysForward(request).getMessage, 
e.getMessage)
   }
 
-  @Test
-  def testRaftShouldNeverHandleControlledShutdownRequest(): Unit = {
-    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
-    kafkaApis = createKafkaApis(raftSupport = true)
-    
verifyShouldNeverHandleErrorMessage(kafkaApis.handleControlledShutdownRequest)
-  }
-
   @Test
   def testRaftShouldNeverHandleAlterPartitionRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
@@ -10530,13 +10362,6 @@ class KafkaApisTest extends Logging {
     verifyShouldNeverHandleErrorMessage(kafkaApis.handleAlterPartitionRequest)
   }
 
-  @Test
-  def testRaftShouldNeverHandleEnvelope(): Unit = {
-    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
-    kafkaApis = createKafkaApis(raftSupport = true)
-    verifyShouldNeverHandleErrorMessage(kafkaApis.handleEnvelope(_, 
RequestLocal.withThreadConfinedCaching))
-  }
-
   @Test
   def testRaftShouldAlwaysForwardCreateTopicsRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)

Reply via email to