This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch 4.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push: new 927008e5628 KAFKA-18399 Remove ZooKeeper from KafkaApis (3/N): USER_SCRAM_CREDENTIALS (#18456) 927008e5628 is described below commit 927008e56288467cb25b545110c1ec7ad56392f3 Author: TengYao Chi <kiting...@gmail.com> AuthorDate: Sun Jan 12 20:39:49 2025 +0800 KAFKA-18399 Remove ZooKeeper from KafkaApis (3/N): USER_SCRAM_CREDENTIALS (#18456) Reviewers: Chia-Ping Tsai <chia7...@gmail.com> --- core/src/main/scala/kafka/server/KafkaApis.scala | 79 ++++------- .../main/scala/kafka/server/MetadataSupport.scala | 9 +- .../scala/unit/kafka/server/KafkaApisTest.scala | 153 +-------------------- 3 files changed, 26 insertions(+), 215 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index d80a3803337..88e032e7cc8 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -135,17 +135,15 @@ class KafkaApis(val requestChannel: RequestChannel, info("Shutdown complete.") } - private def maybeForwardToController( - request: RequestChannel.Request, - handler: RequestChannel.Request => Unit - ): Unit = { + private def forwardToController(request: RequestChannel.Request): Unit = { def responseCallback(responseOpt: Option[AbstractResponse]): Unit = { responseOpt match { case Some(response) => requestHelper.sendForwardedResponse(request, response) case None => handleInvalidVersionsDuringForwarding(request) } } - metadataSupport.maybeForward(request, handler, responseCallback) + + metadataSupport.forward(request, responseCallback) } private def handleInvalidVersionsDuringForwarding(request: RequestChannel.Request): Unit = { @@ -155,16 +153,6 @@ class KafkaApis(val requestChannel: RequestChannel, requestChannel.closeConnection(request, Collections.emptyMap()) } - private def forwardToControllerOrFail( - request: RequestChannel.Request - ): Unit = { - def errorHandler(request: RequestChannel.Request): Unit = { - throw new IllegalStateException(s"Unable to forward $request to the controller") - } - - maybeForwardToController(request, errorHandler) - } - /** * Top-level method that handles all requests and multiplexes to the right api */ @@ -201,8 +189,8 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request).exceptionally(handleError) case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request) case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request) - case ApiKeys.CREATE_TOPICS => maybeForwardToController(request, handleCreateTopicsRequest) - case ApiKeys.DELETE_TOPICS => maybeForwardToController(request, handleDeleteTopicsRequest) + case ApiKeys.CREATE_TOPICS => forwardToController(request) + case ApiKeys.DELETE_TOPICS => forwardToController(request) case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request) case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request, requestLocal) case ApiKeys.OFFSET_FOR_LEADER_EPOCH => handleOffsetForLeaderEpochRequest(request) @@ -212,14 +200,14 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.WRITE_TXN_MARKERS => handleWriteTxnMarkersRequest(request, requestLocal) case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request, requestLocal).exceptionally(handleError) case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request) - case ApiKeys.CREATE_ACLS => maybeForwardToController(request, handleCreateAcls) - case ApiKeys.DELETE_ACLS => maybeForwardToController(request, handleDeleteAcls) + case ApiKeys.CREATE_ACLS => forwardToController(request) + case ApiKeys.DELETE_ACLS => forwardToController(request) case ApiKeys.ALTER_CONFIGS => handleAlterConfigsRequest(request) case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request) case ApiKeys.ALTER_REPLICA_LOG_DIRS => handleAlterReplicaLogDirsRequest(request) case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request) case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request) - case ApiKeys.CREATE_PARTITIONS => maybeForwardToController(request, handleCreatePartitionsRequest) + case ApiKeys.CREATE_PARTITIONS => forwardToController(request) // Create, renew and expire DelegationTokens must first validate that the connection // itself is not authenticated with a delegation token before maybeForwardToController. case ApiKeys.CREATE_DELEGATION_TOKEN => handleCreateTokenRequest(request) @@ -227,32 +215,32 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.EXPIRE_DELEGATION_TOKEN => handleExpireTokenRequest(request) case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request) case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request, requestLocal).exceptionally(handleError) - case ApiKeys.ELECT_LEADERS => maybeForwardToController(request, handleElectLeaders) + case ApiKeys.ELECT_LEADERS => forwardToController(request) case ApiKeys.INCREMENTAL_ALTER_CONFIGS => handleIncrementalAlterConfigsRequest(request) - case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => maybeForwardToController(request, handleAlterPartitionReassignmentsRequest) - case ApiKeys.LIST_PARTITION_REASSIGNMENTS => maybeForwardToController(request, handleListPartitionReassignmentsRequest) + case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => forwardToController(request) + case ApiKeys.LIST_PARTITION_REASSIGNMENTS => forwardToController(request) case ApiKeys.OFFSET_DELETE => handleOffsetDeleteRequest(request, requestLocal).exceptionally(handleError) case ApiKeys.DESCRIBE_CLIENT_QUOTAS => handleDescribeClientQuotasRequest(request) - case ApiKeys.ALTER_CLIENT_QUOTAS => maybeForwardToController(request, handleAlterClientQuotasRequest) + case ApiKeys.ALTER_CLIENT_QUOTAS => forwardToController(request) case ApiKeys.DESCRIBE_USER_SCRAM_CREDENTIALS => handleDescribeUserScramCredentialsRequest(request) - case ApiKeys.ALTER_USER_SCRAM_CREDENTIALS => maybeForwardToController(request, handleAlterUserScramCredentialsRequest) + case ApiKeys.ALTER_USER_SCRAM_CREDENTIALS => forwardToController(request) case ApiKeys.ALTER_PARTITION => handleAlterPartitionRequest(request) - case ApiKeys.UPDATE_FEATURES => maybeForwardToController(request, handleUpdateFeatures) + case ApiKeys.UPDATE_FEATURES => forwardToController(request) case ApiKeys.DESCRIBE_CLUSTER => handleDescribeCluster(request) case ApiKeys.DESCRIBE_PRODUCERS => handleDescribeProducersRequest(request) - case ApiKeys.UNREGISTER_BROKER => forwardToControllerOrFail(request) + case ApiKeys.UNREGISTER_BROKER => forwardToController(request) case ApiKeys.DESCRIBE_TRANSACTIONS => handleDescribeTransactionsRequest(request) case ApiKeys.LIST_TRANSACTIONS => handleListTransactionsRequest(request) case ApiKeys.ALLOCATE_PRODUCER_IDS => handleAllocateProducerIdsRequest(request) - case ApiKeys.DESCRIBE_QUORUM => forwardToControllerOrFail(request) + case ApiKeys.DESCRIBE_QUORUM => forwardToController(request) case ApiKeys.CONSUMER_GROUP_HEARTBEAT => handleConsumerGroupHeartbeat(request).exceptionally(handleError) case ApiKeys.CONSUMER_GROUP_DESCRIBE => handleConsumerGroupDescribe(request).exceptionally(handleError) case ApiKeys.DESCRIBE_TOPIC_PARTITIONS => handleDescribeTopicPartitionsRequest(request) case ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS => handleGetTelemetrySubscriptionsRequest(request) case ApiKeys.PUSH_TELEMETRY => handlePushTelemetryRequest(request) case ApiKeys.LIST_CLIENT_METRICS_RESOURCES => handleListClientMetricsResources(request) - case ApiKeys.ADD_RAFT_VOTER => forwardToControllerOrFail(request) - case ApiKeys.REMOVE_RAFT_VOTER => forwardToControllerOrFail(request) + case ApiKeys.ADD_RAFT_VOTER => forwardToController(request) + case ApiKeys.REMOVE_RAFT_VOTER => forwardToController(request) case ApiKeys.SHARE_GROUP_HEARTBEAT => handleShareGroupHeartbeat(request).exceptionally(handleError) case ApiKeys.SHARE_GROUP_DESCRIBE => handleShareGroupDescribe(request).exceptionally(handleError) case ApiKeys.SHARE_FETCH => handleShareFetchRequest(request) @@ -2771,7 +2759,7 @@ class KafkaApis(val requestChannel: RequestChannel, CreateDelegationTokenResponse.prepareResponse(request.context.requestVersion, requestThrottleMs, Errors.INVALID_PRINCIPAL_TYPE, owner, requester)) } else { - maybeForwardToController(request, handleCreateTokenRequestZk) + forwardToController(request) } } @@ -2824,7 +2812,7 @@ class KafkaApis(val requestChannel: RequestChannel, .setErrorCode(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED.code) .setExpiryTimestampMs(DelegationTokenManager.ErrorTimestamp))) } else { - maybeForwardToController(request, handleRenewTokenRequestZk) + forwardToController(request) } } @@ -2870,7 +2858,7 @@ class KafkaApis(val requestChannel: RequestChannel, .setErrorCode(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED.code) .setExpiryTimestampMs(DelegationTokenManager.ErrorTimestamp))) } else { - maybeForwardToController(request, handleExpireTokenRequestZk) + forwardToController(request) } } @@ -3173,37 +3161,16 @@ class KafkaApis(val requestChannel: RequestChannel, describeUserScramCredentialsRequest.getErrorResponse(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED.exception)) } else { metadataSupport match { - case ZkSupport(adminManager, controller, zkClient, forwardingManager, metadataCache, _) => - val result = adminManager.describeUserScramCredentials( - Option(describeUserScramCredentialsRequest.data.users).map(_.asScala.map(_.name).toList)) - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => - new DescribeUserScramCredentialsResponse(result.setThrottleTimeMs(requestThrottleMs))) case RaftSupport(_, metadataCache) => val result = metadataCache.describeScramCredentials(describeUserScramCredentialsRequest.data()) requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => new DescribeUserScramCredentialsResponse(result.setThrottleTimeMs(requestThrottleMs))) + case _ => + throw KafkaApis.shouldNeverReceive(request) } } } - def handleAlterUserScramCredentialsRequest(request: RequestChannel.Request): Unit = { - val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request)) - val alterUserScramCredentialsRequest = request.body[AlterUserScramCredentialsRequest] - - if (!zkSupport.controller.isActive) { - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => - alterUserScramCredentialsRequest.getErrorResponse(requestThrottleMs, Errors.NOT_CONTROLLER.exception)) - } else if (authHelper.authorize(request.context, ALTER, CLUSTER, CLUSTER_NAME)) { - val result = zkSupport.adminManager.alterUserScramCredentials( - alterUserScramCredentialsRequest.data.upsertions().asScala, alterUserScramCredentialsRequest.data.deletions().asScala) - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => - new AlterUserScramCredentialsResponse(result.setThrottleTimeMs(requestThrottleMs))) - } else { - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => - alterUserScramCredentialsRequest.getErrorResponse(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED.exception)) - } - } - def handleAlterPartitionRequest(request: RequestChannel.Request): Unit = { val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request)) val alterPartitionRequest = request.body[AlterPartitionRequest] diff --git a/core/src/main/scala/kafka/server/MetadataSupport.scala b/core/src/main/scala/kafka/server/MetadataSupport.scala index 335df7c42d7..83a52e83f69 100644 --- a/core/src/main/scala/kafka/server/MetadataSupport.scala +++ b/core/src/main/scala/kafka/server/MetadataSupport.scala @@ -58,16 +58,11 @@ sealed trait MetadataSupport { def canForward(): Boolean - def maybeForward( + def forward( request: RequestChannel.Request, - handler: RequestChannel.Request => Unit, responseCallback: Option[AbstractResponse] => Unit ): Unit = { - if (!request.isForwarded && canForward()) { - forwardingManager.get.forwardRequest(request, responseCallback) - } else { - handler(request) - } + forwardingManager.get.forwardRequest(request, responseCallback) } } diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index e65d4011fb0..0abeb3192ed 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -43,8 +43,7 @@ import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsReso import org.apache.kafka.common.message.AlterConfigsResponseData.{AlterConfigsResourceResponse => LAlterConfigsResourceResponse} import org.apache.kafka.common.message.ApiMessageType.ListenerType import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.DescribedGroup -import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic -import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicCollection} +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource => IAlterConfigsResource, AlterConfigsResourceCollection => IAlterConfigsResourceCollection, AlterableConfig => IAlterableConfig, AlterableConfigCollection => IAlterableConfigCollection} import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.{AlterConfigsResourceResponse => IAlterConfigsResourceResponse} @@ -598,14 +597,6 @@ class KafkaApisTest extends Logging { ) } - private def testForwardableApi(apiKey: ApiKeys, requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest]): Unit = { - kafkaApis = createKafkaApis(enableForwarding = true) - testForwardableApi(kafkaApis = kafkaApis, - apiKey, - requestBuilder - ) - } - private def testForwardableApi( kafkaApis: KafkaApis, apiKey: ApiKeys, @@ -769,12 +760,6 @@ class KafkaApisTest extends Logging { verify(clientRequestQuotaManager).maybeRecordAndGetThrottleTimeMs(any(), anyLong) } - @Test - def testAlterClientQuotasWithForwarding(): Unit = { - val requestBuilder = new AlterClientQuotasRequest.Builder(List.empty.asJava, false) - testForwardableApi(ApiKeys.ALTER_CLIENT_QUOTAS, requestBuilder) - } - private def verifyAlterClientQuotaResult(response: AlterClientQuotasResponse, expected: Map[ClientQuotaEntity, Errors]): Unit = { val futures = expected.keys.map(quotaEntity => quotaEntity -> new KafkaFutureImpl[Void]()).toMap @@ -858,16 +843,6 @@ class KafkaApisTest extends Logging { Topic.CLUSTER_METADATA_TOPIC_NAME -> Errors.NONE)) } - @Test - def testCreateTopicsWithForwarding(): Unit = { - val requestBuilder = new CreateTopicsRequest.Builder( - new CreateTopicsRequestData().setTopics( - new CreatableTopicCollection(Collections.singleton( - new CreatableTopic().setName("topic").setNumPartitions(1). - setReplicationFactor(1.toShort)).iterator()))) - testForwardableApi(ApiKeys.CREATE_TOPICS, requestBuilder) - } - @ParameterizedTest @CsvSource(value = Array("0,1500", "1500,0", "3000,1000")) def testKRaftControllerThrottleTimeEnforced( @@ -919,65 +894,6 @@ class KafkaApisTest extends Logging { assertEquals(expectedThrottleTimeMs, responseData.throttleTimeMs) } - @Test - def testCreatePartitionsAuthorization(): Unit = { - val authorizer: Authorizer = mock(classOf[Authorizer]) - kafkaApis = createKafkaApis(authorizer = Some(authorizer)) - - val timeoutMs = 35000 - val requestData = new CreatePartitionsRequestData() - .setTimeoutMs(timeoutMs) - .setValidateOnly(false) - val fooCreatePartitionsData = new CreatePartitionsTopic().setName("foo").setAssignments(null).setCount(2) - val barCreatePartitionsData = new CreatePartitionsTopic().setName("bar").setAssignments(null).setCount(10) - requestData.topics().add(fooCreatePartitionsData) - requestData.topics().add(barCreatePartitionsData) - - val fooResource = new ResourcePattern(ResourceType.TOPIC, "foo", PatternType.LITERAL) - val fooAction = new Action(AclOperation.ALTER, fooResource, 1, true, true) - - val barResource = new ResourcePattern(ResourceType.TOPIC, "bar", PatternType.LITERAL) - val barAction = new Action(AclOperation.ALTER, barResource, 1, true, true) - - when(authorizer.authorize( - any[RequestContext](), - any[util.List[Action]]() - )).thenAnswer { invocation => - val actions = invocation.getArgument[util.List[Action]](1).asScala - val results = actions.map { action => - if (action == fooAction) AuthorizationResult.ALLOWED - else if (action == barAction) AuthorizationResult.DENIED - else throw new AssertionError(s"Unexpected action $action") - } - new util.ArrayList[AuthorizationResult](results.asJava) - } - - val request = buildRequest(new CreatePartitionsRequest.Builder(requestData).build()) - - when(controller.isActive).thenReturn(true) - when(controller.isTopicQueuedForDeletion("foo")).thenReturn(false) - when(clientControllerQuotaManager.newQuotaFor( - ArgumentMatchers.eq(request), ArgumentMatchers.anyShort()) - ).thenReturn(UnboundedControllerMutationQuota) - when(adminManager.createPartitions( - timeoutMs = ArgumentMatchers.eq(timeoutMs), - newPartitions = ArgumentMatchers.eq(Seq(fooCreatePartitionsData)), - validateOnly = ArgumentMatchers.eq(false), - controllerMutationQuota = ArgumentMatchers.eq(UnboundedControllerMutationQuota), - callback = ArgumentMatchers.any[Map[String, ApiError] => Unit]() - )).thenAnswer { invocation => - val callback = invocation.getArgument[Map[String, ApiError] => Unit](4) - callback.apply(Map("foo" -> ApiError.NONE)) - } - - kafkaApis.handle(request, RequestLocal.withThreadConfinedCaching) - - val response = verifyNoThrottling[CreatePartitionsResponse](request) - val results = response.data.results.asScala - assertEquals(Some(Errors.NONE), results.find(_.name == "foo").map(result => Errors.forCode(result.errorCode))) - assertEquals(Some(Errors.TOPIC_AUTHORIZATION_FAILED), results.find(_.name == "bar").map(result => Errors.forCode(result.errorCode))) - } - private def createTopicAuthorization(authorizer: Authorizer, operation: AclOperation, authorizedTopic: String, @@ -1033,66 +949,6 @@ class KafkaApisTest extends Logging { assertEquals(expectedTopicConfigErrorCodes, actualTopicConfigErrorCodes) } - @Test - def testCreateAclWithForwarding(): Unit = { - val requestBuilder = new CreateAclsRequest.Builder(new CreateAclsRequestData()) - testForwardableApi(ApiKeys.CREATE_ACLS, requestBuilder) - } - - @Test - def testDeleteAclWithForwarding(): Unit = { - val requestBuilder = new DeleteAclsRequest.Builder(new DeleteAclsRequestData()) - testForwardableApi(ApiKeys.DELETE_ACLS, requestBuilder) - } - - @Test - def testCreateDelegationTokenWithForwarding(): Unit = { - val requestBuilder = new CreateDelegationTokenRequest.Builder(new CreateDelegationTokenRequestData()) - testForwardableApi(ApiKeys.CREATE_DELEGATION_TOKEN, requestBuilder) - } - - @Test - def testRenewDelegationTokenWithForwarding(): Unit = { - val requestBuilder = new RenewDelegationTokenRequest.Builder(new RenewDelegationTokenRequestData()) - testForwardableApi(ApiKeys.RENEW_DELEGATION_TOKEN, requestBuilder) - } - - @Test - def testExpireDelegationTokenWithForwarding(): Unit = { - val requestBuilder = new ExpireDelegationTokenRequest.Builder(new ExpireDelegationTokenRequestData()) - testForwardableApi(ApiKeys.EXPIRE_DELEGATION_TOKEN, requestBuilder) - } - - @Test - def testAlterPartitionReassignmentsWithForwarding(): Unit = { - val requestBuilder = new AlterPartitionReassignmentsRequest.Builder(new AlterPartitionReassignmentsRequestData()) - testForwardableApi(ApiKeys.ALTER_PARTITION_REASSIGNMENTS, requestBuilder) - } - - @Test - def testCreatePartitionsWithForwarding(): Unit = { - val requestBuilder = new CreatePartitionsRequest.Builder(new CreatePartitionsRequestData()) - testForwardableApi(ApiKeys.CREATE_PARTITIONS, requestBuilder) - } - - @Test - def testUpdateFeaturesWithForwarding(): Unit = { - val requestBuilder = new UpdateFeaturesRequest.Builder(new UpdateFeaturesRequestData()) - testForwardableApi(ApiKeys.UPDATE_FEATURES, requestBuilder) - } - - @Test - def testDeleteTopicsWithForwarding(): Unit = { - val requestBuilder = new DeleteTopicsRequest.Builder(new DeleteTopicsRequestData()) - testForwardableApi(ApiKeys.DELETE_TOPICS, requestBuilder) - } - - @Test - def testAlterScramWithForwarding(): Unit = { - val requestBuilder = new AlterUserScramCredentialsRequest.Builder(new AlterUserScramCredentialsRequestData()) - testForwardableApi(ApiKeys.ALTER_USER_SCRAM_CREDENTIALS, requestBuilder) - } - @Test def testFindCoordinatorAutoTopicCreationForOffsetTopic(): Unit = { testFindCoordinatorWithTopicCreation(CoordinatorType.GROUP) @@ -10511,13 +10367,6 @@ class KafkaApisTest extends Logging { verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleAlterClientQuotasRequest) } - @Test - def testRaftShouldAlwaysForwardAlterUserScramCredentialsRequest(): Unit = { - metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) - kafkaApis = createKafkaApis(raftSupport = true) - verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleAlterUserScramCredentialsRequest) - } - @Test def testRaftShouldAlwaysForwardUpdateFeatures(): Unit = { metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)