This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 408241adccc KAFKA-18399 Remove ZooKeeper from KafkaApis (2/N):
CONTROLLED_SHUTDOWN and ENVELOPE (#18422)
408241adccc is described below
commit 408241adccc9046e53363b0c9346b92c2caa6fee
Author: TaiJuWu <[email protected]>
AuthorDate: Fri Jan 10 04:12:12 2025 +0800
KAFKA-18399 Remove ZooKeeper from KafkaApis (2/N): CONTROLLED_SHUTDOWN and
ENVELOPE (#18422)
Reviewers: Ken Huang <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
core/src/main/scala/kafka/server/KafkaApis.scala | 57 -------
.../scala/unit/kafka/server/KafkaApisTest.scala | 175 ---------------------
2 files changed, 232 deletions(-)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index b11df5e16f7..f88ed3777a5 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -90,7 +90,6 @@ import scala.annotation.nowarn
import scala.collection.mutable.ArrayBuffer
import scala.collection.{Map, Seq, Set, mutable}
import scala.jdk.CollectionConverters._
-import scala.util.{Failure, Success, Try}
/**
* Logic to handle the various Kafka requests
@@ -137,10 +136,6 @@ class KafkaApis(val requestChannel: RequestChannel,
info("Shutdown complete.")
}
-  private def isForwardingEnabled(request: RequestChannel.Request): Boolean = {
-    metadataSupport.forwardingManager.isDefined && request.context.principalSerde.isPresent
-  }
-
private def maybeForwardToController(
request: RequestChannel.Request,
handler: RequestChannel.Request => Unit
@@ -196,7 +191,6 @@ class KafkaApis(val requestChannel: RequestChannel,
       case ApiKeys.FETCH => handleFetchRequest(request)
       case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
       case ApiKeys.METADATA => handleTopicMetadataRequest(request)
-      case ApiKeys.CONTROLLED_SHUTDOWN => handleControlledShutdownRequest(request)
       case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request, requestLocal).exceptionally(handleError)
       case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request).exceptionally(handleError)
       case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
@@ -245,7 +239,6 @@ class KafkaApis(val requestChannel: RequestChannel,
       case ApiKeys.ALTER_USER_SCRAM_CREDENTIALS => maybeForwardToController(request, handleAlterUserScramCredentialsRequest)
       case ApiKeys.ALTER_PARTITION => handleAlterPartitionRequest(request)
       case ApiKeys.UPDATE_FEATURES => maybeForwardToController(request, handleUpdateFeatures)
-      case ApiKeys.ENVELOPE => handleEnvelope(request, requestLocal)
       case ApiKeys.DESCRIBE_CLUSTER => handleDescribeCluster(request)
       case ApiKeys.DESCRIBE_PRODUCERS => handleDescribeProducersRequest(request)
       case ApiKeys.UNREGISTER_BROKER => forwardToControllerOrFail(request)
@@ -291,27 +284,6 @@ class KafkaApis(val requestChannel: RequestChannel,
replicaManager.tryCompleteActions()
}
-  def handleControlledShutdownRequest(request: RequestChannel.Request): Unit = {
-    val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request))
-    // ensureTopicExists is only for client facing requests
-    // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
-    // stop serving data to clients for the topic being deleted
-    val controlledShutdownRequest = request.body[ControlledShutdownRequest]
-    authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
-
-    def controlledShutdownCallback(controlledShutdownResult: Try[Set[TopicPartition]]): Unit = {
-      val response = controlledShutdownResult match {
-        case Success(partitionsRemaining) =>
-          ControlledShutdownResponse.prepareResponse(Errors.NONE, partitionsRemaining.asJava)
-
-        case Failure(throwable) =>
-          controlledShutdownRequest.getErrorResponse(throwable)
-      }
-      requestHelper.sendResponseExemptThrottle(request, response)
-    }
-    zkSupport.controller.controlledShutdown(controlledShutdownRequest.data.brokerId, controlledShutdownRequest.data.brokerEpoch, controlledShutdownCallback)
-  }
-
/**
* Handle an offset commit request
*/
@@ -3349,35 +3321,6 @@ class KafkaApis(val requestChannel: RequestChannel,
new
DescribeClusterResponse(response.setThrottleTimeMs(requestThrottleMs)))
}
-  def handleEnvelope(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
-    val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request))
-
-    // If forwarding is not yet enabled or this request has been received on an invalid endpoint,
-    // then we treat the request as unparsable and close the connection.
-    if (!isForwardingEnabled(request)) {
-      info(s"Closing connection ${request.context.connectionId} because it sent an `Envelope` " +
-        "request even though forwarding has not been enabled")
-      requestChannel.closeConnection(request, Collections.emptyMap())
-      return
-    } else if (!request.context.fromPrivilegedListener) {
-      info(s"Closing connection ${request.context.connectionId} from listener ${request.context.listenerName} " +
-        s"because it sent an `Envelope` request, which is only accepted on the inter-broker listener " +
-        s"${config.interBrokerListenerName}.")
-      requestChannel.closeConnection(request, Collections.emptyMap())
-      return
-    } else if (!authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
-      requestHelper.sendErrorResponseMaybeThrottle(request, new ClusterAuthorizationException(
-        s"Principal ${request.context.principal} does not have required CLUSTER_ACTION for envelope"))
-      return
-    } else if (!zkSupport.controller.isActive) {
-      requestHelper.sendErrorResponseMaybeThrottle(request, new NotControllerException(
-        s"Broker $brokerId is not the active controller"))
-      return
-    }
-
-    EnvelopeUtils.handleEnvelopeRequest(request, requestChannel.metrics, handle(_, requestLocal))
-  }
-
def handleDescribeProducersRequest(request: RequestChannel.Request): Unit = {
val describeProducersRequest = request.body[DescribeProducersRequest]
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 6542749beeb..08a40e634a0 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -322,167 +322,6 @@ class KafkaApisTest extends Logging {
assertEquals(propValue, describeConfigsResponseData.value)
}
- @Test
- def testEnvelopeRequestHandlingAsController(): Unit = {
- testEnvelopeRequestWithAlterConfig(
- alterConfigHandler = () => ApiError.NONE,
- expectedError = Errors.NONE
- )
- }
-
- @Test
- def testEnvelopeRequestWithAlterConfigUnhandledError(): Unit = {
- testEnvelopeRequestWithAlterConfig(
- alterConfigHandler = () => throw new IllegalStateException(),
- expectedError = Errors.UNKNOWN_SERVER_ERROR
- )
- }
-
- private def testEnvelopeRequestWithAlterConfig(
- alterConfigHandler: () => ApiError,
- expectedError: Errors
- ): Unit = {
- val authorizer: Authorizer = mock(classOf[Authorizer])
-
- authorizeResource(authorizer, AclOperation.CLUSTER_ACTION,
ResourceType.CLUSTER, Resource.CLUSTER_NAME, AuthorizationResult.ALLOWED)
-
- val operation = AclOperation.ALTER_CONFIGS
- val resourceName = "topic-1"
- val requestHeader = new RequestHeader(ApiKeys.ALTER_CONFIGS,
ApiKeys.ALTER_CONFIGS.latestVersion,
- clientId, 0)
-
- when(controller.isActive).thenReturn(true)
-
- authorizeResource(authorizer, operation, ResourceType.TOPIC, resourceName,
AuthorizationResult.ALLOWED)
-
- val configResource = new ConfigResource(ConfigResource.Type.TOPIC,
resourceName)
- when(adminManager.alterConfigs(any(), ArgumentMatchers.eq(false)))
- .thenAnswer(_ => {
- Map(configResource -> alterConfigHandler.apply())
- })
-
- val configs = Map(
- configResource -> new AlterConfigsRequest.Config(
- Seq(new AlterConfigsRequest.ConfigEntry("foo", "bar")).asJava))
- val alterConfigsRequest = new AlterConfigsRequest.Builder(configs.asJava,
false).build(requestHeader.apiVersion)
-
- val startTimeNanos = time.nanoseconds()
- val queueDurationNanos = 5 * 1000 * 1000
- val request = TestUtils.buildEnvelopeRequest(
- alterConfigsRequest, kafkaPrincipalSerde, requestChannelMetrics,
startTimeNanos, startTimeNanos + queueDurationNanos)
-
- val capturedResponse: ArgumentCaptor[AlterConfigsResponse] =
ArgumentCaptor.forClass(classOf[AlterConfigsResponse])
- val capturedRequest: ArgumentCaptor[RequestChannel.Request] =
ArgumentCaptor.forClass(classOf[RequestChannel.Request])
- kafkaApis = createKafkaApis(authorizer = Some(authorizer),
enableForwarding = true)
- kafkaApis.handle(request, RequestLocal.withThreadConfinedCaching)
-
- verify(requestChannel).sendResponse(
- capturedRequest.capture(),
- capturedResponse.capture(),
- any()
- )
- assertEquals(Some(request), capturedRequest.getValue.envelope)
- // the dequeue time of forwarded request should equals to envelop request
- assertEquals(request.requestDequeueTimeNanos,
capturedRequest.getValue.requestDequeueTimeNanos)
- val innerResponse = capturedResponse.getValue
- val responseMap = innerResponse.data.responses().asScala.map {
resourceResponse =>
- resourceResponse.resourceName ->
Errors.forCode(resourceResponse.errorCode)
- }.toMap
-
- assertEquals(Map(resourceName -> expectedError), responseMap)
-
- verify(controller).isActive
- verify(adminManager).alterConfigs(any(), ArgumentMatchers.eq(false))
- }
-
- @Test
- def testInvalidEnvelopeRequestWithNonForwardableAPI(): Unit = {
- val requestHeader = new RequestHeader(ApiKeys.LEAVE_GROUP,
ApiKeys.LEAVE_GROUP.latestVersion,
- clientId, 0)
- val leaveGroupRequest = new LeaveGroupRequest.Builder("group",
- Collections.singletonList(new
MemberIdentity())).build(requestHeader.apiVersion)
-
- when(controller.isActive).thenReturn(true)
-
- val request = TestUtils.buildEnvelopeRequest(
- leaveGroupRequest, kafkaPrincipalSerde, requestChannelMetrics,
time.nanoseconds())
-
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
- any[Long])).thenReturn(0)
- kafkaApis = createKafkaApis(enableForwarding = true)
- kafkaApis.handle(request, RequestLocal.withThreadConfinedCaching)
-
- val response = verifyNoThrottling[EnvelopeResponse](request)
- assertEquals(Errors.INVALID_REQUEST, response.error())
- }
-
- @Test
- def testEnvelopeRequestWithNotFromPrivilegedListener(): Unit = {
- testInvalidEnvelopeRequest(Errors.NONE, fromPrivilegedListener = false,
- shouldCloseConnection = true)
- }
-
- @Test
- def testEnvelopeRequestNotAuthorized(): Unit = {
- testInvalidEnvelopeRequest(Errors.CLUSTER_AUTHORIZATION_FAILED,
- performAuthorize = true, authorizeResult = AuthorizationResult.DENIED)
- }
-
- @Test
- def testEnvelopeRequestNotControllerHandling(): Unit = {
- testInvalidEnvelopeRequest(Errors.NOT_CONTROLLER, performAuthorize = true,
isActiveController = false)
- }
-
- private def testInvalidEnvelopeRequest(expectedError: Errors,
- fromPrivilegedListener: Boolean =
true,
- shouldCloseConnection: Boolean =
false,
- performAuthorize: Boolean = false,
- authorizeResult: AuthorizationResult
= AuthorizationResult.ALLOWED,
- isActiveController: Boolean = true):
Unit = {
- val authorizer: Authorizer = mock(classOf[Authorizer])
-
- if (performAuthorize) {
- authorizeResource(authorizer, AclOperation.CLUSTER_ACTION,
ResourceType.CLUSTER, Resource.CLUSTER_NAME, authorizeResult)
- }
-
- val resourceName = "topic-1"
- val requestHeader = new RequestHeader(ApiKeys.ALTER_CONFIGS,
ApiKeys.ALTER_CONFIGS.latestVersion,
- clientId, 0)
-
- when(controller.isActive).thenReturn(isActiveController)
-
- val configResource = new ConfigResource(ConfigResource.Type.TOPIC,
resourceName)
-
- val configs = Map(
- configResource -> new AlterConfigsRequest.Config(
- Seq(new AlterConfigsRequest.ConfigEntry("foo", "bar")).asJava))
- val alterConfigsRequest = new AlterConfigsRequest.Builder(configs.asJava,
false)
- .build(requestHeader.apiVersion)
-
- val request = TestUtils.buildEnvelopeRequest(
- alterConfigsRequest, kafkaPrincipalSerde, requestChannelMetrics,
time.nanoseconds(), fromPrivilegedListener = fromPrivilegedListener)
-
- val capturedResponse: ArgumentCaptor[AbstractResponse] =
ArgumentCaptor.forClass(classOf[AbstractResponse])
- kafkaApis = createKafkaApis(authorizer = Some(authorizer),
enableForwarding = true)
- kafkaApis.handle(request, RequestLocal.withThreadConfinedCaching)
-
- if (shouldCloseConnection) {
- verify(requestChannel).closeConnection(
- ArgumentMatchers.eq(request),
- ArgumentMatchers.eq(java.util.Collections.emptyMap())
- )
- } else {
- verify(requestChannel).sendResponse(
- ArgumentMatchers.eq(request),
- capturedResponse.capture(),
- ArgumentMatchers.eq(None))
- val response = capturedResponse.getValue.asInstanceOf[EnvelopeResponse]
- assertEquals(expectedError, response.error)
- }
- if (performAuthorize) {
- verify(authorizer).authorize(any(), any())
- }
- }
-
@Test
def testAlterConfigsWithAuthorizer(): Unit = {
val authorizer: Authorizer = mock(classOf[Authorizer])
@@ -10516,13 +10355,6 @@ class KafkaApisTest extends Logging {
assertEquals(KafkaApis.shouldAlwaysForward(request).getMessage,
e.getMessage)
}
-  @Test
-  def testRaftShouldNeverHandleControlledShutdownRequest(): Unit = {
-    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
-    kafkaApis = createKafkaApis(raftSupport = true)
-    verifyShouldNeverHandleErrorMessage(kafkaApis.handleControlledShutdownRequest)
-  }
-
@Test
def testRaftShouldNeverHandleAlterPartitionRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
@@ -10530,13 +10362,6 @@ class KafkaApisTest extends Logging {
verifyShouldNeverHandleErrorMessage(kafkaApis.handleAlterPartitionRequest)
}
-  @Test
-  def testRaftShouldNeverHandleEnvelope(): Unit = {
-    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
-    kafkaApis = createKafkaApis(raftSupport = true)
-    verifyShouldNeverHandleErrorMessage(kafkaApis.handleEnvelope(_, RequestLocal.withThreadConfinedCaching))
-  }
-
@Test
def testRaftShouldAlwaysForwardCreateTopicsRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)