This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 4.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push: new df5272a27e5 KAFKA-18399 Remove ZooKeeper from KafkaApis (12/N): clean up ZKMetadataCache, KafkaController and raftSupport (#18542) df5272a27e5 is described below commit df5272a27e5708ad783f11c58ce946cddeff57d2 Author: TaiJuWu <tjwu1...@gmail.com> AuthorDate: Wed Jan 15 23:28:57 2025 +0800 KAFKA-18399 Remove ZooKeeper from KafkaApis (12/N): clean up ZKMetadataCache, KafkaController and raftSupport (#18542) Reviewers: Viktor Somogyi-Vass <viktorsomo...@gmail.com>, Chia-Ping Tsai <chia7...@gmail.com> --- .../scala/unit/kafka/server/KafkaApisTest.scala | 403 +++++---------------- 1 file changed, 92 insertions(+), 311 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index a19e1a4296c..d778c785982 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -17,16 +17,14 @@ package kafka.server -import kafka.cluster.{Broker, Partition} -import kafka.controller.KafkaController +import kafka.cluster.Partition import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator} import kafka.log.UnifiedLog import kafka.network.RequestChannel import kafka.server.QuotaFactory.QuotaManagers -import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache, MockConfigRepository, ZkMetadataCache} +import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache, MockConfigRepository} import kafka.server.share.SharePartitionManager import kafka.utils.{CoreUtils, Log4jController, Logging, TestUtils} -import kafka.zk.KafkaZkClient import org.apache.kafka.clients.admin.AlterConfigOp.OpType import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry} import org.apache.kafka.common._ @@ -82,7 +80,7 @@ import org.apache.kafka.security.authorizer.AclEntry import org.apache.kafka.server.{BrokerFeatures, ClientMetricsManager} 
import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer} import org.apache.kafka.server.common.{FeatureVersion, FinalizedFeatures, GroupVersion, KRaftVersion, MetadataVersion, RequestLocal, TransactionVersion} -import org.apache.kafka.server.config.{ConfigType, KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs} +import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs} import org.apache.kafka.server.metrics.ClientMetricsTestUtils import org.apache.kafka.server.share.{CachedSharePartition, ErroneousAndValidPartitionData} import org.apache.kafka.server.quota.ThrottleCallback @@ -119,9 +117,7 @@ class KafkaApisTest extends Logging { private val replicaManager: ReplicaManager = mock(classOf[ReplicaManager]) private val groupCoordinator: GroupCoordinator = mock(classOf[GroupCoordinator]) private val shareCoordinator: ShareCoordinator = mock(classOf[ShareCoordinator]) - private val adminManager: ZkAdminManager = mock(classOf[ZkAdminManager]) private val txnCoordinator: TransactionCoordinator = mock(classOf[TransactionCoordinator]) - private val controller: KafkaController = mock(classOf[KafkaController]) private val forwardingManager: ForwardingManager = mock(classOf[ForwardingManager]) private val autoTopicCreationManager: AutoTopicCreationManager = mock(classOf[AutoTopicCreationManager]) @@ -129,12 +125,9 @@ class KafkaApisTest extends Logging { override def serialize(principal: KafkaPrincipal): Array[Byte] = Utils.utf8(principal.toString) override def deserialize(bytes: Array[Byte]): KafkaPrincipal = SecurityUtils.parseKafkaPrincipal(Utils.utf8(bytes)) } - private val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient]) private val metrics = new Metrics() private val brokerId = 1 - // KRaft tests should override this with a KRaftMetadataCache - private var metadataCache: MetadataCache = MetadataCache.zkMetadataCache(brokerId, MetadataVersion.latestTesting()) - private val 
brokerEpochManager: ZkBrokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None) + private var metadataCache: MetadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.LATEST_PRODUCTION) private val clientQuotaManager: ClientQuotaManager = mock(classOf[ClientQuotaManager]) private val clientRequestQuotaManager: ClientRequestQuotaManager = mock(classOf[ClientRequestQuotaManager]) private val clientControllerQuotaManager: ControllerMutationQuotaManager = mock(classOf[ControllerMutationQuotaManager]) @@ -162,59 +155,37 @@ class KafkaApisTest extends Logging { def createKafkaApis(interBrokerProtocolVersion: MetadataVersion = MetadataVersion.latestTesting, authorizer: Option[Authorizer] = None, - enableForwarding: Boolean = false, configRepository: ConfigRepository = new MockConfigRepository(), - raftSupport: Boolean = false, overrideProperties: Map[String, String] = Map.empty, featureVersions: Seq[FeatureVersion] = Seq.empty): KafkaApis = { - val properties = if (raftSupport) { - val properties = TestUtils.createBrokerConfig(brokerId) - properties.put(KRaftConfigs.NODE_ID_CONFIG, brokerId.toString) - properties.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker") - val voterId = brokerId + 1 - properties.put(QuorumConfig.QUORUM_VOTERS_CONFIG, s"$voterId@localhost:9093") - properties - } else { - TestUtils.createBrokerConfig(brokerId) - } + + val properties = TestUtils.createBrokerConfig(brokerId) + properties.put(KRaftConfigs.NODE_ID_CONFIG, brokerId.toString) + properties.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker") + val voterId = brokerId + 1 + properties.put(QuorumConfig.QUORUM_VOTERS_CONFIG, s"$voterId@localhost:9093") + overrideProperties.foreach( p => properties.put(p._1, p._2)) TestUtils.setIbpVersion(properties, interBrokerProtocolVersion) val config = new KafkaConfig(properties) - val forwardingManagerOpt = if (enableForwarding) - Some(this.forwardingManager) - else - None - - val metadataSupport = if (raftSupport) { - 
// it will be up to the test to replace the default ZkMetadataCache implementation - // with a KRaftMetadataCache instance - metadataCache match { + val metadataSupport = metadataCache match { case cache: KRaftMetadataCache => RaftSupport(forwardingManager, cache) case _ => throw new IllegalStateException("Test must set an instance of KRaftMetadataCache") } - } else { - metadataCache match { - case zkMetadataCache: ZkMetadataCache => - ZkSupport(adminManager, controller, zkClient, forwardingManagerOpt, zkMetadataCache, brokerEpochManager) - case _ => throw new IllegalStateException("Test must set an instance of ZkMetadataCache") - } - } - val listenerType = if (raftSupport) ListenerType.BROKER else ListenerType.ZK_BROKER - val enabledApis = if (enableForwarding) { - ApiKeys.apisForListener(listenerType).asScala ++ Set(ApiKeys.ENVELOPE) - } else { - ApiKeys.apisForListener(listenerType).asScala.toSet - } + + val listenerType = ListenerType.BROKER + val enabledApis = ApiKeys.apisForListener(listenerType).asScala + val apiVersionManager = new SimpleApiVersionManager( listenerType, enabledApis, BrokerFeatures.defaultSupportedFeatures(true), true, - () => new FinalizedFeatures(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0, raftSupport)) + () => new FinalizedFeatures(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0, true)) - val clientMetricsManagerOpt = if (raftSupport) Some(clientMetricsManager) else None + val clientMetricsManagerOpt = Some(clientMetricsManager) when(groupCoordinator.isNewGroupCoordinator).thenReturn(config.isNewGroupCoordinatorEnabled) setupFeatures(featureVersions) @@ -290,7 +261,7 @@ class KafkaApisTest extends Logging { topicConfigs.put(propName, propValue) when(configRepository.topicConfig(resourceName)).thenReturn(topicConfigs) - metadataCache = mock(classOf[ZkMetadataCache]) + metadataCache = mock(classOf[KRaftMetadataCache]) 
when(metadataCache.contains(resourceName)).thenReturn(true) val describeConfigsRequest = new DescribeConfigsRequest.Builder(new DescribeConfigsRequestData() @@ -299,10 +270,8 @@ class KafkaApisTest extends Logging { .setResourceName(resourceName) .setResourceType(ConfigResource.Type.TOPIC.id)).asJava)) .build(requestHeader.apiVersion) - val request = buildRequest(describeConfigsRequest, - requestHeader = Option(requestHeader)) - when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](), - any[Long])).thenReturn(0) + val request = buildRequest(describeConfigsRequest, requestHeader = Option(requestHeader)) + kafkaApis = createKafkaApis(authorizer = Some(authorizer), configRepository = configRepository) kafkaApis.handleDescribeConfigsRequest(request) @@ -344,7 +313,7 @@ class KafkaApisTest extends Logging { val request = buildRequest(incrementalAlterConfigsRequest, requestHeader = Option(requestHeader)) metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.LATEST_PRODUCTION) - createKafkaApis(authorizer = Some(authorizer), raftSupport = true).handleIncrementalAlterConfigsRequest(request) + createKafkaApis(authorizer = Some(authorizer)).handleIncrementalAlterConfigsRequest(request) verify(forwardingManager, times(1)).forwardRequest( any(), any(), @@ -422,7 +391,7 @@ class KafkaApisTest extends Logging { val request = buildRequest(apiRequest) metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.LATEST_PRODUCTION) - kafkaApis = createKafkaApis(raftSupport = true) + kafkaApis = createKafkaApis() kafkaApis.handleAlterConfigsRequest(request) verify(forwardingManager, times(1)).forwardRequest( any(), @@ -444,7 +413,7 @@ class KafkaApisTest extends Logging { val request = buildRequest(incrementalAlterConfigsRequest, requestHeader = Option(requestHeader)) metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.LATEST_PRODUCTION) - kafkaApis = createKafkaApis(raftSupport = 
true) + kafkaApis = createKafkaApis() kafkaApis.handleIncrementalAlterConfigsRequest(request) verify(forwardingManager, times(1)).forwardRequest( any(), @@ -484,7 +453,7 @@ class KafkaApisTest extends Logging { val cmConfigs = ClientMetricsTestUtils.defaultProperties when(configRepository.config(resource)).thenReturn(cmConfigs) - metadataCache = mock(classOf[ZkMetadataCache]) + metadataCache = mock(classOf[KRaftMetadataCache]) when(metadataCache.contains(subscriptionName)).thenReturn(true) val describeConfigsRequest = new DescribeConfigsRequest.Builder(new DescribeConfigsRequestData() @@ -495,8 +464,7 @@ class KafkaApisTest extends Logging { .build(requestHeader.apiVersion) val request = buildRequest(describeConfigsRequest, requestHeader = Option(requestHeader)) - when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](), - any[Long])).thenReturn(0) + kafkaApis = createKafkaApis(authorizer = Some(authorizer), configRepository = configRepository) kafkaApis.handleDescribeConfigsRequest(request) @@ -518,7 +486,7 @@ class KafkaApisTest extends Logging { val requestData = DescribeQuorumRequest.singletonRequest(KafkaRaftServer.MetadataPartition) val requestBuilder = new DescribeQuorumRequest.Builder(requestData) metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) - kafkaApis = createKafkaApis(raftSupport = true) + kafkaApis = createKafkaApis() testForwardableApi(kafkaApis = kafkaApis, ApiKeys.DESCRIBE_QUORUM, requestBuilder @@ -530,7 +498,7 @@ class KafkaApisTest extends Logging { requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest] ): Unit = { metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) - kafkaApis = createKafkaApis(enableForwarding = true, raftSupport = true) + kafkaApis = createKafkaApis() testForwardableApi(kafkaApis = kafkaApis, apiKey, requestBuilder @@ -548,13 +516,6 @@ class KafkaApisTest extends Logging { val apiRequest = 
requestBuilder.build(topicHeader.apiVersion) val request = buildRequest(apiRequest) - if (kafkaApis.metadataSupport.isInstanceOf[ZkSupport]) { - // The controller check only makes sense for ZK clusters. For KRaft, - // controller requests are handled on a separate listener, so there - // is no choice but to forward them. - when(controller.isActive).thenReturn(false) - } - when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](), any[Long])).thenReturn(0) val forwardCallback: ArgumentCaptor[Option[AbstractResponse] => Unit] = ArgumentCaptor.forClass(classOf[Option[AbstractResponse] => Unit]) @@ -573,9 +534,6 @@ class KafkaApisTest extends Logging { val capturedResponse = verifyNoThrottling[AbstractResponse](request) assertEquals(expectedResponse.data, capturedResponse.data) - if (kafkaApis.metadataSupport.isInstanceOf[ZkSupport]) { - verify(controller).isActive - } } private def authorizeResource(authorizer: Authorizer, @@ -612,7 +570,7 @@ class KafkaApisTest extends Logging { val request = buildRequest(incrementalAlterConfigsRequest, requestHeader = Option(requestHeader)) metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.LATEST_PRODUCTION) - kafkaApis = createKafkaApis(authorizer = Some(authorizer), raftSupport = true) + kafkaApis = createKafkaApis(authorizer = Some(authorizer)) kafkaApis.handleIncrementalAlterConfigsRequest(request) verify(authorizer, times(1)).authorize(any(), any()) @@ -652,7 +610,7 @@ class KafkaApisTest extends Logging { val requestBuilder = new CreateTopicsRequest.Builder(requestData).build() val request = buildRequest(requestBuilder) - kafkaApis = createKafkaApis(enableForwarding = true, raftSupport = true) + kafkaApis = createKafkaApis() val forwardCallback: ArgumentCaptor[Option[AbstractResponse] => Unit] = ArgumentCaptor.forClass(classOf[Option[AbstractResponse] => Unit]) @@ -904,8 +862,7 @@ class KafkaApisTest extends Logging { any[Long])).thenReturn(0) val capturedRequest = 
verifyTopicCreation(topicName, enableAutoTopicCreation, isInternal, request) - kafkaApis = createKafkaApis(authorizer = Some(authorizer), enableForwarding = enableAutoTopicCreation, - overrideProperties = topicConfigOverride) + kafkaApis = createKafkaApis(authorizer = Some(authorizer), overrideProperties = topicConfigOverride) kafkaApis.handleTopicMetadataRequest(request) val response = verifyNoThrottling[MetadataResponse](request) @@ -2242,7 +2199,7 @@ class KafkaApisTest extends Logging { @Test def testProduceResponseMetadataLookupErrorOnNotLeaderOrFollower(): Unit = { val topic = "topic" - metadataCache = mock(classOf[ZkMetadataCache]) + metadataCache = mock(classOf[KRaftMetadataCache]) for (version <- 10 to ApiKeys.PRODUCE.latestVersion) { @@ -3612,175 +3569,6 @@ class KafkaApisTest extends Logging { assertEquals(Set(0), response.brokers.asScala.map(_.id).toSet) } - - /** - * Metadata request to fetch all topics should not result in the followings: - * 1) Auto topic creation - * 2) UNKNOWN_TOPIC_OR_PARTITION - * - * This case is testing the case that a topic is being deleted from MetadataCache right after - * authorization but before checking in MetadataCache. 
- */ - @Test - def testGetAllTopicMetadataShouldNotCreateTopicOrReturnUnknownTopicPartition(): Unit = { - // Setup: authorizer authorizes 2 topics, but one got deleted in metadata cache - metadataCache = mock(classOf[ZkMetadataCache]) - when(metadataCache.getAliveBrokerNodes(any())).thenReturn(List(new Node(brokerId,"localhost", 0))) - when(metadataCache.getControllerId).thenReturn(None) - - // 2 topics returned for authorization in during handle - val topicsReturnedFromMetadataCacheForAuthorization = Set("remaining-topic", "later-deleted-topic") - when(metadataCache.getAllTopics()).thenReturn(topicsReturnedFromMetadataCacheForAuthorization) - // 1 topic is deleted from metadata right at the time between authorization and the next getTopicMetadata() call - when(metadataCache.getTopicMetadata( - ArgumentMatchers.eq(topicsReturnedFromMetadataCacheForAuthorization), - any[ListenerName], - anyBoolean, - anyBoolean - )).thenReturn(Seq( - new MetadataResponseTopic() - .setErrorCode(Errors.NONE.code) - .setName("remaining-topic") - .setIsInternal(false) - )) - - - var createTopicIsCalled: Boolean = false - // Specific mock on zkClient for this use case - // Expect it's never called to do auto topic creation - when(zkClient.setOrCreateEntityConfigs( - ArgumentMatchers.eq(ConfigType.TOPIC), - anyString, - any[Properties] - )).thenAnswer(_ => { - createTopicIsCalled = true - }) - // No need to use - when(zkClient.getAllBrokersInCluster) - .thenReturn(Seq(new Broker( - brokerId, "localhost", 9902, - ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT - ))) - - - val (requestListener, _) = updateMetadataCacheWithInconsistentListeners() - val response = sendMetadataRequestWithInconsistentListeners(requestListener) - - assertFalse(createTopicIsCalled) - val responseTopics = response.topicMetadata().asScala.map { metadata => metadata.topic() } - assertEquals(List("remaining-topic"), responseTopics) - 
assertTrue(response.topicsByError(Errors.UNKNOWN_TOPIC_OR_PARTITION).isEmpty) - } - - @Test - def testUnauthorizedTopicMetadataRequest(): Unit = { - // 1. Set up broker information - val plaintextListener = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT) - val broker = new UpdateMetadataBroker() - .setId(0) - .setRack("rack") - .setEndpoints(Seq( - new UpdateMetadataEndpoint() - .setHost("broker0") - .setPort(9092) - .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id) - .setListener(plaintextListener.value) - ).asJava) - - // 2. Set up authorizer - val authorizer: Authorizer = mock(classOf[Authorizer]) - val unauthorizedTopic = "unauthorized-topic" - val authorizedTopic = "authorized-topic" - - val expectedActions = Seq( - new Action(AclOperation.DESCRIBE, new ResourcePattern(ResourceType.TOPIC, unauthorizedTopic, PatternType.LITERAL), 1, true, true), - new Action(AclOperation.DESCRIBE, new ResourcePattern(ResourceType.TOPIC, authorizedTopic, PatternType.LITERAL), 1, true, true) - ) - - when(authorizer.authorize(any[RequestContext], argThat((t: java.util.List[Action]) => t.containsAll(expectedActions.asJava)))) - .thenAnswer { invocation => - val actions = invocation.getArgument(1).asInstanceOf[util.List[Action]].asScala - actions.map { action => - if (action.resourcePattern().name().equals(authorizedTopic)) - AuthorizationResult.ALLOWED - else - AuthorizationResult.DENIED - }.asJava - } - - // 3. 
Set up MetadataCache - val authorizedTopicId = Uuid.randomUuid() - val unauthorizedTopicId = Uuid.randomUuid() - - val topicIds = new util.HashMap[String, Uuid]() - topicIds.put(authorizedTopic, authorizedTopicId) - topicIds.put(unauthorizedTopic, unauthorizedTopicId) - - def createDummyPartitionStates(topic: String) = { - new UpdateMetadataPartitionState() - .setTopicName(topic) - .setPartitionIndex(0) - .setControllerEpoch(0) - .setLeader(0) - .setLeaderEpoch(0) - .setReplicas(Collections.singletonList(0)) - .setZkVersion(0) - .setIsr(Collections.singletonList(0)) - } - - // Send UpdateMetadataReq to update MetadataCache - val partitionStates = Seq(unauthorizedTopic, authorizedTopic).map(createDummyPartitionStates) - - val updateMetadataRequest = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, 0, - 0, 0, partitionStates.asJava, Seq(broker).asJava, topicIds).build() - metadataCache.asInstanceOf[ZkMetadataCache].updateMetadata(correlationId = 0, updateMetadataRequest) - - // 4. 
Send TopicMetadataReq using topicId - val metadataReqByTopicId = new MetadataRequest.Builder(util.Arrays.asList(authorizedTopicId, unauthorizedTopicId)).build() - val repByTopicId = buildRequest(metadataReqByTopicId, plaintextListener) - when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](), - any[Long])).thenReturn(0) - kafkaApis = createKafkaApis(authorizer = Some(authorizer)) - kafkaApis.handleTopicMetadataRequest(repByTopicId) - val metadataByTopicIdResp = verifyNoThrottling[MetadataResponse](repByTopicId) - - val metadataByTopicId = metadataByTopicIdResp.data().topics().asScala.groupBy(_.topicId()).map(kv => (kv._1, kv._2.head)) - - metadataByTopicId.foreach { case (topicId, metadataResponseTopic) => - if (topicId == unauthorizedTopicId) { - // Return an TOPIC_AUTHORIZATION_FAILED on unauthorized error regardless of leaking the existence of topic id - assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), metadataResponseTopic.errorCode()) - // Do not return topic information on unauthorized error - assertNull(metadataResponseTopic.name()) - } else { - assertEquals(Errors.NONE.code(), metadataResponseTopic.errorCode()) - assertEquals(authorizedTopic, metadataResponseTopic.name()) - } - } - kafkaApis.close() - - // 4. 
Send TopicMetadataReq using topic name - reset(clientRequestQuotaManager, requestChannel) - val metadataReqByTopicName = new MetadataRequest.Builder(util.Arrays.asList(authorizedTopic, unauthorizedTopic), false).build() - val repByTopicName = buildRequest(metadataReqByTopicName, plaintextListener) - kafkaApis = createKafkaApis(authorizer = Some(authorizer)) - kafkaApis.handleTopicMetadataRequest(repByTopicName) - val metadataByTopicNameResp = verifyNoThrottling[MetadataResponse](repByTopicName) - - val metadataByTopicName = metadataByTopicNameResp.data().topics().asScala.groupBy(_.name()).map(kv => (kv._1, kv._2.head)) - - metadataByTopicName.foreach { case (topicName, metadataResponseTopic) => - if (topicName == unauthorizedTopic) { - assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), metadataResponseTopic.errorCode()) - // Do not return topic Id on unauthorized error - assertEquals(Uuid.ZERO_UUID, metadataResponseTopic.topicId()) - } else { - assertEquals(Errors.NONE.code(), metadataResponseTopic.errorCode()) - assertEquals(authorizedTopicId, metadataResponseTopic.topicId()) - } - } - } - /** * Verifies that sending a fetch request with version 9 works correctly when * ReplicaManager.getLogConfig returns None. 
@@ -4023,7 +3811,7 @@ class KafkaApisTest extends Logging { overrideProperties = Map( ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), - raftSupport = true) + ) kafkaApis.handleShareFetchRequest(request) val response = verifyNoThrottling[ShareFetchResponse](request) val responseData = response.data() @@ -4106,7 +3894,7 @@ class KafkaApisTest extends Logging { overrideProperties = Map( ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), - raftSupport = true) + ) kafkaApis.handleShareFetchRequest(request) var response = verifyNoThrottling[ShareFetchResponse](request) var responseData = response.data() @@ -4209,7 +3997,7 @@ class KafkaApisTest extends Logging { overrideProperties = Map( ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), - raftSupport = true) + ) kafkaApis.handleShareFetchRequest(request) var response = verifyNoThrottling[ShareFetchResponse](request) var responseData = response.data() @@ -4292,7 +4080,7 @@ class KafkaApisTest extends Logging { overrideProperties = Map( ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), - raftSupport = true) + ) kafkaApis.handleShareFetchRequest(request) val response = verifyNoThrottling[ShareFetchResponse](request) val responseData = response.data() @@ -4369,7 +4157,7 @@ class KafkaApisTest extends Logging { overrideProperties = Map( ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), - raftSupport = true) + ) kafkaApis.handleShareFetchRequest(request) val response = verifyNoThrottling[ShareFetchResponse](request) val responseData = response.data() @@ -4433,7 +4221,7 @@ class KafkaApisTest extends Logging { overrideProperties = Map( ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", 
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), - raftSupport = true) + ) kafkaApis.handleShareFetchRequest(request) val response = verifyNoThrottling[ShareFetchResponse](request) val responseData = response.data() @@ -4489,7 +4277,7 @@ class KafkaApisTest extends Logging { overrideProperties = Map( ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), - raftSupport = true) + ) kafkaApis.handleShareFetchRequest(request) val response = verifyNoThrottling[ShareFetchResponse](request) val responseData = response.data() @@ -4560,7 +4348,7 @@ class KafkaApisTest extends Logging { overrideProperties = Map( ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), - raftSupport = true) + ) kafkaApis.handleShareFetchRequest(request) var response = verifyNoThrottling[ShareFetchResponse](request) var responseData = response.data() @@ -4653,7 +4441,7 @@ class KafkaApisTest extends Logging { overrideProperties = Map( ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), - raftSupport = true) + ) kafkaApis.handleShareFetchRequest(request) var response = verifyNoThrottling[ShareFetchResponse](request) var responseData = response.data() @@ -4800,7 +4588,7 @@ class KafkaApisTest extends Logging { overrideProperties = Map( ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), - raftSupport = true) + ) kafkaApis.handleShareFetchRequest(request) var response = verifyNoThrottling[ShareFetchResponse](request) var responseData = response.data() @@ -5133,7 +4921,7 @@ class KafkaApisTest extends Logging { overrideProperties = Map( ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), - raftSupport = true) + ) kafkaApis.handleShareFetchRequest(request) var response = 
verifyNoThrottling[ShareFetchResponse](request) var responseData = response.data() @@ -5498,7 +5286,7 @@ class KafkaApisTest extends Logging { overrideProperties = Map( ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), - raftSupport = true) + ) val fetchResult: Map[TopicIdPartition, ShareFetchResponseData.PartitionData] = kafkaApis.handleFetchFromShareFetchRequest( request, @@ -5644,7 +5432,7 @@ class KafkaApisTest extends Logging { overrideProperties = Map( ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), - raftSupport = true) + ) val fetchResult: Map[TopicIdPartition, ShareFetchResponseData.PartitionData] = kafkaApis.handleFetchFromShareFetchRequest( request, @@ -5787,7 +5575,7 @@ class KafkaApisTest extends Logging { overrideProperties = Map( ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), - raftSupport = true) + ) val fetchResult: Map[TopicIdPartition, ShareFetchResponseData.PartitionData] = kafkaApis.handleFetchFromShareFetchRequest( request, @@ -5956,7 +5744,7 @@ class KafkaApisTest extends Logging { overrideProperties = Map( ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), - raftSupport = true) + ) val fetchResult: Map[TopicIdPartition, ShareFetchResponseData.PartitionData] = kafkaApis.handleFetchFromShareFetchRequest( request, @@ -6129,7 +5917,7 @@ class KafkaApisTest extends Logging { overrideProperties = Map( ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), - raftSupport = true) + ) kafkaApis.handleShareFetchRequest(request) var response = verifyNoThrottling[ShareFetchResponse](request) var responseData = response.data() @@ -6216,7 +6004,7 @@ class KafkaApisTest extends Logging { 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "false", ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), - raftSupport = true) + ) kafkaApis.handleShareFetchRequest(request) val response = verifyNoThrottling[ShareFetchResponse](request) @@ -6259,7 +6047,7 @@ class KafkaApisTest extends Logging { overrideProperties = Map( ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "false"), - raftSupport = true) + ) kafkaApis.handleShareFetchRequest(request) val response = verifyNoThrottling[ShareFetchResponse](request) @@ -6312,7 +6100,7 @@ class KafkaApisTest extends Logging { ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), authorizer = Option(authorizer), - raftSupport = true) + ) kafkaApis.handleShareFetchRequest(request) val response = verifyNoThrottling[ShareFetchResponse](request) @@ -6375,7 +6163,7 @@ class KafkaApisTest extends Logging { overrideProperties = Map( ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), - raftSupport = true) + ) kafkaApis.handleShareFetchRequest(request) val response = verifyNoThrottling[ShareFetchResponse](request) val responseData = response.data() @@ -6442,7 +6230,7 @@ class KafkaApisTest extends Logging { overrideProperties = Map( ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), - raftSupport = true) + ) kafkaApis.handleShareAcknowledgeRequest(request) val response = verifyNoThrottling[ShareAcknowledgeResponse](request) val responseData = response.data() @@ -6490,7 +6278,7 @@ class KafkaApisTest extends Logging { GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "false", ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), - 
raftSupport = true) + ) kafkaApis.handleShareAcknowledgeRequest(request) val response = verifyNoThrottling[ShareAcknowledgeResponse](request) @@ -6532,7 +6320,7 @@ class KafkaApisTest extends Logging { overrideProperties = Map( ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "false"), - raftSupport = true) + ) kafkaApis.handleShareAcknowledgeRequest(request) val response = verifyNoThrottling[ShareAcknowledgeResponse](request) @@ -6584,7 +6372,7 @@ class KafkaApisTest extends Logging { ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), authorizer = Option(authorizer), - raftSupport = true) + ) kafkaApis.handleShareAcknowledgeRequest(request) val response = verifyNoThrottling[ShareAcknowledgeResponse](request) @@ -6635,7 +6423,7 @@ class KafkaApisTest extends Logging { overrideProperties = Map( ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), - raftSupport = true) + ) kafkaApis.handleShareAcknowledgeRequest(request) val response = verifyNoThrottling[ShareAcknowledgeResponse](request) @@ -6686,7 +6474,7 @@ class KafkaApisTest extends Logging { overrideProperties = Map( ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), - raftSupport = true) + ) kafkaApis.handleShareAcknowledgeRequest(request) val response = verifyNoThrottling[ShareAcknowledgeResponse](request) @@ -6735,7 +6523,7 @@ class KafkaApisTest extends Logging { overrideProperties = Map( ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), - raftSupport = true) + ) kafkaApis.handleShareAcknowledgeRequest(request) val response = verifyNoThrottling[ShareAcknowledgeResponse](request) @@ -6810,7 +6598,7 @@ class KafkaApisTest extends Logging { overrideProperties = Map( 
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), - raftSupport = true) + ) kafkaApis.handleShareAcknowledgeRequest(request) val response = verifyNoThrottling[ShareAcknowledgeResponse](request) @@ -6873,7 +6661,7 @@ class KafkaApisTest extends Logging { overrideProperties = Map( ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), - raftSupport = true) + ) kafkaApis.handleShareAcknowledgeRequest(request) val response = verifyNoThrottling[ShareAcknowledgeResponse](request) val responseData = response.data() @@ -6940,7 +6728,7 @@ class KafkaApisTest extends Logging { overrideProperties = Map( ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), - raftSupport = true) + ) kafkaApis.handleShareAcknowledgeRequest(request) val response = verifyNoThrottling[ShareAcknowledgeResponse](request) val responseData = response.data() @@ -7008,7 +6796,7 @@ class KafkaApisTest extends Logging { overrideProperties = Map( ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), - raftSupport = true) + ) kafkaApis.handleShareAcknowledgeRequest(request) val response = verifyNoThrottling[ShareAcknowledgeResponse](request) val responseData = response.data() @@ -7094,7 +6882,7 @@ class KafkaApisTest extends Logging { overrideProperties = Map( ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), - raftSupport = true) + ) val acknowledgeBatches = kafkaApis.getAcknowledgeBatchesFromShareFetchRequest(shareFetchRequest, topicNames, erroneous) assertEquals(4, acknowledgeBatches.size) @@ -7163,7 +6951,7 @@ class KafkaApisTest extends Logging { overrideProperties = Map( ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), - 
raftSupport = true) + ) val acknowledgeBatches = kafkaApis.getAcknowledgeBatchesFromShareFetchRequest(shareFetchRequest, topicIdNames, erroneous) val erroneousTopicIdPartitions = kafkaApis.validateAcknowledgementBatches(acknowledgeBatches, erroneous) @@ -7236,7 +7024,7 @@ class KafkaApisTest extends Logging { overrideProperties = Map( ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), - raftSupport = true) + ) val acknowledgeBatches = kafkaApis.getAcknowledgeBatchesFromShareAcknowledgeRequest(shareAcknowledgeRequest, topicNames, erroneous) assertEquals(3, acknowledgeBatches.size) @@ -7303,7 +7091,7 @@ class KafkaApisTest extends Logging { overrideProperties = Map( ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), - raftSupport = true) + ) val acknowledgeBatches = kafkaApis.getAcknowledgeBatchesFromShareAcknowledgeRequest(shareAcknowledgeRequest, topicIdNames, erroneous) val erroneousTopicIdPartitions = kafkaApis.validateAcknowledgementBatches(acknowledgeBatches, erroneous) @@ -7375,7 +7163,7 @@ class KafkaApisTest extends Logging { overrideProperties = Map( ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), - raftSupport = true) + ) val ackResult = kafkaApis.handleAcknowledgements( acknowledgementData, erroneous, @@ -7454,7 +7242,7 @@ class KafkaApisTest extends Logging { overrideProperties = Map( ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), - raftSupport = true) + ) val ackResult = kafkaApis.handleAcknowledgements( acknowledgementData, erroneous, @@ -7534,7 +7322,7 @@ class KafkaApisTest extends Logging { overrideProperties = Map( ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), - raftSupport = true) + ) val ackResult = 
kafkaApis.handleAcknowledgements( acknowledgementData, erroneous, @@ -7608,7 +7396,7 @@ class KafkaApisTest extends Logging { overrideProperties = Map( ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), - raftSupport = true) + ) val ackResult = kafkaApis.handleAcknowledgements( acknowledgementData, erroneous, @@ -7705,7 +7493,7 @@ class KafkaApisTest extends Logging { overrideProperties = Map( ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), - raftSupport = true) + ) val response = kafkaApis.processShareAcknowledgeResponse(responseAcknowledgeData, request) val responseData = response.data() val topicResponses = responseData.responses() @@ -9140,7 +8928,6 @@ class KafkaApisTest extends Logging { val describeClusterResponse = verifyNoThrottling[DescribeClusterResponse](request) - assertEquals(metadataCache.getControllerId.get.id, describeClusterResponse.data.controllerId) assertEquals(clusterId, describeClusterResponse.data.clusterId) assertEquals(8096, describeClusterResponse.data.clusterAuthorizedOperations) assertEquals(metadataCache.getAliveBrokerNodes(plaintextListener).toSet, @@ -9779,7 +9566,7 @@ class KafkaApisTest extends Logging { metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](), any[Long])).thenReturn(0) - kafkaApis = createKafkaApis(raftSupport = true) + kafkaApis = createKafkaApis() kafkaApis.handleAlterConfigsRequest(request) val response = verifyNoThrottling[AlterConfigsResponse](request) assertEquals(new AlterConfigsResponseData(), response.data()) @@ -9799,7 +9586,7 @@ class KafkaApisTest extends Logging { metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) 
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](), any[Long])).thenReturn(0) - kafkaApis = createKafkaApis(raftSupport = true) + kafkaApis = createKafkaApis() kafkaApis.handleAlterConfigsRequest(request) val response = verifyNoThrottling[AlterConfigsResponse](request) assertEquals(new AlterConfigsResponseData().setResponses(asList( @@ -9817,7 +9604,7 @@ class KafkaApisTest extends Logging { metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](), any[Long])).thenReturn(0) - kafkaApis = createKafkaApis(raftSupport = true) + kafkaApis = createKafkaApis() kafkaApis.handleIncrementalAlterConfigsRequest(request) val response = verifyNoThrottling[IncrementalAlterConfigsResponse](request) assertEquals(new IncrementalAlterConfigsResponseData(), response.data()) @@ -9837,7 +9624,7 @@ class KafkaApisTest extends Logging { metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](), any[Long])).thenReturn(0) - kafkaApis = createKafkaApis(raftSupport = true) + kafkaApis = createKafkaApis() kafkaApis.handleIncrementalAlterConfigsRequest(request) val response = verifyNoThrottling[IncrementalAlterConfigsResponse](request) assertEquals(new IncrementalAlterConfigsResponseData().setResponses(asList( @@ -9855,7 +9642,7 @@ class KafkaApisTest extends Logging { val requestChannelRequest = buildRequest(new ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest).build()) metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_1) - kafkaApis = createKafkaApis(raftSupport = true) + kafkaApis = createKafkaApis() kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) val expectedHeartbeatResponse = new 
ConsumerGroupHeartbeatResponseData() @@ -9879,7 +9666,7 @@ class KafkaApisTest extends Logging { )).thenReturn(future) kafkaApis = createKafkaApis( featureVersions = Seq(GroupVersion.GV_1), - raftSupport = true + ) kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) @@ -9906,7 +9693,7 @@ class KafkaApisTest extends Logging { )).thenReturn(future) kafkaApis = createKafkaApis( featureVersions = Seq(GroupVersion.GV_1), - raftSupport = true + ) kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) @@ -9929,7 +9716,7 @@ class KafkaApisTest extends Logging { kafkaApis = createKafkaApis( authorizer = Some(authorizer), featureVersions = Seq(GroupVersion.GV_1), - raftSupport = true + ) kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) @@ -9955,7 +9742,7 @@ class KafkaApisTest extends Logging { )).thenReturn(future) kafkaApis = createKafkaApis( featureVersions = Seq(GroupVersion.GV_1), - raftSupport = true + ) kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) @@ -9998,7 +9785,7 @@ class KafkaApisTest extends Logging { val expectedResponse = new ConsumerGroupDescribeResponseData() expectedResponse.groups.add(expectedDescribedGroup) metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_1) - kafkaApis = createKafkaApis(raftSupport = true) + kafkaApis = createKafkaApis() kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) val response = verifyNoThrottling[ConsumerGroupDescribeResponse](requestChannelRequest) @@ -10026,7 +9813,7 @@ class KafkaApisTest extends Logging { kafkaApis = createKafkaApis( authorizer = Some(authorizer), featureVersions = Seq(GroupVersion.GV_1), - raftSupport = true + ) kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) @@ -10049,7 +9836,7 @@ class KafkaApisTest extends Logging { )).thenReturn(future) kafkaApis = createKafkaApis( featureVersions = Seq(GroupVersion.GV_1), - raftSupport = true + ) kafkaApis.handle(requestChannelRequest, 
RequestLocal.noCaching) @@ -10069,7 +9856,7 @@ class KafkaApisTest extends Logging { new GetTelemetrySubscriptionsResponseData())) metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) - kafkaApis = createKafkaApis(raftSupport = true) + kafkaApis = createKafkaApis() kafkaApis.handle(request, RequestLocal.noCaching) val response = verifyNoThrottling[GetTelemetrySubscriptionsResponse](request) @@ -10088,7 +9875,7 @@ class KafkaApisTest extends Logging { any[RequestContext]())).thenThrow(new RuntimeException("test")) metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) - kafkaApis = createKafkaApis(raftSupport = true) + kafkaApis = createKafkaApis() kafkaApis.handle(request, RequestLocal.noCaching) val response = verifyNoThrottling[GetTelemetrySubscriptionsResponse](request) @@ -10106,7 +9893,7 @@ class KafkaApisTest extends Logging { .thenReturn(new PushTelemetryResponse(new PushTelemetryResponseData())) metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) - kafkaApis = createKafkaApis(raftSupport = true) + kafkaApis = createKafkaApis() kafkaApis.handle(request, RequestLocal.noCaching) val response = verifyNoThrottling[PushTelemetryResponse](request) @@ -10123,7 +9910,7 @@ class KafkaApisTest extends Logging { .thenThrow(new RuntimeException("test")) metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) - kafkaApis = createKafkaApis(raftSupport = true) + kafkaApis = createKafkaApis() kafkaApis.handle(request, RequestLocal.noCaching) val response = verifyNoThrottling[PushTelemetryResponse](request) @@ -10140,7 +9927,7 @@ class KafkaApisTest extends Logging { resources.add("test1") resources.add("test2") when(clientMetricsManager.listClientMetricsResources).thenReturn(resources.asJava) - kafkaApis = createKafkaApis(raftSupport = true) + kafkaApis = createKafkaApis() kafkaApis.handle(request, 
RequestLocal.noCaching) val response = verifyNoThrottling[ListClientMetricsResourcesResponse](request) val expectedResponse = new ListClientMetricsResourcesResponseData().setClientMetricsResources( @@ -10155,7 +9942,7 @@ class KafkaApisTest extends Logging { val resources = new mutable.HashSet[String] when(clientMetricsManager.listClientMetricsResources).thenReturn(resources.asJava) - kafkaApis = createKafkaApis(raftSupport = true) + kafkaApis = createKafkaApis() kafkaApis.handle(request, RequestLocal.noCaching) val response = verifyNoThrottling[ListClientMetricsResourcesResponse](request) val expectedResponse = new ListClientMetricsResourcesResponseData() @@ -10168,7 +9955,7 @@ class KafkaApisTest extends Logging { metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) when(clientMetricsManager.listClientMetricsResources).thenThrow(new RuntimeException("test")) - kafkaApis = createKafkaApis(raftSupport = true) + kafkaApis = createKafkaApis() kafkaApis.handle(request, RequestLocal.noCaching) val response = verifyNoThrottling[ListClientMetricsResourcesResponse](request) @@ -10182,7 +9969,7 @@ class KafkaApisTest extends Logging { val requestChannelRequest = buildRequest(new ShareGroupHeartbeatRequest.Builder(shareGroupHeartbeatRequest, true).build()) metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) - kafkaApis = createKafkaApis(raftSupport = true) + kafkaApis = createKafkaApis() kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) val expectedHeartbeatResponse = new ShareGroupHeartbeatResponseData() @@ -10205,7 +9992,6 @@ class KafkaApisTest extends Logging { metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) kafkaApis = createKafkaApis( overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), - raftSupport = true ) kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) @@ -10230,7 +10016,6 @@ 
class KafkaApisTest extends Logging { kafkaApis = createKafkaApis( overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), authorizer = Some(authorizer), - raftSupport = true ) kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) @@ -10252,7 +10037,6 @@ class KafkaApisTest extends Logging { metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) kafkaApis = createKafkaApis( overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), - raftSupport = true ) kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) @@ -10555,7 +10339,6 @@ class KafkaApisTest extends Logging { kafkaApis = createKafkaApis( overrideProperties = configOverrides, authorizer = Option(authorizer), - raftSupport = true ) kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) @@ -10584,7 +10367,6 @@ class KafkaApisTest extends Logging { kafkaApis = createKafkaApis( overrideProperties = configOverrides, authorizer = Option(authorizer), - raftSupport = true ) kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching()) @@ -10614,7 +10396,6 @@ class KafkaApisTest extends Logging { kafkaApis = createKafkaApis( overrideProperties = configOverrides, authorizer = Option(authorizer), - raftSupport = true ) kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching())