This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 448b93bccd136b479a4aae6b9f9e5a54fcc9e2b5
Author: Chung, Ming-Yen <[email protected]>
AuthorDate: Wed Jan 22 22:50:16 2025 +0800

    KAFKA-18599: Remove Optional wrapping for forwardingManager in ApiVersionManager (#18630)
    
    `forwardingManager` is always present now.
    
    Reviewers: Ismael Juma <[email protected]>
---
 .../scala/kafka/server/ApiVersionManager.scala     |  6 +--
 .../src/main/scala/kafka/server/BrokerServer.scala |  2 +-
 .../unit/kafka/server/ApiVersionManagerTest.scala  | 61 ++++------------------
 3 files changed, 15 insertions(+), 54 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ApiVersionManager.scala b/core/src/main/scala/kafka/server/ApiVersionManager.scala
index 972af0414e4..fd1c70e509f 100644
--- a/core/src/main/scala/kafka/server/ApiVersionManager.scala
+++ b/core/src/main/scala/kafka/server/ApiVersionManager.scala
@@ -46,7 +46,7 @@ object ApiVersionManager {
   def apply(
     listenerType: ListenerType,
     config: KafkaConfig,
-    forwardingManager: Option[ForwardingManager],
+    forwardingManager: ForwardingManager,
     supportedFeatures: BrokerFeatures,
     metadataCache: MetadataCache,
     clientMetricsManager: Option[ClientMetricsManager]
@@ -129,7 +129,7 @@ class SimpleApiVersionManager(
  */
 class DefaultApiVersionManager(
   val listenerType: ListenerType,
-  forwardingManager: Option[ForwardingManager],
+  forwardingManager: ForwardingManager,
   brokerFeatures: BrokerFeatures,
   metadataCache: MetadataCache,
   val enableUnstableLastVersion: Boolean,
@@ -143,7 +143,7 @@ class DefaultApiVersionManager(
     alterFeatureLevel0: Boolean
   ): ApiVersionsResponse = {
     val finalizedFeatures = metadataCache.features()
-    val controllerApiVersions = forwardingManager.flatMap(_.controllerApiVersions)
+    val controllerApiVersions = forwardingManager.controllerApiVersions
     val clientTelemetryEnabled = clientMetricsManager match {
       case Some(manager) => manager.isTelemetryReceiverConfigured
       case None => false
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 7127565e1a0..2f923241bbe 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -255,7 +255,7 @@ class BrokerServer(
       val apiVersionManager = ApiVersionManager(
         ListenerType.BROKER,
         config,
-        Some(forwardingManager),
+        forwardingManager,
         brokerFeatures,
         metadataCache,
         Some(clientMetricsManager)
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala
index fcfd8a05ae6..341c859bf32 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala
@@ -21,7 +21,7 @@ import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.server.BrokerFeatures
 import org.apache.kafka.server.common.KRaftVersion
-import org.junit.jupiter.api.{Disabled, Test}
+import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.EnumSource
@@ -36,9 +36,10 @@ class ApiVersionManagerTest {
   @ParameterizedTest
   @EnumSource(classOf[ListenerType])
   def testApiScope(apiScope: ListenerType): Unit = {
+    val forwardingManager = Mockito.mock(classOf[ForwardingManager])
     val versionManager = new DefaultApiVersionManager(
       listenerType = apiScope,
-      forwardingManager = None,
+      forwardingManager = forwardingManager,
       brokerFeatures = brokerFeatures,
       metadataCache = metadataCache,
       enableUnstableLastVersion = true
@@ -54,9 +55,10 @@ class ApiVersionManagerTest {
   @ParameterizedTest
   @EnumSource(classOf[ListenerType])
   def testDisabledApis(apiScope: ListenerType): Unit = {
+    val forwardingManager = Mockito.mock(classOf[ForwardingManager])
     val versionManager = new DefaultApiVersionManager(
       listenerType = apiScope,
-      forwardingManager = None,
+      forwardingManager = forwardingManager,
       brokerFeatures = brokerFeatures,
       metadataCache = metadataCache,
       enableUnstableLastVersion = false
@@ -85,7 +87,7 @@ class ApiVersionManagerTest {
 
     val versionManager = new DefaultApiVersionManager(
       listenerType = ListenerType.ZK_BROKER,
-      forwardingManager = Some(forwardingManager),
+      forwardingManager = forwardingManager,
       brokerFeatures = brokerFeatures,
       metadataCache = metadataCache,
       enableUnstableLastVersion = true
@@ -103,59 +105,18 @@ class ApiVersionManagerTest {
     val forwardingManager = Mockito.mock(classOf[ForwardingManager])
     Mockito.when(forwardingManager.controllerApiVersions).thenReturn(None)
 
-    for (forwardingManagerOpt <- Seq(Some(forwardingManager), None)) {
-      val versionManager = new DefaultApiVersionManager(
-        listenerType = ListenerType.BROKER,
-        forwardingManager = forwardingManagerOpt,
-        brokerFeatures = brokerFeatures,
-        metadataCache = metadataCache,
-        enableUnstableLastVersion = true
-      )
-      assertFalse(versionManager.isApiEnabled(ApiKeys.ENVELOPE, ApiKeys.ENVELOPE.latestVersion))
-      assertFalse(versionManager.enabledApis.contains(ApiKeys.ENVELOPE))
-
-      val apiVersionsResponse = versionManager.apiVersionResponse(throttleTimeMs = 0, false)
-      val envelopeVersion = apiVersionsResponse.data.apiKeys.find(ApiKeys.ENVELOPE.id)
-      assertNull(envelopeVersion)
-    }
-  }
-
-  @Disabled("Enable after enable KIP-590 forwarding in KAFKA-12886")
-  @Test
-  def testEnvelopeEnabledWhenForwardingManagerPresent(): Unit = {
-    val forwardingManager = Mockito.mock(classOf[ForwardingManager])
-    Mockito.when(forwardingManager.controllerApiVersions).thenReturn(None)
-
     val versionManager = new DefaultApiVersionManager(
-      listenerType = ListenerType.ZK_BROKER,
-      forwardingManager = Some(forwardingManager),
+      listenerType = ListenerType.BROKER,
+      forwardingManager = forwardingManager,
       brokerFeatures = brokerFeatures,
       metadataCache = metadataCache,
       enableUnstableLastVersion = true
     )
-    assertTrue(versionManager.isApiEnabled(ApiKeys.ENVELOPE, ApiKeys.ENVELOPE.latestVersion))
-    assertTrue(versionManager.enabledApis.contains(ApiKeys.ENVELOPE))
+    assertFalse(versionManager.isApiEnabled(ApiKeys.ENVELOPE, ApiKeys.ENVELOPE.latestVersion))
+    assertFalse(versionManager.enabledApis.contains(ApiKeys.ENVELOPE))
 
     val apiVersionsResponse = versionManager.apiVersionResponse(throttleTimeMs = 0, false)
     val envelopeVersion = apiVersionsResponse.data.apiKeys.find(ApiKeys.ENVELOPE.id)
-    assertNotNull(envelopeVersion)
-    assertEquals(ApiKeys.ENVELOPE.oldestVersion, envelopeVersion.minVersion)
-    assertEquals(ApiKeys.ENVELOPE.latestVersion, envelopeVersion.maxVersion)
-  }
-
-  @Test
-  def testEnvelopeDisabledWhenForwardingManagerEmpty(): Unit = {
-    val versionManager = new DefaultApiVersionManager(
-      listenerType = ListenerType.ZK_BROKER,
-      forwardingManager = None,
-      brokerFeatures = brokerFeatures,
-      metadataCache = metadataCache,
-      enableUnstableLastVersion = true
-    )
-    assertTrue(versionManager.isApiEnabled(ApiKeys.ENVELOPE, ApiKeys.ENVELOPE.latestVersion))
-    assertTrue(versionManager.enabledApis.contains(ApiKeys.ENVELOPE))
-
-    val apiVersionsResponse = versionManager.apiVersionResponse(throttleTimeMs = 0, false)
-    assertNotNull(apiVersionsResponse.data.apiKeys.find(ApiKeys.ENVELOPE.id))
+    assertNull(envelopeVersion)
   }
 }

Reply via email to