This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5f2a68b1501 KAFKA-19119 Move ApiVersionManager/SimpleApiVersionManager
to server (#19426)
5f2a68b1501 is described below
commit 5f2a68b150175def0f27c873469d509efd74addd
Author: Mickael Maison <[email protected]>
AuthorDate: Tue Apr 15 08:32:44 2025 +0200
KAFKA-19119 Move ApiVersionManager/SimpleApiVersionManager to server
(#19426)
Reviewers: Ken Huang <[email protected]>, PoAn Yang
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
checkstyle/import-control-server.xml | 1 +
.../kafka/server/builders/KafkaApisBuilder.java | 2 +-
.../main/scala/kafka/network/SocketServer.scala | 6 +-
.../scala/kafka/server/ApiVersionManager.scala | 176 ---------------------
.../src/main/scala/kafka/server/BrokerServer.scala | 10 +-
.../main/scala/kafka/server/ControllerApis.scala | 2 +-
.../main/scala/kafka/server/ControllerServer.scala | 2 +-
.../scala/kafka/server/ForwardingManager.scala | 7 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 2 +-
.../scala/kafka/tools/TestRaftRequestHandler.scala | 5 +-
.../main/scala/kafka/tools/TestRaftServer.scala | 3 +-
.../scala/unit/kafka/network/ProcessorTest.scala | 29 ++--
.../unit/kafka/network/SocketServerTest.scala | 1 +
.../unit/kafka/server/ControllerApisTest.scala | 1 +
...st.scala => DefaultApiVersionManagerTest.scala} | 71 +++++----
.../scala/unit/kafka/server/KafkaApisTest.scala | 9 +-
.../metadata/KRaftMetadataRequestBenchmark.java | 2 +-
.../org/apache/kafka/server/ApiVersionManager.java | 73 +++++++++
.../kafka/server/DefaultApiVersionManager.java | 107 +++++++++++++
.../kafka/server/SimpleApiVersionManager.java | 85 ++++++++++
20 files changed, 345 insertions(+), 249 deletions(-)
diff --git a/checkstyle/import-control-server.xml
b/checkstyle/import-control-server.xml
index a35719e8761..509d9fa27c4 100644
--- a/checkstyle/import-control-server.xml
+++ b/checkstyle/import-control-server.xml
@@ -85,6 +85,7 @@
<allow pkg="javax.crypto" />
<allow pkg="org.apache.kafka.server" />
<allow pkg="org.apache.kafka.image" />
+ <allow pkg="org.apache.kafka.network.metrics" />
<allow pkg="org.apache.kafka.storage.internals.log" />
<allow pkg="org.apache.kafka.storage.internals.checkpoint" />
<subpackage name="metrics">
diff --git a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
index 8724fc3cb8e..9fe44c487a3 100644
--- a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
+++ b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
@@ -19,7 +19,6 @@ package kafka.server.builders;
import kafka.coordinator.transaction.TransactionCoordinator;
import kafka.network.RequestChannel;
-import kafka.server.ApiVersionManager;
import kafka.server.AutoTopicCreationManager;
import kafka.server.FetchManager;
import kafka.server.ForwardingManager;
@@ -36,6 +35,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinator;
import org.apache.kafka.coordinator.share.ShareCoordinator;
import org.apache.kafka.metadata.ConfigRepository;
import org.apache.kafka.metadata.MetadataCache;
+import org.apache.kafka.server.ApiVersionManager;
import org.apache.kafka.server.ClientMetricsManager;
import org.apache.kafka.server.DelegationTokenManager;
import org.apache.kafka.server.authorizer.Authorizer;
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala
b/core/src/main/scala/kafka/network/SocketServer.scala
index 79cd0bc8ce2..4163b563f01 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -28,7 +28,7 @@ import java.util.concurrent.atomic._
import kafka.network.Processor._
import kafka.network.RequestChannel.{CloseConnectionResponse,
EndThrottlingResponse, NoOpResponse, SendResponse, StartThrottlingResponse}
import kafka.network.SocketServer._
-import kafka.server.{ApiVersionManager, BrokerReconfigurable, KafkaConfig}
+import kafka.server.{BrokerReconfigurable, KafkaConfig}
import org.apache.kafka.network.EndPoint
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import kafka.utils._
@@ -46,7 +46,7 @@ import org.apache.kafka.common.utils.{KafkaThread,
LogContext, Time, Utils}
import org.apache.kafka.common.{Endpoint, KafkaException, MetricName,
Reconfigurable}
import org.apache.kafka.network.{ConnectionQuotaEntity,
ConnectionThrottledException, SocketServerConfigs, TooManyConnectionsException}
import org.apache.kafka.security.CredentialProvider
-import org.apache.kafka.server.ServerSocketFactory
+import org.apache.kafka.server.{ApiVersionManager, ServerSocketFactory}
import org.apache.kafka.server.config.QuotaConfig
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.network.ConnectionDisconnectListener
@@ -872,7 +872,7 @@ private[kafka] class Processor(
credentialProvider.tokenCache,
time,
logContext,
- version => apiVersionManager.apiVersionResponse(throttleTimeMs = 0,
version < 4)
+ version => apiVersionManager.apiVersionResponse(0, version < 4)
)
)
diff --git a/core/src/main/scala/kafka/server/ApiVersionManager.scala
b/core/src/main/scala/kafka/server/ApiVersionManager.scala
deleted file mode 100644
index 9cedbaf1c9d..00000000000
--- a/core/src/main/scala/kafka/server/ApiVersionManager.scala
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.server
-
-import org.apache.kafka.common.feature.SupportedVersionRange
-import org.apache.kafka.common.message.ApiMessageType.ListenerType
-import org.apache.kafka.common.protocol.ApiKeys
-import org.apache.kafka.common.requests.ApiVersionsResponse
-import org.apache.kafka.metadata.MetadataCache
-import org.apache.kafka.network.metrics.RequestChannelMetrics
-import org.apache.kafka.server.{BrokerFeatures, ClientMetricsManager}
-import org.apache.kafka.server.common.FinalizedFeatures
-
-import scala.collection.mutable
-import scala.jdk.CollectionConverters._
-
-trait ApiVersionManager {
- def enableUnstableLastVersion: Boolean
- def listenerType: ListenerType
- def enabledApis: collection.Set[ApiKeys]
-
- def apiVersionResponse(throttleTimeMs: Int, alterFeatureLevel0: Boolean):
ApiVersionsResponse
-
- def isApiEnabled(apiKey: ApiKeys, apiVersion: Short): Boolean = {
- apiKey != null && apiKey.inScope(listenerType) &&
apiKey.isVersionEnabled(apiVersion, enableUnstableLastVersion)
- }
- def newRequestMetrics: RequestChannelMetrics = new
RequestChannelMetrics(enabledApis.asJava)
-
- def features: FinalizedFeatures
-}
-
-object ApiVersionManager {
- def apply(
- listenerType: ListenerType,
- config: KafkaConfig,
- forwardingManager: ForwardingManager,
- supportedFeatures: BrokerFeatures,
- metadataCache: MetadataCache,
- clientMetricsManager: Option[ClientMetricsManager]
- ): ApiVersionManager = {
- new DefaultApiVersionManager(
- listenerType,
- forwardingManager,
- supportedFeatures,
- metadataCache,
- config.unstableApiVersionsEnabled,
- clientMetricsManager
- )
- }
-}
-
-/**
- * A simple ApiVersionManager that does not support forwarding and does not
have metadata cache, used in kraft controller.
- * its enabled apis are determined by the listener type, its finalized
features are dynamically determined by the controller.
- *
- * @param listenerType the listener type
- * @param enabledApis the enabled apis, which are computed by the listener type
- * @param brokerFeatures the broker features
- * @param enableUnstableLastVersion whether to enable unstable last version,
see [[KafkaConfig.unstableApiVersionsEnabled]]
- * @param featuresProvider a provider to the finalized features supported
- */
-class SimpleApiVersionManager(
- val listenerType: ListenerType,
- val enabledApis: collection.Set[ApiKeys],
- brokerFeatures:
org.apache.kafka.common.feature.Features[SupportedVersionRange],
- val enableUnstableLastVersion: Boolean,
- val featuresProvider: () => FinalizedFeatures
-) extends ApiVersionManager {
-
- def this(
- listenerType: ListenerType,
- enableUnstableLastVersion: Boolean,
- featuresProvider: () => FinalizedFeatures
- ) = {
- this(
- listenerType,
- ApiKeys.apisForListener(listenerType).asScala,
- BrokerFeatures.defaultSupportedFeatures(enableUnstableLastVersion),
- enableUnstableLastVersion,
- featuresProvider
- )
- }
-
- private val apiVersions = ApiVersionsResponse.collectApis(listenerType,
enabledApis.asJava, enableUnstableLastVersion)
-
- override def apiVersionResponse(
- throttleTimeMs: Int,
- alterFeatureLevel0: Boolean
- ): ApiVersionsResponse = {
- val currentFeatures = features
- new ApiVersionsResponse.Builder().
- setThrottleTimeMs(throttleTimeMs).
- setApiVersions(apiVersions).
- setSupportedFeatures(brokerFeatures).
- setFinalizedFeatures(currentFeatures.finalizedFeatures()).
- setFinalizedFeaturesEpoch(currentFeatures.finalizedFeaturesEpoch()).
- setZkMigrationEnabled(false).
- setAlterFeatureLevel0(alterFeatureLevel0).
- build()
- }
-
- override def features: FinalizedFeatures = featuresProvider.apply()
-}
-
-/**
- * The default ApiVersionManager that supports forwarding and has metadata
cache, used in broker and zk controller.
- * When forwarding is enabled, the enabled apis are determined by the broker
listener type and the controller apis,
- * otherwise the enabled apis are determined by the broker listener type,
which is the same with SimpleApiVersionManager.
- *
- * @param listenerType the listener type
- * @param forwardingManager the forwarding manager,
- * @param brokerFeatures the broker features
- * @param metadataCache the metadata cache, used to get the finalized features
and the metadata version
- * @param enableUnstableLastVersion whether to enable unstable last version,
see [[KafkaConfig.unstableApiVersionsEnabled]]
- * @param clientMetricsManager the client metrics manager, helps to determine
whether client telemetry is enabled
- */
-class DefaultApiVersionManager(
- val listenerType: ListenerType,
- forwardingManager: ForwardingManager,
- brokerFeatures: BrokerFeatures,
- metadataCache: MetadataCache,
- val enableUnstableLastVersion: Boolean,
- val clientMetricsManager: Option[ClientMetricsManager] = None
-) extends ApiVersionManager {
-
- val enabledApis: mutable.Set[ApiKeys] =
ApiKeys.apisForListener(listenerType).asScala
-
- override def apiVersionResponse(
- throttleTimeMs: Int,
- alterFeatureLevel0: Boolean
- ): ApiVersionsResponse = {
- val finalizedFeatures = metadataCache.features()
- val controllerApiVersions = forwardingManager.controllerApiVersions
- val clientTelemetryEnabled = clientMetricsManager match {
- case Some(manager) => manager.isTelemetryReceiverConfigured
- case None => false
- }
- val apiVersions = if (controllerApiVersions.isDefined) {
- ApiVersionsResponse.controllerApiVersions(
- controllerApiVersions.get,
- listenerType,
- enableUnstableLastVersion,
- clientTelemetryEnabled)
- } else {
- ApiVersionsResponse.brokerApiVersions(
- listenerType,
- enableUnstableLastVersion,
- clientTelemetryEnabled)
- }
- new ApiVersionsResponse.Builder().
- setThrottleTimeMs(throttleTimeMs).
- setApiVersions(apiVersions).
- setSupportedFeatures(brokerFeatures.supportedFeatures).
- setFinalizedFeatures(finalizedFeatures.finalizedFeatures()).
- setFinalizedFeaturesEpoch(finalizedFeatures.finalizedFeaturesEpoch()).
- setZkMigrationEnabled(false).
- setAlterFeatureLevel0(alterFeatureLevel0).
- build()
- }
-
- override def features: FinalizedFeatures = metadataCache.features()
-}
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 398c085d0d5..63666ef3290 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -54,7 +54,7 @@ import
org.apache.kafka.server.share.persister.{DefaultStatePersister, NoOpState
import org.apache.kafka.server.share.session.ShareSessionCache
import org.apache.kafka.server.util.timer.{SystemTimer, SystemTimerReaper}
import org.apache.kafka.server.util.{Deadline, FutureUtils, KafkaScheduler}
-import org.apache.kafka.server.{AssignmentsManager, BrokerFeatures,
ClientMetricsManager, DelayedActionQueue, DelegationTokenManager, ProcessRole}
+import org.apache.kafka.server.{AssignmentsManager, BrokerFeatures,
ClientMetricsManager, DefaultApiVersionManager, DelayedActionQueue,
DelegationTokenManager, ProcessRole}
import org.apache.kafka.storage.internals.log.LogDirFailureChannel
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
@@ -252,13 +252,13 @@ class BrokerServer(
forwardingManager = new
ForwardingManagerImpl(clientToControllerChannelManager, metrics)
clientMetricsManager = new
ClientMetricsManager(clientMetricsReceiverPlugin,
config.clientTelemetryMaxBytes, time, metrics)
- val apiVersionManager = ApiVersionManager(
+ val apiVersionManager = new DefaultApiVersionManager(
ListenerType.BROKER,
- config,
- forwardingManager,
+ () => forwardingManager.controllerApiVersions,
brokerFeatures,
metadataCache,
- Some(clientMetricsManager)
+ config.unstableApiVersionsEnabled,
+ Optional.of(clientMetricsManager)
)
val connectionDisconnectListeners =
Seq(clientMetricsManager.connectionDisconnectListener())
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala
b/core/src/main/scala/kafka/server/ControllerApis.scala
index fbcb0e8572d..21246102c45 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -56,7 +56,7 @@ import
org.apache.kafka.image.publisher.ControllerRegistrationsPublisher
import org.apache.kafka.metadata.{BrokerHeartbeatReply,
BrokerRegistrationReply}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.server.{DelegationTokenManager, ProcessRole}
+import org.apache.kafka.server.{ApiVersionManager, DelegationTokenManager,
ProcessRole}
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.{ApiMessageAndVersion, RequestLocal}
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala
b/core/src/main/scala/kafka/server/ControllerServer.scala
index c2826ec8bfc..83dcb99a5cc 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -40,7 +40,7 @@ import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
import org.apache.kafka.metadata.publisher.{AclPublisher, FeaturesPublisher}
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.security.CredentialProvider
-import org.apache.kafka.server.{DelegationTokenManager, ProcessRole}
+import org.apache.kafka.server.{DelegationTokenManager, ProcessRole,
SimpleApiVersionManager}
import org.apache.kafka.server.authorizer.Authorizer
import
org.apache.kafka.server.config.ServerLogConfigs.{ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG,
CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG}
import org.apache.kafka.server.common.{ApiMessageAndVersion, KRaftVersion,
NodeToControllerChannelManager}
diff --git a/core/src/main/scala/kafka/server/ForwardingManager.scala
b/core/src/main/scala/kafka/server/ForwardingManager.scala
index 45c95e38db8..c067000bf0c 100644
--- a/core/src/main/scala/kafka/server/ForwardingManager.scala
+++ b/core/src/main/scala/kafka/server/ForwardingManager.scala
@@ -27,6 +27,7 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse,
EnvelopeRequest, EnvelopeResponse, RequestContext, RequestHeader}
import org.apache.kafka.server.common.{ControllerRequestCompletionHandler,
NodeToControllerChannelManager}
+import java.util.Optional
import java.util.concurrent.TimeUnit
import scala.jdk.OptionConverters.RichOptional
@@ -85,7 +86,7 @@ trait ForwardingManager {
responseCallback: Option[AbstractResponse] => Unit
): Unit
- def controllerApiVersions: Option[NodeApiVersions]
+ def controllerApiVersions: Optional[NodeApiVersions]
}
object ForwardingManager {
@@ -187,8 +188,8 @@ class ForwardingManagerImpl(
override def close(): Unit =
forwardingManagerMetrics.close()
- override def controllerApiVersions: Option[NodeApiVersions] =
- channelManager.controllerApiVersions.toScala
+ override def controllerApiVersions: Optional[NodeApiVersions] =
+ channelManager.controllerApiVersions
private def parseResponse(
buffer: ByteBuffer,
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 1d482344a43..81cb249f086 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -60,7 +60,7 @@ import org.apache.kafka.common.{Node, TopicIdPartition,
TopicPartition, Uuid}
import org.apache.kafka.coordinator.group.{Group, GroupConfig,
GroupConfigManager, GroupCoordinator}
import org.apache.kafka.coordinator.share.ShareCoordinator
import org.apache.kafka.metadata.{ConfigRepository, MetadataCache}
-import org.apache.kafka.server.{ClientMetricsManager, DelegationTokenManager,
ProcessRole}
+import org.apache.kafka.server.{ApiVersionManager, ClientMetricsManager,
DelegationTokenManager, ProcessRole}
import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.common.{GroupVersion, RequestLocal,
TransactionVersion}
import org.apache.kafka.server.config.DelegationTokenManagerConfigs
diff --git a/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala
b/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala
index 733e8228b7c..2e9d8e2bb8a 100644
--- a/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala
@@ -19,13 +19,14 @@ package kafka.tools
import kafka.network.RequestChannel
import kafka.raft.RaftManager
-import kafka.server.{ApiRequestHandler, ApiVersionManager}
+import kafka.server.ApiRequestHandler
import kafka.utils.Logging
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.message.{BeginQuorumEpochResponseData,
EndQuorumEpochResponseData, FetchResponseData, FetchSnapshotResponseData,
VoteResponseData}
import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage}
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse,
BeginQuorumEpochResponse, EndQuorumEpochResponse, FetchResponse,
FetchSnapshotResponse, VoteResponse}
import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.ApiVersionManager
import org.apache.kafka.server.common.RequestLocal
/**
@@ -65,7 +66,7 @@ class TestRaftRequestHandler(
}
private def handleApiVersions(request: RequestChannel.Request): Unit = {
- requestChannel.sendResponse(request,
apiVersionManager.apiVersionResponse(throttleTimeMs = 0,
request.header.apiVersion() < 4), None)
+ requestChannel.sendResponse(request,
apiVersionManager.apiVersionResponse(0, request.header.apiVersion() < 4), None)
}
private def handleVote(request: RequestChannel.Request): Unit = {
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala
b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index 69d296fe467..c07538aadad 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.{CompletableFuture,
CountDownLatch, LinkedBlockingDe
import joptsimple.{OptionException, OptionSpec}
import kafka.network.SocketServer
import kafka.raft.{DefaultExternalKRaftMetrics, KafkaRaftManager, RaftManager}
-import kafka.server.{KafkaConfig, KafkaRequestHandlerPool,
SimpleApiVersionManager}
+import kafka.server.{KafkaConfig, KafkaRequestHandlerPool}
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.errors.InvalidConfigurationException
import org.apache.kafka.common.message.ApiMessageType.ListenerType
@@ -38,6 +38,7 @@ import org.apache.kafka.common.{TopicPartition, Uuid,
protocol}
import org.apache.kafka.raft.errors.NotLeaderException
import org.apache.kafka.raft.{Batch, BatchReader, Endpoints, LeaderAndEpoch,
QuorumConfig, RaftClient}
import org.apache.kafka.security.CredentialProvider
+import org.apache.kafka.server.SimpleApiVersionManager
import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion}
import org.apache.kafka.server.common.serialization.RecordSerde
import org.apache.kafka.server.config.KRaftConfigs
diff --git a/core/src/test/scala/unit/kafka/network/ProcessorTest.scala
b/core/src/test/scala/unit/kafka/network/ProcessorTest.scala
index 66f3c5d5c77..575f004fe0f 100644
--- a/core/src/test/scala/unit/kafka/network/ProcessorTest.scala
+++ b/core/src/test/scala/unit/kafka/network/ProcessorTest.scala
@@ -18,20 +18,21 @@
package kafka.network
import kafka.server.metadata.KRaftMetadataCache
-import kafka.server.{DefaultApiVersionManager, ForwardingManager,
SimpleApiVersionManager}
+import org.apache.kafka.clients.NodeApiVersions
import org.apache.kafka.common.errors.{InvalidRequestException,
UnsupportedVersionException}
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.message.RequestHeaderData
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.{RequestHeader, RequestTestUtils}
-import org.apache.kafka.server.BrokerFeatures
+import org.apache.kafka.server.{BrokerFeatures, DefaultApiVersionManager,
SimpleApiVersionManager}
import org.apache.kafka.server.common.{FinalizedFeatures, KRaftVersion,
MetadataVersion}
import org.junit.jupiter.api.Assertions.{assertThrows, assertTrue}
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.function.Executable
import org.mockito.Mockito.mock
-import java.util.Collections
+import java.util.function.Supplier
+import java.util.{Collections, Optional}
class ProcessorTest {
@@ -44,7 +45,7 @@ class ProcessorTest {
val e = assertThrows(classOf[InvalidRequestException],
(() => Processor.parseRequestHeader(apiVersionManager, requestHeader)):
Executable,
"INIT_PRODUCER_ID with listener type CONTROLLER should throw
InvalidRequestException exception")
- assertTrue(e.toString.contains("disabled api"));
+ assertTrue(e.toString.contains("disabled api"))
}
@Test
@@ -55,26 +56,26 @@ class ProcessorTest {
.setRequestApiKey(ApiKeys.LEADER_AND_ISR.id)
.setRequestApiVersion(headerVersion)
.setClientId("clientid")
- .setCorrelationId(0);
+ .setCorrelationId(0)
val requestHeader = RequestTestUtils.serializeRequestHeader(new
RequestHeader(requestHeaderData, headerVersion))
- val apiVersionManager = new DefaultApiVersionManager(ListenerType.BROKER,
mock(classOf[ForwardingManager]),
- BrokerFeatures.createDefault(true), new KRaftMetadataCache(0, () =>
KRaftVersion.LATEST_PRODUCTION), true)
+ val apiVersionManager = new DefaultApiVersionManager(ListenerType.BROKER,
mock(classOf[Supplier[Optional[NodeApiVersions]]]),
+ BrokerFeatures.createDefault(true), new KRaftMetadataCache(0, () =>
KRaftVersion.LATEST_PRODUCTION), true, Optional.empty)
val e = assertThrows(classOf[InvalidRequestException],
(() => Processor.parseRequestHeader(apiVersionManager, requestHeader)):
Executable,
"LEADER_AND_ISR should throw InvalidRequestException exception")
- assertTrue(e.toString.contains("Unsupported api"));
+ assertTrue(e.toString.contains("Unsupported api"))
}
@Test
def testParseRequestHeaderWithUnsupportedApiVersion(): Unit = {
val requestHeader = RequestTestUtils.serializeRequestHeader(
new RequestHeader(ApiKeys.FETCH, 0, "clientid", 0))
- val apiVersionManager = new DefaultApiVersionManager(ListenerType.BROKER,
mock(classOf[ForwardingManager]),
- BrokerFeatures.createDefault(true), new KRaftMetadataCache(0, () =>
KRaftVersion.LATEST_PRODUCTION), true)
+ val apiVersionManager = new DefaultApiVersionManager(ListenerType.BROKER,
mock(classOf[Supplier[Optional[NodeApiVersions]]]),
+ BrokerFeatures.createDefault(true), new KRaftMetadataCache(0, () =>
KRaftVersion.LATEST_PRODUCTION), true, Optional.empty)
val e = assertThrows(classOf[UnsupportedVersionException],
(() => Processor.parseRequestHeader(apiVersionManager, requestHeader)):
Executable,
"FETCH v0 should throw UnsupportedVersionException exception")
- assertTrue(e.toString.contains("unsupported version"));
+ assertTrue(e.toString.contains("unsupported version"))
}
/**
@@ -86,12 +87,12 @@ class ProcessorTest {
for (version <- 0 to 2) {
val requestHeader = RequestTestUtils.serializeRequestHeader(
new RequestHeader(ApiKeys.PRODUCE, version.toShort, "clientid", 0))
- val apiVersionManager = new
DefaultApiVersionManager(ListenerType.BROKER, mock(classOf[ForwardingManager]),
- BrokerFeatures.createDefault(true), new KRaftMetadataCache(0, () =>
KRaftVersion.LATEST_PRODUCTION), true)
+ val apiVersionManager = new
DefaultApiVersionManager(ListenerType.BROKER,
mock(classOf[Supplier[Optional[NodeApiVersions]]]),
+ BrokerFeatures.createDefault(true), new KRaftMetadataCache(0, () =>
KRaftVersion.LATEST_PRODUCTION), true, Optional.empty)
val e = assertThrows(classOf[UnsupportedVersionException],
(() => Processor.parseRequestHeader(apiVersionManager,
requestHeader)): Executable,
s"PRODUCE $version should throw UnsupportedVersionException exception")
- assertTrue(e.toString.contains("unsupported version"));
+ assertTrue(e.toString.contains("unsupported version"))
}
}
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index f7fe1bba446..1d793e726b8 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -38,6 +38,7 @@ import org.apache.kafka.network.RequestConvertToJson
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.network.EndPoint
import org.apache.kafka.security.CredentialProvider
+import org.apache.kafka.server.{ApiVersionManager, SimpleApiVersionManager}
import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion}
import org.apache.kafka.server.config.QuotaConfig
import org.apache.kafka.server.metrics.KafkaYammerMetrics
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index 7bfbd4eee68..19e221c42a5 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -55,6 +55,7 @@ import
org.apache.kafka.image.publisher.ControllerRegistrationsPublisher
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.network.metrics.RequestChannelMetrics
import org.apache.kafka.raft.QuorumConfig
+import org.apache.kafka.server.SimpleApiVersionManager
import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext,
AuthorizationResult, Authorizer}
import org.apache.kafka.server.common.{ApiMessageAndVersion,
FinalizedFeatures, KRaftVersion, MetadataVersion, ProducerIdsBlock,
RequestLocal}
import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs}
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala
b/core/src/test/scala/unit/kafka/server/DefaultApiVersionManagerTest.scala
similarity index 70%
rename from core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala
rename to
core/src/test/scala/unit/kafka/server/DefaultApiVersionManagerTest.scala
index 2d89c471249..fa1209c917e 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DefaultApiVersionManagerTest.scala
@@ -22,7 +22,7 @@ import
org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.metadata.FeatureLevelRecord
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.image.{MetadataDelta, MetadataImage,
MetadataProvenance}
-import org.apache.kafka.server.BrokerFeatures
+import org.apache.kafka.server.{BrokerFeatures, DefaultApiVersionManager}
import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion}
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.Assertions._
@@ -30,13 +30,15 @@ import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.EnumSource
import org.mockito.Mockito
+import java.util.Optional
+import java.util.function.Supplier
import scala.jdk.CollectionConverters._
-class ApiVersionManagerTest {
+class DefaultApiVersionManagerTest {
private val brokerFeatures = BrokerFeatures.createDefault(true)
private val metadataCache = {
val cache = new KRaftMetadataCache(1, () => KRaftVersion.LATEST_PRODUCTION)
- val delta = new MetadataDelta(MetadataImage.EMPTY);
+ val delta = new MetadataDelta(MetadataImage.EMPTY)
delta.replay(new FeatureLevelRecord()
.setName(MetadataVersion.FEATURE_NAME)
.setFeatureLevel(MetadataVersion.latestProduction().featureLevel())
@@ -48,15 +50,15 @@ class ApiVersionManagerTest {
@ParameterizedTest
@EnumSource(classOf[ListenerType])
def testApiScope(apiScope: ListenerType): Unit = {
- val forwardingManager = Mockito.mock(classOf[ForwardingManager])
+ val nodeApiVersionsSupplier =
Mockito.mock(classOf[Supplier[Optional[NodeApiVersions]]])
val versionManager = new DefaultApiVersionManager(
- listenerType = apiScope,
- forwardingManager = forwardingManager,
- brokerFeatures = brokerFeatures,
- metadataCache = metadataCache,
- enableUnstableLastVersion = true
+ apiScope,
+ nodeApiVersionsSupplier,
+ brokerFeatures,
+ metadataCache,
+ true,
+ Optional.empty
)
- assertEquals(ApiKeys.apisForListener(apiScope).asScala,
versionManager.enabledApis)
assertTrue(ApiKeys.apisForListener(apiScope).asScala.forall { apiKey =>
apiKey.allVersions.asScala.forall { version =>
versionManager.isApiEnabled(apiKey, version)
@@ -67,13 +69,14 @@ class ApiVersionManagerTest {
@ParameterizedTest
@EnumSource(classOf[ListenerType])
def testDisabledApis(apiScope: ListenerType): Unit = {
- val forwardingManager = Mockito.mock(classOf[ForwardingManager])
+ val nodeApiVersionsSupplier =
Mockito.mock(classOf[Supplier[Optional[NodeApiVersions]]])
val versionManager = new DefaultApiVersionManager(
- listenerType = apiScope,
- forwardingManager = forwardingManager,
- brokerFeatures = brokerFeatures,
- metadataCache = metadataCache,
- enableUnstableLastVersion = false
+ apiScope,
+ nodeApiVersionsSupplier,
+ brokerFeatures,
+ metadataCache,
+ false,
+ Optional.empty
)
ApiKeys.apisForListener(apiScope).forEach { apiKey =>
@@ -89,23 +92,24 @@ class ApiVersionManagerTest {
val controllerMinVersion: Short = 3
val controllerMaxVersion: Short = 5
- val forwardingManager = Mockito.mock(classOf[ForwardingManager])
+ val nodeApiVersionsSupplier =
Mockito.mock(classOf[Supplier[Optional[NodeApiVersions]]])
-
Mockito.when(forwardingManager.controllerApiVersions).thenReturn(Some(NodeApiVersions.create(
+
Mockito.when(nodeApiVersionsSupplier.get).thenReturn(Optional.of(NodeApiVersions.create(
ApiKeys.CREATE_TOPICS.id,
controllerMinVersion,
controllerMaxVersion
)))
val versionManager = new DefaultApiVersionManager(
- listenerType = ListenerType.BROKER,
- forwardingManager = forwardingManager,
- brokerFeatures = brokerFeatures,
- metadataCache = metadataCache,
- enableUnstableLastVersion = true
+ ListenerType.BROKER,
+ nodeApiVersionsSupplier,
+ brokerFeatures,
+ metadataCache,
+ true,
+ Optional.empty
)
- val apiVersionsResponse = versionManager.apiVersionResponse(throttleTimeMs
= 0, false)
+ val apiVersionsResponse = versionManager.apiVersionResponse(0, false)
val alterConfigVersion =
apiVersionsResponse.data.apiKeys.find(ApiKeys.CREATE_TOPICS.id)
assertNotNull(alterConfigVersion)
assertEquals(controllerMinVersion, alterConfigVersion.minVersion)
@@ -114,20 +118,21 @@ class ApiVersionManagerTest {
@Test
def testEnvelopeDisabledForKRaftBroker(): Unit = {
- val forwardingManager = Mockito.mock(classOf[ForwardingManager])
- Mockito.when(forwardingManager.controllerApiVersions).thenReturn(None)
+ val nodeApiVersionsSupplier =
Mockito.mock(classOf[Supplier[Optional[NodeApiVersions]]])
+ Mockito.when(nodeApiVersionsSupplier.get).thenReturn(Optional.empty())
val versionManager = new DefaultApiVersionManager(
- listenerType = ListenerType.BROKER,
- forwardingManager = forwardingManager,
- brokerFeatures = brokerFeatures,
- metadataCache = metadataCache,
- enableUnstableLastVersion = true
+ ListenerType.BROKER,
+ nodeApiVersionsSupplier,
+ brokerFeatures,
+ metadataCache,
+ true,
+ Optional.empty
)
assertFalse(versionManager.isApiEnabled(ApiKeys.ENVELOPE,
ApiKeys.ENVELOPE.latestVersion))
- assertFalse(versionManager.enabledApis.contains(ApiKeys.ENVELOPE))
+
assertFalse(ApiKeys.apisForListener(versionManager.listenerType()).contains(ApiKeys.ENVELOPE))
- val apiVersionsResponse = versionManager.apiVersionResponse(throttleTimeMs
= 0, false)
+ val apiVersionsResponse = versionManager.apiVersionResponse(0, false)
val envelopeVersion =
apiVersionsResponse.data.apiKeys.find(ApiKeys.ENVELOPE.id)
assertNull(envelopeVersion)
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 2283ca36582..0057556161d 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -86,7 +86,7 @@ import org.apache.kafka.metadata.{ConfigRepository,
MetadataCache, MockConfigRep
import org.apache.kafka.network.metrics.{RequestChannelMetrics, RequestMetrics}
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.security.authorizer.AclEntry
-import org.apache.kafka.server.{BrokerFeatures, ClientMetricsManager}
+import org.apache.kafka.server.{ClientMetricsManager, SimpleApiVersionManager}
import org.apache.kafka.server.authorizer.{Action, AuthorizationResult,
Authorizer}
import org.apache.kafka.server.common.{FeatureVersion, FinalizedFeatures,
GroupVersion, KRaftVersion, MetadataVersion, RequestLocal, TransactionVersion}
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs,
ServerConfigs, ServerLogConfigs}
@@ -179,13 +179,8 @@ class KafkaApisTest extends Logging {
overrideProperties.foreach( p => properties.put(p._1, p._2))
val config = new KafkaConfig(properties)
- val listenerType = ListenerType.BROKER
- val enabledApis = ApiKeys.apisForListener(listenerType).asScala
-
val apiVersionManager = new SimpleApiVersionManager(
- listenerType,
- enabledApis,
- BrokerFeatures.defaultSupportedFeatures(true),
+ ListenerType.BROKER,
true,
() => new FinalizedFeatures(MetadataVersion.latestTesting(),
Collections.emptyMap[String, java.lang.Short], 0))
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
index b4779a5fd3b..a1020201a32 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
@@ -30,7 +30,6 @@ import kafka.server.KafkaConfig;
import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager;
import kafka.server.ReplicationQuotaManager;
-import kafka.server.SimpleApiVersionManager;
import kafka.server.builders.KafkaApisBuilder;
import kafka.server.metadata.KRaftMetadataCache;
import kafka.server.share.SharePartitionManager;
@@ -60,6 +59,7 @@ import org.apache.kafka.network.RequestConvertToJson;
import org.apache.kafka.network.metrics.RequestChannelMetrics;
import org.apache.kafka.raft.QuorumConfig;
import org.apache.kafka.server.ClientMetricsManager;
+import org.apache.kafka.server.SimpleApiVersionManager;
import org.apache.kafka.server.common.FinalizedFeatures;
import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.MetadataVersion;
diff --git
a/server/src/main/java/org/apache/kafka/server/ApiVersionManager.java
b/server/src/main/java/org/apache/kafka/server/ApiVersionManager.java
new file mode 100644
index 00000000000..4cc60150844
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/ApiVersionManager.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.message.ApiMessageType;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.network.metrics.RequestChannelMetrics;
+import org.apache.kafka.server.common.FinalizedFeatures;
+
+/**
+ * ApiVersionManagers are used to define the APIs supported by servers
+ */
+public interface ApiVersionManager {
+
+ /**
+ * Whether to mark unstable API versions as enabled
+ * @return true if unstable API versions are enabled, otherwise false
+ */
+ boolean enableUnstableLastVersion();
+
+ /**
+ * The listener type
+ * @return Broker or Controller depending on the server's role
+ */
+ ApiMessageType.ListenerType listenerType();
+
+ /**
+ * The ApiVersionsResponse to send back to client when they send an
ApiVersionsRequest
+ * @param throttleTimeMs The throttle time in milliseconds
+ * @param alterFeatureLevel0 Whether to filter feature v0 in the response
+ * @return the ApiVersionsResponse to send back to the client
+ */
+ ApiVersionsResponse apiVersionResponse(int throttleTimeMs, boolean
alterFeatureLevel0);
+
+ /**
+ * The features supported by the server
+ * @return the FinalizedFeatures
+ */
+ FinalizedFeatures features();
+
+ /**
+ * Whether the specified API and version is supported
+ * @param apiKey the API key
+ * @param apiVersion the API version
+ * @return true if the API key and version is supported, otherwise false
+ */
+ default boolean isApiEnabled(ApiKeys apiKey, short apiVersion) {
+ return apiKey != null && apiKey.inScope(listenerType()) &&
apiKey.isVersionEnabled(apiVersion, enableUnstableLastVersion());
+ }
+
+ /**
+ * Create a new RequestChannelMetrics for the enabled APIs
+ * @return the RequestChannelMetrics
+ */
+ default RequestChannelMetrics newRequestMetrics() {
+ return new
RequestChannelMetrics(ApiKeys.apisForListener(listenerType()));
+ }
+}
diff --git
a/server/src/main/java/org/apache/kafka/server/DefaultApiVersionManager.java
b/server/src/main/java/org/apache/kafka/server/DefaultApiVersionManager.java
new file mode 100644
index 00000000000..9eb835ccfbf
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/DefaultApiVersionManager.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.common.message.ApiMessageType;
+import org.apache.kafka.common.message.ApiVersionsResponseData;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.metadata.MetadataCache;
+import org.apache.kafka.server.common.FinalizedFeatures;
+
+import java.util.Optional;
+import java.util.function.Supplier;
+
+/**
+ * The default ApiVersionManager that supports forwarding and has metadata
cache, used in brokers.
+ * The enabled APIs are determined by the broker listener type and the
controller APIs.
+ */
+public class DefaultApiVersionManager implements ApiVersionManager {
+
+ private final ApiMessageType.ListenerType listenerType;
+ private final Supplier<Optional<NodeApiVersions>> nodeApiVersionsSupplier;
+ private final BrokerFeatures brokerFeatures;
+ private final MetadataCache metadataCache;
+ private final boolean enableUnstableLastVersion;
+ private final Optional<ClientMetricsManager> clientMetricsManager;
+
+ /**
+ * DefaultApiVersionManager constructor
+ * @param listenerType the listener type
+ * @param nodeApiVersionsSupplier the supplier of NodeApiVersions
+ * @param brokerFeatures the broker features
+ * @param metadataCache the metadata cache, used to get the finalized
features and the metadata version
+ * @param enableUnstableLastVersion whether to enable unstable last
version, see
+ * {@link
org.apache.kafka.server.config.ServerConfigs#UNSTABLE_API_VERSIONS_ENABLE_CONFIG}
+ * @param clientMetricsManager the client metrics manager, helps to
determine whether client telemetry is enabled
+ */
+ public DefaultApiVersionManager(
+ ApiMessageType.ListenerType listenerType,
+ Supplier<Optional<NodeApiVersions>> nodeApiVersionsSupplier,
+ BrokerFeatures brokerFeatures,
+ MetadataCache metadataCache,
+ boolean enableUnstableLastVersion,
+ Optional<ClientMetricsManager> clientMetricsManager) {
+ this.listenerType = listenerType;
+ this.nodeApiVersionsSupplier = nodeApiVersionsSupplier;
+ this.brokerFeatures = brokerFeatures;
+ this.metadataCache = metadataCache;
+ this.enableUnstableLastVersion = enableUnstableLastVersion;
+ this.clientMetricsManager = clientMetricsManager;
+ }
+
+ @Override
+ public boolean enableUnstableLastVersion() {
+ return enableUnstableLastVersion;
+ }
+
+ @Override
+ public ApiMessageType.ListenerType listenerType() {
+ return listenerType;
+ }
+
+ @Override
+ public ApiVersionsResponse apiVersionResponse(int throttleTimeMs, boolean
alterFeatureLevel0) {
+ FinalizedFeatures finalizedFeatures = metadataCache.features();
+ Optional<NodeApiVersions> controllerApiVersions =
nodeApiVersionsSupplier.get();
+ boolean clientTelemetryEnabled =
clientMetricsManager.map(ClientMetricsManager::isTelemetryReceiverConfigured).orElse(false);
+ ApiVersionsResponseData.ApiVersionCollection apiVersions =
controllerApiVersions
+ .map(nodeApiVersions ->
ApiVersionsResponse.controllerApiVersions(
+ nodeApiVersions,
+ listenerType,
+ enableUnstableLastVersion,
+ clientTelemetryEnabled))
+ .orElseGet(() -> ApiVersionsResponse.brokerApiVersions(
+ listenerType,
+ enableUnstableLastVersion,
+ clientTelemetryEnabled));
+
+ return new ApiVersionsResponse.Builder()
+ .setThrottleTimeMs(throttleTimeMs)
+ .setApiVersions(apiVersions)
+ .setSupportedFeatures(brokerFeatures.supportedFeatures())
+ .setFinalizedFeatures(finalizedFeatures.finalizedFeatures())
+
.setFinalizedFeaturesEpoch(finalizedFeatures.finalizedFeaturesEpoch())
+ .setAlterFeatureLevel0(alterFeatureLevel0)
+ .build();
+ }
+
+ @Override
+ public FinalizedFeatures features() {
+ return metadataCache.features();
+ }
+}
diff --git
a/server/src/main/java/org/apache/kafka/server/SimpleApiVersionManager.java
b/server/src/main/java/org/apache/kafka/server/SimpleApiVersionManager.java
new file mode 100644
index 00000000000..67d1ef4abe6
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/SimpleApiVersionManager.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.feature.Features;
+import org.apache.kafka.common.feature.SupportedVersionRange;
+import org.apache.kafka.common.message.ApiMessageType;
+import org.apache.kafka.common.message.ApiVersionsResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.server.common.FinalizedFeatures;
+
+import java.util.function.Supplier;
+
+/**
+ * A simple ApiVersionManager used in controllers. It does not support
forwarding and does not have metadata cache.
+ * Its enabled APIs are determined by the listener type, its finalized
features are dynamically determined by the controller.
+ */
+public class SimpleApiVersionManager implements ApiVersionManager {
+
+ private final ApiMessageType.ListenerType listenerType;
+ private final Features<SupportedVersionRange> brokerFeatures;
+ private final boolean enableUnstableLastVersion;
+ private final Supplier<FinalizedFeatures> featuresProvider;
+ private final ApiVersionsResponseData.ApiVersionCollection apiVersions;
+
+ /**
+ * SimpleApiVersionManager constructor
+ * @param listenerType the listener type
+ * @param enableUnstableLastVersion whether to enable unstable last
version, see
+ * {@link
org.apache.kafka.server.config.ServerConfigs#UNSTABLE_API_VERSIONS_ENABLE_CONFIG}
+ * @param featuresProvider a provider to the finalized features supported
+ */
+ public SimpleApiVersionManager(ApiMessageType.ListenerType listenerType,
+ boolean enableUnstableLastVersion,
+ Supplier<FinalizedFeatures>
featuresProvider) {
+ this.listenerType = listenerType;
+ this.brokerFeatures =
BrokerFeatures.defaultSupportedFeatures(enableUnstableLastVersion);
+ this.enableUnstableLastVersion = enableUnstableLastVersion;
+ this.featuresProvider = featuresProvider;
+ this.apiVersions = ApiVersionsResponse.collectApis(listenerType,
ApiKeys.apisForListener(listenerType), enableUnstableLastVersion);
+ }
+
+ @Override
+ public boolean enableUnstableLastVersion() {
+ return enableUnstableLastVersion;
+ }
+
+ @Override
+ public ApiMessageType.ListenerType listenerType() {
+ return listenerType;
+ }
+
+ @Override
+ public ApiVersionsResponse apiVersionResponse(int throttleTimeMs, boolean
alterFeatureLevel0) {
+ FinalizedFeatures currentFeatures = features();
+ return new ApiVersionsResponse.Builder()
+ .setThrottleTimeMs(throttleTimeMs)
+ .setApiVersions(apiVersions)
+ .setSupportedFeatures(brokerFeatures)
+ .setFinalizedFeatures(currentFeatures.finalizedFeatures())
+
.setFinalizedFeaturesEpoch(currentFeatures.finalizedFeaturesEpoch())
+ .setAlterFeatureLevel0(alterFeatureLevel0)
+ .build();
+ }
+
+ @Override
+ public FinalizedFeatures features() {
+ return featuresProvider.get();
+ }
+}