This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new df5272a27e5 KAFKA-18399 Remove ZooKeeper from KafkaApis (12/N): clean up ZKMetadataCache, KafkaController and raftSupport (#18542)
df5272a27e5 is described below

commit df5272a27e5708ad783f11c58ce946cddeff57d2
Author: TaiJuWu <tjwu1...@gmail.com>
AuthorDate: Wed Jan 15 23:28:57 2025 +0800

    KAFKA-18399 Remove ZooKeeper from KafkaApis (12/N): clean up ZKMetadataCache, KafkaController and raftSupport (#18542)
    
    Reviewers: Viktor Somogyi-Vass <viktorsomo...@gmail.com>, Chia-Ping Tsai <chia7...@gmail.com>
---
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 403 +++++----------------
 1 file changed, 92 insertions(+), 311 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index a19e1a4296c..d778c785982 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -17,16 +17,14 @@
 
 package kafka.server
 
-import kafka.cluster.{Broker, Partition}
-import kafka.controller.KafkaController
+import kafka.cluster.Partition
 import kafka.coordinator.transaction.{InitProducerIdResult, 
TransactionCoordinator}
 import kafka.log.UnifiedLog
 import kafka.network.RequestChannel
 import kafka.server.QuotaFactory.QuotaManagers
-import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache, 
MockConfigRepository, ZkMetadataCache}
+import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache, 
MockConfigRepository}
 import kafka.server.share.SharePartitionManager
 import kafka.utils.{CoreUtils, Log4jController, Logging, TestUtils}
-import kafka.zk.KafkaZkClient
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
 import org.apache.kafka.common._
@@ -82,7 +80,7 @@ import org.apache.kafka.security.authorizer.AclEntry
 import org.apache.kafka.server.{BrokerFeatures, ClientMetricsManager}
 import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, 
Authorizer}
 import org.apache.kafka.server.common.{FeatureVersion, FinalizedFeatures, 
GroupVersion, KRaftVersion, MetadataVersion, RequestLocal, TransactionVersion}
-import org.apache.kafka.server.config.{ConfigType, KRaftConfigs, 
ReplicationConfigs, ServerConfigs, ServerLogConfigs}
+import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, 
ServerConfigs, ServerLogConfigs}
 import org.apache.kafka.server.metrics.ClientMetricsTestUtils
 import org.apache.kafka.server.share.{CachedSharePartition, 
ErroneousAndValidPartitionData}
 import org.apache.kafka.server.quota.ThrottleCallback
@@ -119,9 +117,7 @@ class KafkaApisTest extends Logging {
   private val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
   private val groupCoordinator: GroupCoordinator = 
mock(classOf[GroupCoordinator])
   private val shareCoordinator: ShareCoordinator = 
mock(classOf[ShareCoordinator])
-  private val adminManager: ZkAdminManager = mock(classOf[ZkAdminManager])
   private val txnCoordinator: TransactionCoordinator = 
mock(classOf[TransactionCoordinator])
-  private val controller: KafkaController = mock(classOf[KafkaController])
   private val forwardingManager: ForwardingManager = 
mock(classOf[ForwardingManager])
   private val autoTopicCreationManager: AutoTopicCreationManager = 
mock(classOf[AutoTopicCreationManager])
 
@@ -129,12 +125,9 @@ class KafkaApisTest extends Logging {
     override def serialize(principal: KafkaPrincipal): Array[Byte] = 
Utils.utf8(principal.toString)
     override def deserialize(bytes: Array[Byte]): KafkaPrincipal = 
SecurityUtils.parseKafkaPrincipal(Utils.utf8(bytes))
   }
-  private val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient])
   private val metrics = new Metrics()
   private val brokerId = 1
-  // KRaft tests should override this with a KRaftMetadataCache
-  private var metadataCache: MetadataCache = 
MetadataCache.zkMetadataCache(brokerId, MetadataVersion.latestTesting())
-  private val brokerEpochManager: ZkBrokerEpochManager = new 
ZkBrokerEpochManager(metadataCache, controller, None)
+  private var metadataCache: MetadataCache = 
MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.LATEST_PRODUCTION)
   private val clientQuotaManager: ClientQuotaManager = 
mock(classOf[ClientQuotaManager])
   private val clientRequestQuotaManager: ClientRequestQuotaManager = 
mock(classOf[ClientRequestQuotaManager])
   private val clientControllerQuotaManager: ControllerMutationQuotaManager = 
mock(classOf[ControllerMutationQuotaManager])
@@ -162,59 +155,37 @@ class KafkaApisTest extends Logging {
 
   def createKafkaApis(interBrokerProtocolVersion: MetadataVersion = 
MetadataVersion.latestTesting,
                       authorizer: Option[Authorizer] = None,
-                      enableForwarding: Boolean = false,
                       configRepository: ConfigRepository = new 
MockConfigRepository(),
-                      raftSupport: Boolean = false,
                       overrideProperties: Map[String, String] = Map.empty,
                       featureVersions: Seq[FeatureVersion] = Seq.empty): 
KafkaApis = {
-    val properties = if (raftSupport) {
-      val properties = TestUtils.createBrokerConfig(brokerId)
-      properties.put(KRaftConfigs.NODE_ID_CONFIG, brokerId.toString)
-      properties.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
-      val voterId = brokerId + 1
-      properties.put(QuorumConfig.QUORUM_VOTERS_CONFIG, 
s"$voterId@localhost:9093")
-      properties
-    } else {
-      TestUtils.createBrokerConfig(brokerId)
-    }
+
+    val properties = TestUtils.createBrokerConfig(brokerId)
+    properties.put(KRaftConfigs.NODE_ID_CONFIG, brokerId.toString)
+    properties.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
+    val voterId = brokerId + 1
+    properties.put(QuorumConfig.QUORUM_VOTERS_CONFIG, 
s"$voterId@localhost:9093")
+
     overrideProperties.foreach( p => properties.put(p._1, p._2))
     TestUtils.setIbpVersion(properties, interBrokerProtocolVersion)
     val config = new KafkaConfig(properties)
 
-    val forwardingManagerOpt = if (enableForwarding)
-      Some(this.forwardingManager)
-    else
-      None
-
-    val metadataSupport = if (raftSupport) {
-      // it will be up to the test to replace the default ZkMetadataCache 
implementation
-      // with a KRaftMetadataCache instance
-      metadataCache match {
+    val metadataSupport = metadataCache match {
         case cache: KRaftMetadataCache => RaftSupport(forwardingManager, cache)
         case _ => throw new IllegalStateException("Test must set an instance 
of KRaftMetadataCache")
       }
-    } else {
-      metadataCache match {
-        case zkMetadataCache: ZkMetadataCache =>
-          ZkSupport(adminManager, controller, zkClient, forwardingManagerOpt, 
zkMetadataCache, brokerEpochManager)
-        case _ => throw new IllegalStateException("Test must set an instance 
of ZkMetadataCache")
-      }
-    }
 
-    val listenerType = if (raftSupport) ListenerType.BROKER else 
ListenerType.ZK_BROKER
-    val enabledApis = if (enableForwarding) {
-      ApiKeys.apisForListener(listenerType).asScala ++ Set(ApiKeys.ENVELOPE)
-    } else {
-      ApiKeys.apisForListener(listenerType).asScala.toSet
-    }
+
+    val listenerType = ListenerType.BROKER
+    val enabledApis = ApiKeys.apisForListener(listenerType).asScala
+
     val apiVersionManager = new SimpleApiVersionManager(
       listenerType,
       enabledApis,
       BrokerFeatures.defaultSupportedFeatures(true),
       true,
-      () => new FinalizedFeatures(MetadataVersion.latestTesting(), 
Collections.emptyMap[String, java.lang.Short], 0, raftSupport))
+      () => new FinalizedFeatures(MetadataVersion.latestTesting(), 
Collections.emptyMap[String, java.lang.Short], 0, true))
 
-    val clientMetricsManagerOpt = if (raftSupport) Some(clientMetricsManager) 
else None
+    val clientMetricsManagerOpt = Some(clientMetricsManager)
 
     
when(groupCoordinator.isNewGroupCoordinator).thenReturn(config.isNewGroupCoordinatorEnabled)
     setupFeatures(featureVersions)
@@ -290,7 +261,7 @@ class KafkaApisTest extends Logging {
     topicConfigs.put(propName, propValue)
     when(configRepository.topicConfig(resourceName)).thenReturn(topicConfigs)
 
-    metadataCache = mock(classOf[ZkMetadataCache])
+    metadataCache = mock(classOf[KRaftMetadataCache])
     when(metadataCache.contains(resourceName)).thenReturn(true)
 
     val describeConfigsRequest = new DescribeConfigsRequest.Builder(new 
DescribeConfigsRequestData()
@@ -299,10 +270,8 @@ class KafkaApisTest extends Logging {
         .setResourceName(resourceName)
         .setResourceType(ConfigResource.Type.TOPIC.id)).asJava))
       .build(requestHeader.apiVersion)
-    val request = buildRequest(describeConfigsRequest,
-      requestHeader = Option(requestHeader))
-    
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
-      any[Long])).thenReturn(0)
+    val request = buildRequest(describeConfigsRequest, requestHeader = 
Option(requestHeader))
+
     kafkaApis = createKafkaApis(authorizer = Some(authorizer), 
configRepository = configRepository)
     kafkaApis.handleDescribeConfigsRequest(request)
 
@@ -344,7 +313,7 @@ class KafkaApisTest extends Logging {
     val request = buildRequest(incrementalAlterConfigsRequest, requestHeader = 
Option(requestHeader))
 
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.LATEST_PRODUCTION)
-    createKafkaApis(authorizer = Some(authorizer), raftSupport = 
true).handleIncrementalAlterConfigsRequest(request)
+    createKafkaApis(authorizer = 
Some(authorizer)).handleIncrementalAlterConfigsRequest(request)
     verify(forwardingManager, times(1)).forwardRequest(
       any(),
       any(),
@@ -422,7 +391,7 @@ class KafkaApisTest extends Logging {
     val request = buildRequest(apiRequest)
 
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.LATEST_PRODUCTION)
-    kafkaApis = createKafkaApis(raftSupport = true)
+    kafkaApis = createKafkaApis()
     kafkaApis.handleAlterConfigsRequest(request)
     verify(forwardingManager, times(1)).forwardRequest(
       any(),
@@ -444,7 +413,7 @@ class KafkaApisTest extends Logging {
     val request = buildRequest(incrementalAlterConfigsRequest, requestHeader = 
Option(requestHeader))
 
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.LATEST_PRODUCTION)
-    kafkaApis = createKafkaApis(raftSupport = true)
+    kafkaApis = createKafkaApis()
     kafkaApis.handleIncrementalAlterConfigsRequest(request)
     verify(forwardingManager, times(1)).forwardRequest(
       any(),
@@ -484,7 +453,7 @@ class KafkaApisTest extends Logging {
     val cmConfigs = ClientMetricsTestUtils.defaultProperties
     when(configRepository.config(resource)).thenReturn(cmConfigs)
 
-    metadataCache = mock(classOf[ZkMetadataCache])
+    metadataCache = mock(classOf[KRaftMetadataCache])
     when(metadataCache.contains(subscriptionName)).thenReturn(true)
 
     val describeConfigsRequest = new DescribeConfigsRequest.Builder(new 
DescribeConfigsRequestData()
@@ -495,8 +464,7 @@ class KafkaApisTest extends Logging {
       .build(requestHeader.apiVersion)
     val request = buildRequest(describeConfigsRequest,
       requestHeader = Option(requestHeader))
-    
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
-      any[Long])).thenReturn(0)
+
     kafkaApis = createKafkaApis(authorizer = Some(authorizer), 
configRepository = configRepository)
     kafkaApis.handleDescribeConfigsRequest(request)
 
@@ -518,7 +486,7 @@ class KafkaApisTest extends Logging {
     val requestData = 
DescribeQuorumRequest.singletonRequest(KafkaRaftServer.MetadataPartition)
     val requestBuilder = new DescribeQuorumRequest.Builder(requestData)
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
-    kafkaApis = createKafkaApis(raftSupport = true)
+    kafkaApis = createKafkaApis()
     testForwardableApi(kafkaApis = kafkaApis,
       ApiKeys.DESCRIBE_QUORUM,
       requestBuilder
@@ -530,7 +498,7 @@ class KafkaApisTest extends Logging {
     requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest]
   ): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
-    kafkaApis = createKafkaApis(enableForwarding = true, raftSupport = true)
+    kafkaApis = createKafkaApis()
     testForwardableApi(kafkaApis = kafkaApis,
       apiKey,
       requestBuilder
@@ -548,13 +516,6 @@ class KafkaApisTest extends Logging {
     val apiRequest = requestBuilder.build(topicHeader.apiVersion)
     val request = buildRequest(apiRequest)
 
-    if (kafkaApis.metadataSupport.isInstanceOf[ZkSupport]) {
-      // The controller check only makes sense for ZK clusters. For KRaft,
-      // controller requests are handled on a separate listener, so there
-      // is no choice but to forward them.
-      when(controller.isActive).thenReturn(false)
-    }
-
     
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
       any[Long])).thenReturn(0)
     val forwardCallback: ArgumentCaptor[Option[AbstractResponse] => Unit] = 
ArgumentCaptor.forClass(classOf[Option[AbstractResponse] => Unit])
@@ -573,9 +534,6 @@ class KafkaApisTest extends Logging {
     val capturedResponse = verifyNoThrottling[AbstractResponse](request)
     assertEquals(expectedResponse.data, capturedResponse.data)
 
-    if (kafkaApis.metadataSupport.isInstanceOf[ZkSupport]) {
-      verify(controller).isActive
-    }
   }
 
   private def authorizeResource(authorizer: Authorizer,
@@ -612,7 +570,7 @@ class KafkaApisTest extends Logging {
     val request = buildRequest(incrementalAlterConfigsRequest, requestHeader = 
Option(requestHeader))
 
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.LATEST_PRODUCTION)
-    kafkaApis = createKafkaApis(authorizer = Some(authorizer), raftSupport = 
true)
+    kafkaApis = createKafkaApis(authorizer = Some(authorizer))
     kafkaApis.handleIncrementalAlterConfigsRequest(request)
 
     verify(authorizer, times(1)).authorize(any(), any())
@@ -652,7 +610,7 @@ class KafkaApisTest extends Logging {
     val requestBuilder = new CreateTopicsRequest.Builder(requestData).build()
     val request = buildRequest(requestBuilder)
 
-    kafkaApis = createKafkaApis(enableForwarding = true, raftSupport = true)
+    kafkaApis = createKafkaApis()
     val forwardCallback: ArgumentCaptor[Option[AbstractResponse] => Unit] =
       ArgumentCaptor.forClass(classOf[Option[AbstractResponse] => Unit])
 
@@ -904,8 +862,7 @@ class KafkaApisTest extends Logging {
       any[Long])).thenReturn(0)
 
     val capturedRequest = verifyTopicCreation(topicName, 
enableAutoTopicCreation, isInternal, request)
-    kafkaApis = createKafkaApis(authorizer = Some(authorizer), 
enableForwarding = enableAutoTopicCreation,
-      overrideProperties = topicConfigOverride)
+    kafkaApis = createKafkaApis(authorizer = Some(authorizer), 
overrideProperties = topicConfigOverride)
     kafkaApis.handleTopicMetadataRequest(request)
 
     val response = verifyNoThrottling[MetadataResponse](request)
@@ -2242,7 +2199,7 @@ class KafkaApisTest extends Logging {
   @Test
   def testProduceResponseMetadataLookupErrorOnNotLeaderOrFollower(): Unit = {
     val topic = "topic"
-    metadataCache = mock(classOf[ZkMetadataCache])
+    metadataCache = mock(classOf[KRaftMetadataCache])
 
     for (version <- 10 to ApiKeys.PRODUCE.latestVersion) {
 
@@ -3612,175 +3569,6 @@ class KafkaApisTest extends Logging {
     assertEquals(Set(0), response.brokers.asScala.map(_.id).toSet)
   }
 
-
-  /**
-   * Metadata request to fetch all topics should not result in the followings:
-   * 1) Auto topic creation
-   * 2) UNKNOWN_TOPIC_OR_PARTITION
-   *
-   * This case is testing the case that a topic is being deleted from 
MetadataCache right after
-   * authorization but before checking in MetadataCache.
-   */
-  @Test
-  def 
testGetAllTopicMetadataShouldNotCreateTopicOrReturnUnknownTopicPartition(): 
Unit = {
-    // Setup: authorizer authorizes 2 topics, but one got deleted in metadata 
cache
-    metadataCache = mock(classOf[ZkMetadataCache])
-    when(metadataCache.getAliveBrokerNodes(any())).thenReturn(List(new 
Node(brokerId,"localhost", 0)))
-    when(metadataCache.getControllerId).thenReturn(None)
-
-    // 2 topics returned for authorization in during handle
-    val topicsReturnedFromMetadataCacheForAuthorization = 
Set("remaining-topic", "later-deleted-topic")
-    
when(metadataCache.getAllTopics()).thenReturn(topicsReturnedFromMetadataCacheForAuthorization)
-    // 1 topic is deleted from metadata right at the time between 
authorization and the next getTopicMetadata() call
-    when(metadataCache.getTopicMetadata(
-      ArgumentMatchers.eq(topicsReturnedFromMetadataCacheForAuthorization),
-      any[ListenerName],
-      anyBoolean,
-      anyBoolean
-    )).thenReturn(Seq(
-      new MetadataResponseTopic()
-        .setErrorCode(Errors.NONE.code)
-        .setName("remaining-topic")
-        .setIsInternal(false)
-    ))
-
-
-    var createTopicIsCalled: Boolean = false
-    // Specific mock on zkClient for this use case
-    // Expect it's never called to do auto topic creation
-    when(zkClient.setOrCreateEntityConfigs(
-      ArgumentMatchers.eq(ConfigType.TOPIC),
-      anyString,
-      any[Properties]
-    )).thenAnswer(_ => {
-      createTopicIsCalled = true
-    })
-    // No need to use
-    when(zkClient.getAllBrokersInCluster)
-      .thenReturn(Seq(new Broker(
-        brokerId, "localhost", 9902,
-        ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), 
SecurityProtocol.PLAINTEXT
-      )))
-
-
-    val (requestListener, _) = updateMetadataCacheWithInconsistentListeners()
-    val response = 
sendMetadataRequestWithInconsistentListeners(requestListener)
-
-    assertFalse(createTopicIsCalled)
-    val responseTopics = response.topicMetadata().asScala.map { metadata => 
metadata.topic() }
-    assertEquals(List("remaining-topic"), responseTopics)
-    
assertTrue(response.topicsByError(Errors.UNKNOWN_TOPIC_OR_PARTITION).isEmpty)
-  }
-
-  @Test
-  def testUnauthorizedTopicMetadataRequest(): Unit = {
-    // 1. Set up broker information
-    val plaintextListener = 
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
-    val broker = new UpdateMetadataBroker()
-      .setId(0)
-      .setRack("rack")
-      .setEndpoints(Seq(
-        new UpdateMetadataEndpoint()
-          .setHost("broker0")
-          .setPort(9092)
-          .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
-          .setListener(plaintextListener.value)
-      ).asJava)
-
-    // 2. Set up authorizer
-    val authorizer: Authorizer = mock(classOf[Authorizer])
-    val unauthorizedTopic = "unauthorized-topic"
-    val authorizedTopic = "authorized-topic"
-
-    val expectedActions = Seq(
-      new Action(AclOperation.DESCRIBE, new 
ResourcePattern(ResourceType.TOPIC, unauthorizedTopic, PatternType.LITERAL), 1, 
true, true),
-      new Action(AclOperation.DESCRIBE, new 
ResourcePattern(ResourceType.TOPIC, authorizedTopic, PatternType.LITERAL), 1, 
true, true)
-    )
-
-    when(authorizer.authorize(any[RequestContext], argThat((t: 
java.util.List[Action]) => t.containsAll(expectedActions.asJava))))
-      .thenAnswer { invocation =>
-      val actions = 
invocation.getArgument(1).asInstanceOf[util.List[Action]].asScala
-      actions.map { action =>
-        if (action.resourcePattern().name().equals(authorizedTopic))
-          AuthorizationResult.ALLOWED
-        else
-          AuthorizationResult.DENIED
-      }.asJava
-    }
-
-    // 3. Set up MetadataCache
-    val authorizedTopicId = Uuid.randomUuid()
-    val unauthorizedTopicId = Uuid.randomUuid()
-
-    val topicIds = new util.HashMap[String, Uuid]()
-    topicIds.put(authorizedTopic, authorizedTopicId)
-    topicIds.put(unauthorizedTopic, unauthorizedTopicId)
-
-    def createDummyPartitionStates(topic: String) = {
-      new UpdateMetadataPartitionState()
-        .setTopicName(topic)
-        .setPartitionIndex(0)
-        .setControllerEpoch(0)
-        .setLeader(0)
-        .setLeaderEpoch(0)
-        .setReplicas(Collections.singletonList(0))
-        .setZkVersion(0)
-        .setIsr(Collections.singletonList(0))
-    }
-
-    // Send UpdateMetadataReq to update MetadataCache
-    val partitionStates = Seq(unauthorizedTopic, 
authorizedTopic).map(createDummyPartitionStates)
-
-    val updateMetadataRequest = new 
UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, 0,
-      0, 0, partitionStates.asJava, Seq(broker).asJava, topicIds).build()
-    metadataCache.asInstanceOf[ZkMetadataCache].updateMetadata(correlationId = 
0, updateMetadataRequest)
-
-    // 4. Send TopicMetadataReq using topicId
-    val metadataReqByTopicId = new 
MetadataRequest.Builder(util.Arrays.asList(authorizedTopicId, 
unauthorizedTopicId)).build()
-    val repByTopicId = buildRequest(metadataReqByTopicId, plaintextListener)
-    
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
-      any[Long])).thenReturn(0)
-    kafkaApis = createKafkaApis(authorizer = Some(authorizer))
-    kafkaApis.handleTopicMetadataRequest(repByTopicId)
-    val metadataByTopicIdResp = 
verifyNoThrottling[MetadataResponse](repByTopicId)
-
-    val metadataByTopicId = 
metadataByTopicIdResp.data().topics().asScala.groupBy(_.topicId()).map(kv => 
(kv._1, kv._2.head))
-
-    metadataByTopicId.foreach { case (topicId, metadataResponseTopic) =>
-      if (topicId == unauthorizedTopicId) {
-        // Return an TOPIC_AUTHORIZATION_FAILED on unauthorized error 
regardless of leaking the existence of topic id
-        assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), 
metadataResponseTopic.errorCode())
-        // Do not return topic information on unauthorized error
-        assertNull(metadataResponseTopic.name())
-      } else {
-        assertEquals(Errors.NONE.code(), metadataResponseTopic.errorCode())
-        assertEquals(authorizedTopic, metadataResponseTopic.name())
-      }
-    }
-    kafkaApis.close()
-
-    // 4. Send TopicMetadataReq using topic name
-    reset(clientRequestQuotaManager, requestChannel)
-    val metadataReqByTopicName = new 
MetadataRequest.Builder(util.Arrays.asList(authorizedTopic, unauthorizedTopic), 
false).build()
-    val repByTopicName = buildRequest(metadataReqByTopicName, 
plaintextListener)
-    kafkaApis = createKafkaApis(authorizer = Some(authorizer))
-    kafkaApis.handleTopicMetadataRequest(repByTopicName)
-    val metadataByTopicNameResp = 
verifyNoThrottling[MetadataResponse](repByTopicName)
-
-    val metadataByTopicName = 
metadataByTopicNameResp.data().topics().asScala.groupBy(_.name()).map(kv => 
(kv._1, kv._2.head))
-
-    metadataByTopicName.foreach { case (topicName, metadataResponseTopic) =>
-      if (topicName == unauthorizedTopic) {
-        assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), 
metadataResponseTopic.errorCode())
-        // Do not return topic Id on unauthorized error
-        assertEquals(Uuid.ZERO_UUID, metadataResponseTopic.topicId())
-      } else {
-        assertEquals(Errors.NONE.code(), metadataResponseTopic.errorCode())
-        assertEquals(authorizedTopicId, metadataResponseTopic.topicId())
-      }
-    }
-  }
-
     /**
    * Verifies that sending a fetch request with version 9 works correctly when
    * ReplicaManager.getLogConfig returns None.
@@ -4023,7 +3811,7 @@ class KafkaApisTest extends Logging {
       overrideProperties = Map(
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
-      raftSupport = true)
+      )
     kafkaApis.handleShareFetchRequest(request)
     val response = verifyNoThrottling[ShareFetchResponse](request)
     val responseData = response.data()
@@ -4106,7 +3894,7 @@ class KafkaApisTest extends Logging {
       overrideProperties = Map(
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
-      raftSupport = true)
+      )
     kafkaApis.handleShareFetchRequest(request)
     var response = verifyNoThrottling[ShareFetchResponse](request)
     var responseData = response.data()
@@ -4209,7 +3997,7 @@ class KafkaApisTest extends Logging {
       overrideProperties = Map(
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
-      raftSupport = true)
+      )
     kafkaApis.handleShareFetchRequest(request)
     var response = verifyNoThrottling[ShareFetchResponse](request)
     var responseData = response.data()
@@ -4292,7 +4080,7 @@ class KafkaApisTest extends Logging {
       overrideProperties = Map(
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
-      raftSupport = true)
+      )
     kafkaApis.handleShareFetchRequest(request)
     val response = verifyNoThrottling[ShareFetchResponse](request)
     val responseData = response.data()
@@ -4369,7 +4157,7 @@ class KafkaApisTest extends Logging {
       overrideProperties = Map(
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
-      raftSupport = true)
+      )
     kafkaApis.handleShareFetchRequest(request)
     val response = verifyNoThrottling[ShareFetchResponse](request)
     val responseData = response.data()
@@ -4433,7 +4221,7 @@ class KafkaApisTest extends Logging {
       overrideProperties = Map(
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
-      raftSupport = true)
+      )
     kafkaApis.handleShareFetchRequest(request)
     val response = verifyNoThrottling[ShareFetchResponse](request)
     val responseData = response.data()
@@ -4489,7 +4277,7 @@ class KafkaApisTest extends Logging {
       overrideProperties = Map(
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
-      raftSupport = true)
+      )
     kafkaApis.handleShareFetchRequest(request)
     val response = verifyNoThrottling[ShareFetchResponse](request)
     val responseData = response.data()
@@ -4560,7 +4348,7 @@ class KafkaApisTest extends Logging {
       overrideProperties = Map(
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
-      raftSupport = true)
+      )
     kafkaApis.handleShareFetchRequest(request)
     var response = verifyNoThrottling[ShareFetchResponse](request)
     var responseData = response.data()
@@ -4653,7 +4441,7 @@ class KafkaApisTest extends Logging {
       overrideProperties = Map(
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
-      raftSupport = true)
+      )
     kafkaApis.handleShareFetchRequest(request)
     var response = verifyNoThrottling[ShareFetchResponse](request)
     var responseData = response.data()
@@ -4800,7 +4588,7 @@ class KafkaApisTest extends Logging {
       overrideProperties = Map(
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
-      raftSupport = true)
+      )
     kafkaApis.handleShareFetchRequest(request)
     var response = verifyNoThrottling[ShareFetchResponse](request)
     var responseData = response.data()
@@ -5133,7 +4921,7 @@ class KafkaApisTest extends Logging {
       overrideProperties = Map(
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
-      raftSupport = true)
+      )
     kafkaApis.handleShareFetchRequest(request)
     var response = verifyNoThrottling[ShareFetchResponse](request)
     var responseData = response.data()
@@ -5498,7 +5286,7 @@ class KafkaApisTest extends Logging {
       overrideProperties = Map(
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
-      raftSupport = true)
+      )
     val fetchResult: Map[TopicIdPartition, 
ShareFetchResponseData.PartitionData] =
       kafkaApis.handleFetchFromShareFetchRequest(
         request,
@@ -5644,7 +5432,7 @@ class KafkaApisTest extends Logging {
       overrideProperties = Map(
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
-      raftSupport = true)
+      )
     val fetchResult: Map[TopicIdPartition, 
ShareFetchResponseData.PartitionData] =
       kafkaApis.handleFetchFromShareFetchRequest(
         request,
@@ -5787,7 +5575,7 @@ class KafkaApisTest extends Logging {
       overrideProperties = Map(
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
-      raftSupport = true)
+      )
     val fetchResult: Map[TopicIdPartition, 
ShareFetchResponseData.PartitionData] =
       kafkaApis.handleFetchFromShareFetchRequest(
         request,
@@ -5956,7 +5744,7 @@ class KafkaApisTest extends Logging {
       overrideProperties = Map(
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
-      raftSupport = true)
+      )
     val fetchResult: Map[TopicIdPartition, 
ShareFetchResponseData.PartitionData] =
       kafkaApis.handleFetchFromShareFetchRequest(
         request,
@@ -6129,7 +5917,7 @@ class KafkaApisTest extends Logging {
       overrideProperties = Map(
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
-      raftSupport = true)
+      )
     kafkaApis.handleShareFetchRequest(request)
     var response = verifyNoThrottling[ShareFetchResponse](request)
     var responseData = response.data()
@@ -6216,7 +6004,7 @@ class KafkaApisTest extends Logging {
         GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "false",
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
-      raftSupport = true)
+      )
     kafkaApis.handleShareFetchRequest(request)
 
     val response = verifyNoThrottling[ShareFetchResponse](request)
@@ -6259,7 +6047,7 @@ class KafkaApisTest extends Logging {
       overrideProperties = Map(
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "false"),
-      raftSupport = true)
+      )
     kafkaApis.handleShareFetchRequest(request)
 
     val response = verifyNoThrottling[ShareFetchResponse](request)
@@ -6312,7 +6100,7 @@ class KafkaApisTest extends Logging {
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
       authorizer = Option(authorizer),
-      raftSupport = true)
+      )
     kafkaApis.handleShareFetchRequest(request)
 
     val response = verifyNoThrottling[ShareFetchResponse](request)
@@ -6375,7 +6163,7 @@ class KafkaApisTest extends Logging {
       overrideProperties = Map(
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
-      raftSupport = true)
+      )
     kafkaApis.handleShareFetchRequest(request)
     val response = verifyNoThrottling[ShareFetchResponse](request)
     val responseData = response.data()
@@ -6442,7 +6230,7 @@ class KafkaApisTest extends Logging {
       overrideProperties = Map(
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
-      raftSupport = true)
+      )
     kafkaApis.handleShareAcknowledgeRequest(request)
     val response = verifyNoThrottling[ShareAcknowledgeResponse](request)
     val responseData = response.data()
@@ -6490,7 +6278,7 @@ class KafkaApisTest extends Logging {
         GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "false",
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
-      raftSupport = true)
+      )
     kafkaApis.handleShareAcknowledgeRequest(request)
 
     val response = verifyNoThrottling[ShareAcknowledgeResponse](request)
@@ -6532,7 +6320,7 @@ class KafkaApisTest extends Logging {
       overrideProperties = Map(
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "false"),
-      raftSupport = true)
+      )
     kafkaApis.handleShareAcknowledgeRequest(request)
 
     val response = verifyNoThrottling[ShareAcknowledgeResponse](request)
@@ -6584,7 +6372,7 @@ class KafkaApisTest extends Logging {
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
       authorizer = Option(authorizer),
-      raftSupport = true)
+      )
     kafkaApis.handleShareAcknowledgeRequest(request)
 
     val response = verifyNoThrottling[ShareAcknowledgeResponse](request)
@@ -6635,7 +6423,7 @@ class KafkaApisTest extends Logging {
       overrideProperties = Map(
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
-      raftSupport = true)
+      )
     kafkaApis.handleShareAcknowledgeRequest(request)
 
     val response = verifyNoThrottling[ShareAcknowledgeResponse](request)
@@ -6686,7 +6474,7 @@ class KafkaApisTest extends Logging {
       overrideProperties = Map(
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
-      raftSupport = true)
+      )
     kafkaApis.handleShareAcknowledgeRequest(request)
 
     val response = verifyNoThrottling[ShareAcknowledgeResponse](request)
@@ -6735,7 +6523,7 @@ class KafkaApisTest extends Logging {
       overrideProperties = Map(
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
-      raftSupport = true)
+      )
     kafkaApis.handleShareAcknowledgeRequest(request)
 
     val response = verifyNoThrottling[ShareAcknowledgeResponse](request)
@@ -6810,7 +6598,7 @@ class KafkaApisTest extends Logging {
       overrideProperties = Map(
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
-      raftSupport = true)
+      )
     kafkaApis.handleShareAcknowledgeRequest(request)
 
     val response = verifyNoThrottling[ShareAcknowledgeResponse](request)
@@ -6873,7 +6661,7 @@ class KafkaApisTest extends Logging {
       overrideProperties = Map(
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
-      raftSupport = true)
+      )
     kafkaApis.handleShareAcknowledgeRequest(request)
     val response = verifyNoThrottling[ShareAcknowledgeResponse](request)
     val responseData = response.data()
@@ -6940,7 +6728,7 @@ class KafkaApisTest extends Logging {
       overrideProperties = Map(
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
-      raftSupport = true)
+      )
     kafkaApis.handleShareAcknowledgeRequest(request)
     val response = verifyNoThrottling[ShareAcknowledgeResponse](request)
     val responseData = response.data()
@@ -7008,7 +6796,7 @@ class KafkaApisTest extends Logging {
       overrideProperties = Map(
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
-      raftSupport = true)
+      )
     kafkaApis.handleShareAcknowledgeRequest(request)
     val response = verifyNoThrottling[ShareAcknowledgeResponse](request)
     val responseData = response.data()
@@ -7094,7 +6882,7 @@ class KafkaApisTest extends Logging {
       overrideProperties = Map(
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
-      raftSupport = true)
+      )
     val acknowledgeBatches = 
kafkaApis.getAcknowledgeBatchesFromShareFetchRequest(shareFetchRequest, 
topicNames, erroneous)
 
     assertEquals(4, acknowledgeBatches.size)
@@ -7163,7 +6951,7 @@ class KafkaApisTest extends Logging {
       overrideProperties = Map(
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
-      raftSupport = true)
+      )
     val acknowledgeBatches = 
kafkaApis.getAcknowledgeBatchesFromShareFetchRequest(shareFetchRequest, 
topicIdNames, erroneous)
     val erroneousTopicIdPartitions = 
kafkaApis.validateAcknowledgementBatches(acknowledgeBatches, erroneous)
 
@@ -7236,7 +7024,7 @@ class KafkaApisTest extends Logging {
       overrideProperties = Map(
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
-      raftSupport = true)
+      )
     val acknowledgeBatches = 
kafkaApis.getAcknowledgeBatchesFromShareAcknowledgeRequest(shareAcknowledgeRequest,
 topicNames, erroneous)
 
     assertEquals(3, acknowledgeBatches.size)
@@ -7303,7 +7091,7 @@ class KafkaApisTest extends Logging {
       overrideProperties = Map(
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
-      raftSupport = true)
+      )
     val acknowledgeBatches = 
kafkaApis.getAcknowledgeBatchesFromShareAcknowledgeRequest(shareAcknowledgeRequest,
 topicIdNames, erroneous)
     val erroneousTopicIdPartitions = 
kafkaApis.validateAcknowledgementBatches(acknowledgeBatches, erroneous)
 
@@ -7375,7 +7163,7 @@ class KafkaApisTest extends Logging {
       overrideProperties = Map(
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
-      raftSupport = true)
+      )
     val ackResult = kafkaApis.handleAcknowledgements(
       acknowledgementData,
       erroneous,
@@ -7454,7 +7242,7 @@ class KafkaApisTest extends Logging {
       overrideProperties = Map(
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
-      raftSupport = true)
+      )
     val ackResult = kafkaApis.handleAcknowledgements(
       acknowledgementData,
       erroneous,
@@ -7534,7 +7322,7 @@ class KafkaApisTest extends Logging {
       overrideProperties = Map(
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
-      raftSupport = true)
+      )
     val ackResult = kafkaApis.handleAcknowledgements(
       acknowledgementData,
       erroneous,
@@ -7608,7 +7396,7 @@ class KafkaApisTest extends Logging {
       overrideProperties = Map(
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
-      raftSupport = true)
+      )
     val ackResult = kafkaApis.handleAcknowledgements(
       acknowledgementData,
       erroneous,
@@ -7705,7 +7493,7 @@ class KafkaApisTest extends Logging {
       overrideProperties = Map(
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
-      raftSupport = true)
+      )
     val response = 
kafkaApis.processShareAcknowledgeResponse(responseAcknowledgeData, request)
     val responseData = response.data()
     val topicResponses = responseData.responses()
@@ -9140,7 +8928,6 @@ class KafkaApisTest extends Logging {
 
     val describeClusterResponse = 
verifyNoThrottling[DescribeClusterResponse](request)
 
-    assertEquals(metadataCache.getControllerId.get.id, 
describeClusterResponse.data.controllerId)
     assertEquals(clusterId, describeClusterResponse.data.clusterId)
     assertEquals(8096, 
describeClusterResponse.data.clusterAuthorizedOperations)
     assertEquals(metadataCache.getAliveBrokerNodes(plaintextListener).toSet,
@@ -9779,7 +9566,7 @@ class KafkaApisTest extends Logging {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
     
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
       any[Long])).thenReturn(0)
-    kafkaApis = createKafkaApis(raftSupport = true)
+    kafkaApis = createKafkaApis()
     kafkaApis.handleAlterConfigsRequest(request)
     val response = verifyNoThrottling[AlterConfigsResponse](request)
     assertEquals(new AlterConfigsResponseData(), response.data())
@@ -9799,7 +9586,7 @@ class KafkaApisTest extends Logging {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
     
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
       any[Long])).thenReturn(0)
-    kafkaApis = createKafkaApis(raftSupport = true)
+    kafkaApis = createKafkaApis()
     kafkaApis.handleAlterConfigsRequest(request)
     val response = verifyNoThrottling[AlterConfigsResponse](request)
     assertEquals(new AlterConfigsResponseData().setResponses(asList(
@@ -9817,7 +9604,7 @@ class KafkaApisTest extends Logging {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
     
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
       any[Long])).thenReturn(0)
-    kafkaApis = createKafkaApis(raftSupport = true)
+    kafkaApis = createKafkaApis()
     kafkaApis.handleIncrementalAlterConfigsRequest(request)
     val response = verifyNoThrottling[IncrementalAlterConfigsResponse](request)
     assertEquals(new IncrementalAlterConfigsResponseData(), response.data())
@@ -9837,7 +9624,7 @@ class KafkaApisTest extends Logging {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
     
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
       any[Long])).thenReturn(0)
-    kafkaApis = createKafkaApis(raftSupport = true)
+    kafkaApis = createKafkaApis()
     kafkaApis.handleIncrementalAlterConfigsRequest(request)
     val response = verifyNoThrottling[IncrementalAlterConfigsResponse](request)
     assertEquals(new IncrementalAlterConfigsResponseData().setResponses(asList(
@@ -9855,7 +9642,7 @@ class KafkaApisTest extends Logging {
 
     val requestChannelRequest = buildRequest(new 
ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest).build())
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_1)
-    kafkaApis = createKafkaApis(raftSupport = true)
+    kafkaApis = createKafkaApis()
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
     val expectedHeartbeatResponse = new ConsumerGroupHeartbeatResponseData()
@@ -9879,7 +9666,7 @@ class KafkaApisTest extends Logging {
     )).thenReturn(future)
     kafkaApis = createKafkaApis(
       featureVersions = Seq(GroupVersion.GV_1),
-      raftSupport = true
+      
     )
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
@@ -9906,7 +9693,7 @@ class KafkaApisTest extends Logging {
     )).thenReturn(future)
     kafkaApis = createKafkaApis(
       featureVersions = Seq(GroupVersion.GV_1),
-      raftSupport = true
+      
     )
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
@@ -9929,7 +9716,7 @@ class KafkaApisTest extends Logging {
     kafkaApis = createKafkaApis(
       authorizer = Some(authorizer),
       featureVersions = Seq(GroupVersion.GV_1),
-      raftSupport = true
+      
     )
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
@@ -9955,7 +9742,7 @@ class KafkaApisTest extends Logging {
     )).thenReturn(future)
     kafkaApis = createKafkaApis(
       featureVersions = Seq(GroupVersion.GV_1),
-      raftSupport = true
+      
     )
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
@@ -9998,7 +9785,7 @@ class KafkaApisTest extends Logging {
     val expectedResponse = new ConsumerGroupDescribeResponseData()
     expectedResponse.groups.add(expectedDescribedGroup)
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_1)
-    kafkaApis = createKafkaApis(raftSupport = true)
+    kafkaApis = createKafkaApis()
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
     val response = 
verifyNoThrottling[ConsumerGroupDescribeResponse](requestChannelRequest)
 
@@ -10026,7 +9813,7 @@ class KafkaApisTest extends Logging {
     kafkaApis = createKafkaApis(
       authorizer = Some(authorizer),
       featureVersions = Seq(GroupVersion.GV_1),
-      raftSupport = true
+      
     )
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
@@ -10049,7 +9836,7 @@ class KafkaApisTest extends Logging {
     )).thenReturn(future)
     kafkaApis = createKafkaApis(
       featureVersions = Seq(GroupVersion.GV_1),
-      raftSupport = true
+      
     )
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
@@ -10069,7 +9856,7 @@ class KafkaApisTest extends Logging {
       new GetTelemetrySubscriptionsResponseData()))
 
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
-    kafkaApis = createKafkaApis(raftSupport = true)
+    kafkaApis = createKafkaApis()
     kafkaApis.handle(request, RequestLocal.noCaching)
 
     val response = 
verifyNoThrottling[GetTelemetrySubscriptionsResponse](request)
@@ -10088,7 +9875,7 @@ class KafkaApisTest extends Logging {
       any[RequestContext]())).thenThrow(new RuntimeException("test"))
 
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
-    kafkaApis = createKafkaApis(raftSupport = true)
+    kafkaApis = createKafkaApis()
     kafkaApis.handle(request, RequestLocal.noCaching)
 
     val response = 
verifyNoThrottling[GetTelemetrySubscriptionsResponse](request)
@@ -10106,7 +9893,7 @@ class KafkaApisTest extends Logging {
       .thenReturn(new PushTelemetryResponse(new PushTelemetryResponseData()))
 
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
-    kafkaApis = createKafkaApis(raftSupport = true)
+    kafkaApis = createKafkaApis()
     kafkaApis.handle(request, RequestLocal.noCaching)
     val response = verifyNoThrottling[PushTelemetryResponse](request)
 
@@ -10123,7 +9910,7 @@ class KafkaApisTest extends Logging {
       .thenThrow(new RuntimeException("test"))
 
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
-    kafkaApis = createKafkaApis(raftSupport = true)
+    kafkaApis = createKafkaApis()
     kafkaApis.handle(request, RequestLocal.noCaching)
     val response = verifyNoThrottling[PushTelemetryResponse](request)
 
@@ -10140,7 +9927,7 @@ class KafkaApisTest extends Logging {
     resources.add("test1")
     resources.add("test2")
     
when(clientMetricsManager.listClientMetricsResources).thenReturn(resources.asJava)
-    kafkaApis = createKafkaApis(raftSupport = true)
+    kafkaApis = createKafkaApis()
     kafkaApis.handle(request, RequestLocal.noCaching)
     val response = 
verifyNoThrottling[ListClientMetricsResourcesResponse](request)
     val expectedResponse = new 
ListClientMetricsResourcesResponseData().setClientMetricsResources(
@@ -10155,7 +9942,7 @@ class KafkaApisTest extends Logging {
 
     val resources = new mutable.HashSet[String]
     
when(clientMetricsManager.listClientMetricsResources).thenReturn(resources.asJava)
-    kafkaApis = createKafkaApis(raftSupport = true)
+    kafkaApis = createKafkaApis()
     kafkaApis.handle(request, RequestLocal.noCaching)
     val response = 
verifyNoThrottling[ListClientMetricsResourcesResponse](request)
     val expectedResponse = new ListClientMetricsResourcesResponseData()
@@ -10168,7 +9955,7 @@ class KafkaApisTest extends Logging {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
 
     when(clientMetricsManager.listClientMetricsResources).thenThrow(new 
RuntimeException("test"))
-    kafkaApis = createKafkaApis(raftSupport = true)
+    kafkaApis = createKafkaApis()
     kafkaApis.handle(request, RequestLocal.noCaching)
     val response = 
verifyNoThrottling[ListClientMetricsResourcesResponse](request)
 
@@ -10182,7 +9969,7 @@ class KafkaApisTest extends Logging {
 
     val requestChannelRequest = buildRequest(new 
ShareGroupHeartbeatRequest.Builder(shareGroupHeartbeatRequest, true).build())
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
-    kafkaApis = createKafkaApis(raftSupport = true)
+    kafkaApis = createKafkaApis()
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
     val expectedHeartbeatResponse = new ShareGroupHeartbeatResponseData()
@@ -10205,7 +9992,6 @@ class KafkaApisTest extends Logging {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
     kafkaApis = createKafkaApis(
       overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> 
"true"),
-      raftSupport = true
     )
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
@@ -10230,7 +10016,6 @@ class KafkaApisTest extends Logging {
     kafkaApis = createKafkaApis(
       overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> 
"true"),
       authorizer = Some(authorizer),
-      raftSupport = true
     )
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
@@ -10252,7 +10037,6 @@ class KafkaApisTest extends Logging {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
     kafkaApis = createKafkaApis(
       overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> 
"true"),
-      raftSupport = true
     )
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
@@ -10555,7 +10339,6 @@ class KafkaApisTest extends Logging {
     kafkaApis = createKafkaApis(
       overrideProperties = configOverrides,
       authorizer = Option(authorizer),
-      raftSupport = true
     )
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
@@ -10584,7 +10367,6 @@ class KafkaApisTest extends Logging {
     kafkaApis = createKafkaApis(
       overrideProperties = configOverrides,
       authorizer = Option(authorizer),
-      raftSupport = true
     )
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching())
 
@@ -10614,7 +10396,6 @@ class KafkaApisTest extends Logging {
     kafkaApis = createKafkaApis(
       overrideProperties = configOverrides,
       authorizer = Option(authorizer),
-      raftSupport = true
     )
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching())
 

Reply via email to