This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cd3c0ab1a3d KAFKA-15060: fix the ApiVersionManager interface
cd3c0ab1a3d is described below

commit cd3c0ab1a3de0e02f473edeeb986acff3fc87230
Author: Colin P. McCabe <cmcc...@apache.org>
AuthorDate: Wed Jun 7 13:11:55 2023 -0700

    KAFKA-15060: fix the ApiVersionManager interface
    
    This PR expands the scope of ApiVersionManager a bit to include returning the current
    MetadataVersion and features that are in effect. This is useful in general because that information
    needs to be returned in an ApiVersionsResponse. It also allows us to fix the ApiVersionManager
    interface so that all subclasses implement all methods of the interface. Having subclasses that
    don't implement some methods is dangerous because they could cause exceptions at runtime in
    unexpected scenarios.
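
    Roughly, the unified shape looks like the following Java sketch (hypothetical
    names, not the actual Scala trait): every implementation supplies features()
    and builds its response from that snapshot, so no subclass is left throwing
    UnsupportedOperationException at runtime.

        import java.util.Map;
        import java.util.function.Supplier;

        interface VersionManagerSketch {
            FeatureSnapshot features();                     // implemented by every subclass
            String apiVersionResponse(int throttleTimeMs);  // implemented by every subclass
        }

        final class FeatureSnapshot {
            final Map<String, Short> finalizedFeatures;
            final long epoch;

            FeatureSnapshot(Map<String, Short> finalizedFeatures, long epoch) {
                this.finalizedFeatures = finalizedFeatures;
                this.epoch = epoch;
            }
        }

        final class SimpleVersionManagerSketch implements VersionManagerSketch {
            private final Supplier<FeatureSnapshot> featuresProvider;

            SimpleVersionManagerSketch(Supplier<FeatureSnapshot> featuresProvider) {
                this.featuresProvider = featuresProvider;
            }

            @Override
            public FeatureSnapshot features() {
                return featuresProvider.get();
            }

            @Override
            public String apiVersionResponse(int throttleTimeMs) {
                FeatureSnapshot current = features(); // read once for a consistent view
                return "features=" + current.finalizedFeatures + ", epoch=" + current.epoch;
            }
        }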
    
    On the KRaft controller, we were previously performing a read operation in the QuorumController
    thread to get the current metadata version and features. With this PR, we now read a volatile
    variable maintained by a separate FeaturesPublisher object. This will improve performance and
    simplify the code. It should not change the guarantees we are providing; in both the old and new
    scenarios, we need to be robust against version skew scenarios during updates.
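
    The pattern relied on here is a single writer publishing an immutable snapshot
    through a volatile field, which request handler threads read without locks; a
    minimal generic sketch for illustration (the real class is the FeaturesPublisher
    added in this patch):

        public final class VolatileSnapshotHolder<T> {
            private volatile T snapshot;

            public VolatileSnapshotHolder(T initial) {
                this.snapshot = initial;
            }

            // Single writer: the metadata loader calls this from its own thread.
            public void publish(T next) {
                snapshot = next;   // volatile write safely publishes the snapshot
            }

            // Many readers: e.g. ApiVersions request handling, with no lock and
            // no read operation enqueued on the QuorumController thread.
            public T current() {
                return snapshot;   // volatile read sees the latest published value
            }
        }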
    
    Add a Features class which just holds a 3-tuple of metadata version, features, and feature epoch.
    Remove MetadataCache.FinalizedFeaturesAndEpoch, since it just duplicates the Features class.
    (There are some additional feature-related classes that can be consolidated in a follow-on PR.)
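
    A usage sketch based on the Features implementation added in this patch (the
    kraftMode flag controls whether the metadata.version level is folded into the
    finalized-features map):

        import org.apache.kafka.server.common.Features;
        import org.apache.kafka.server.common.MetadataVersion;

        import java.util.Collections;

        public class FeaturesSketch {
            public static void main(String[] args) {
                // kraftMode = true: the map gains a "metadata.version" entry.
                Features kraft = new Features(MetadataVersion.latest(),
                    Collections.emptyMap(), 123L, true);
                System.out.println(kraft.finalizedFeatures());      // {metadata.version=<level>}
                System.out.println(kraft.finalizedFeaturesEpoch()); // 123

                // kraftMode = false (ZK): metadata.version is never included.
                Features zk = new Features(MetadataVersion.latest(),
                    Collections.emptyMap(), 123L, false);
                System.out.println(zk.finalizedFeatures());         // {}
            }
        }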
    
    Create a Java class, EndpointReadyFutures, for managing the futures associated with individual
    authorizer endpoints. This avoids code duplication between ControllerServer and BrokerServer and
    makes this code unit-testable.
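
    A call-site sketch mirroring how BrokerServer and ControllerServer use the new
    class in this patch (the helper method and its parameters are illustrative):

        import org.apache.kafka.common.Endpoint;
        import org.apache.kafka.server.authorizer.Authorizer;
        import org.apache.kafka.server.network.EndpointReadyFutures;
        import org.apache.kafka.server.network.KafkaAuthorizerServerInfo;

        import java.util.Map;
        import java.util.Optional;
        import java.util.concurrent.CompletableFuture;

        public class EndpointReadySketch {
            // If an authorizer is present it is started and its per-endpoint
            // completion stages are folded into the returned futures.
            static Map<Endpoint, CompletableFuture<Void>> readiness(
                    Optional<Authorizer> authorizer,
                    KafkaAuthorizerServerInfo serverInfo) {
                EndpointReadyFutures readyFutures = new EndpointReadyFutures.Builder()
                    .build(authorizer, serverInfo);
                return readyFutures.futures();
            }
        }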
    
    Reviewers: David Arthur <mum...@gmail.com>, dengziming <dengziming1...@gmail.com>, Luke Chen <show...@gmail.com>
---
 checkstyle/import-control-server-common.xml        |   4 +
 .../scala/kafka/controller/KafkaController.scala   |   8 +-
 .../scala/kafka/server/ApiVersionManager.scala     |  55 +++---
 .../src/main/scala/kafka/server/BrokerServer.scala |  28 ++-
 .../main/scala/kafka/server/ControllerApis.scala   |  15 +-
 .../main/scala/kafka/server/ControllerServer.scala |  31 ++-
 .../main/scala/kafka/server/MetadataCache.scala    |  13 +-
 .../kafka/server/metadata/KRaftMetadataCache.scala |  18 +-
 .../kafka/server/metadata/ZkMetadataCache.scala    |  47 +++--
 .../scala/kafka/tools/TestRaftRequestHandler.scala |   2 +-
 .../main/scala/kafka/tools/TestRaftServer.scala    |   7 +-
 .../unit/kafka/network/SocketServerTest.scala      |   6 +-
 .../unit/kafka/server/ApiVersionManagerTest.scala  |  12 +-
 .../unit/kafka/server/ControllerApisTest.scala     |  10 +-
 .../kafka/server/FinalizedFeatureCacheTest.scala   |  20 +-
 .../FinalizedFeatureChangeListenerTest.scala       |  21 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  10 +-
 .../metadata/KRaftMetadataRequestBenchmark.java    |   8 +-
 .../jmh/metadata/MetadataRequestBenchmark.java     |   7 +-
 .../metadata/publisher/FeaturesPublisher.java      |  54 +++++
 .../org/apache/kafka/server/common/Features.java   |  87 ++++++++
 .../kafka/server/network/EndpointReadyFutures.java | 219 +++++++++++++++++++++
 .../server/network/KafkaAuthorizerServerInfo.java  | 108 ++++++++++
 .../apache/kafka/server/common/FeaturesTest.java   |  50 +++++
 .../server/network/EndpointReadyFuturesTest.java   | 169 ++++++++++++++++
 25 files changed, 857 insertions(+), 152 deletions(-)

diff --git a/checkstyle/import-control-server-common.xml b/checkstyle/import-control-server-common.xml
index d310d81a832..f238bc9b8d9 100644
--- a/checkstyle/import-control-server-common.xml
+++ b/checkstyle/import-control-server-common.xml
@@ -77,6 +77,10 @@
         <subpackage name="metrics">
             <allow pkg="com.yammer.metrics" />
         </subpackage>
+
+        <subpackage name="network">
+            <allow pkg="org.apache.kafka.server.authorizer" />
+        </subpackage>
     </subpackage>
 
 </import-control>
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index ee98123059a..c3f76e83d12 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -1598,7 +1598,9 @@ class KafkaController(val config: KafkaConfig,
         !config.isFeatureVersioningSupported ||
         !featureCache.getFeatureOption.exists(
           latestFinalizedFeatures =>
-            BrokerFeatures.hasIncompatibleFeatures(broker.features, latestFinalizedFeatures.features))
+            BrokerFeatures.hasIncompatibleFeatures(broker.features,
+              latestFinalizedFeatures.finalizedFeatures().asScala.
+                map(kv => (kv._1, kv._2.toShort)).toMap))
     }
   }
 
@@ -2081,8 +2083,8 @@ class KafkaController(val config: KafkaConfig,
                                                        callback: UpdateFeaturesCallback): Unit = {
     val updates = request.featureUpdates
     val existingFeatures = featureCache.getFeatureOption
-      .map(featuresAndEpoch => featuresAndEpoch.features)
-      .getOrElse(Map[String, Short]())
+      .map(featuresAndEpoch => featuresAndEpoch.finalizedFeatures().asScala.map(kv => (kv._1, kv._2.toShort)).toMap)
+      .getOrElse(Map[String, Short]())
     // A map with key being feature name and value being finalized version.
     // This contains the target features to be eventually written to FeatureZNode.
     val targetFeatures = scala.collection.mutable.Map[String, Short]() ++ existingFeatures
diff --git a/core/src/main/scala/kafka/server/ApiVersionManager.scala b/core/src/main/scala/kafka/server/ApiVersionManager.scala
index c3346a5edb2..fa796bc6688 100644
--- a/core/src/main/scala/kafka/server/ApiVersionManager.scala
+++ b/core/src/main/scala/kafka/server/ApiVersionManager.scala
@@ -18,10 +18,11 @@ package kafka.server
 
 import kafka.network
 import kafka.network.RequestChannel
-import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
+import org.apache.kafka.common.feature.SupportedVersionRange
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.requests.ApiVersionsResponse
+import org.apache.kafka.server.common.Features
 
 import scala.jdk.CollectionConverters._
 
@@ -30,22 +31,14 @@ trait ApiVersionManager {
   def listenerType: ListenerType
   def enabledApis: collection.Set[ApiKeys]
 
-  /**
-   * @see [[DefaultApiVersionManager.apiVersionResponse]]
-   * @see [[kafka.server.KafkaApis.handleApiVersionsRequest]]
-   */
   def apiVersionResponse(throttleTimeMs: Int): ApiVersionsResponse
 
-  /**
-   * @see [[SimpleApiVersionManager.apiVersionResponse]]
-   * @see [[kafka.server.ControllerApis.handleApiVersionsRequest]]
-   */
-  def apiVersionResponse(throttleTimeMs: Int, finalizedFeatures: Map[String, java.lang.Short], finalizedFeaturesEpoch: Long): ApiVersionsResponse
-
   def isApiEnabled(apiKey: ApiKeys, apiVersion: Short): Boolean = {
     apiKey != null && apiKey.inScope(listenerType) && apiKey.isVersionEnabled(apiVersion, enableUnstableLastVersion)
   }
   def newRequestMetrics: RequestChannel.Metrics = new network.RequestChannel.Metrics(enabledApis)
+
+  def features: Features
 }
 
 object ApiVersionManager {
@@ -80,41 +73,43 @@ object ApiVersionManager {
 class SimpleApiVersionManager(
   val listenerType: ListenerType,
   val enabledApis: collection.Set[ApiKeys],
-  brokerFeatures: Features[SupportedVersionRange],
+  brokerFeatures: org.apache.kafka.common.feature.Features[SupportedVersionRange],
   val enableUnstableLastVersion: Boolean,
-  val zkMigrationEnabled: Boolean
+  val zkMigrationEnabled: Boolean,
+  val featuresProvider: () => Features
 ) extends ApiVersionManager {
 
   def this(
     listenerType: ListenerType,
     enableUnstableLastVersion: Boolean,
-    zkMigrationEnabled: Boolean
+    zkMigrationEnabled: Boolean,
+    featuresProvider: () => Features
   ) = {
     this(
       listenerType,
       ApiKeys.apisForListener(listenerType).asScala,
       BrokerFeatures.defaultSupportedFeatures(),
       enableUnstableLastVersion,
-      zkMigrationEnabled
+      zkMigrationEnabled,
+      featuresProvider
     )
   }
 
   private val apiVersions = ApiVersionsResponse.collectApis(enabledApis.asJava, enableUnstableLastVersion)
 
-  override def apiVersionResponse(requestThrottleMs: Int): ApiVersionsResponse = {
-    throw new UnsupportedOperationException("This method is not supported in SimpleApiVersionManager, use apiVersionResponse(throttleTimeMs, finalizedFeatures, epoch) instead")
-  }
-
-  override def apiVersionResponse(throttleTimeMs: Int, finalizedFeatures: Map[String, java.lang.Short], finalizedFeaturesEpoch: Long): ApiVersionsResponse = {
+  override def apiVersionResponse(throttleTimeMs: Int): ApiVersionsResponse = {
+    val currentFeatures = features
     ApiVersionsResponse.createApiVersionsResponse(
       throttleTimeMs,
       apiVersions,
       brokerFeatures,
-      finalizedFeatures.asJava,
-      finalizedFeaturesEpoch,
+      currentFeatures.finalizedFeatures(),
+      currentFeatures.finalizedFeaturesEpoch(),
       zkMigrationEnabled
     )
   }
+
+  override def features: Features = featuresProvider.apply()
 }
 
 /**
@@ -124,7 +119,7 @@ class SimpleApiVersionManager(
  *
  * @param listenerType the listener type
  * @param forwardingManager the forwarding manager,
- * @param features the broker features
+ * @param brokerFeatures the broker features
  * @param metadataCache the metadata cache, used to get the finalized features and the metadata version
  * @param enableUnstableLastVersion whether to enable unstable last version, see [[KafkaConfig.unstableApiVersionsEnabled]]
  * @param zkMigrationEnabled whether to enable zk migration, see [[KafkaConfig.migrationEnabled]]
@@ -132,7 +127,7 @@ class SimpleApiVersionManager(
 class DefaultApiVersionManager(
   val listenerType: ListenerType,
   forwardingManager: Option[ForwardingManager],
-  features: BrokerFeatures,
+  brokerFeatures: BrokerFeatures,
   metadataCache: MetadataCache,
   val enableUnstableLastVersion: Boolean,
   val zkMigrationEnabled: Boolean = false
@@ -141,16 +136,16 @@ class DefaultApiVersionManager(
   val enabledApis = ApiKeys.apisForListener(listenerType).asScala
 
   override def apiVersionResponse(throttleTimeMs: Int): ApiVersionsResponse = {
-    val supportedFeatures = features.supportedFeatures
+    val supportedFeatures = brokerFeatures.supportedFeatures
     val finalizedFeatures = metadataCache.features()
     val controllerApiVersions = forwardingManager.flatMap(_.controllerApiVersions)
 
     ApiVersionsResponse.createApiVersionsResponse(
       throttleTimeMs,
-      metadataCache.metadataVersion().highestSupportedRecordVersion,
+      finalizedFeatures.metadataVersion().highestSupportedRecordVersion,
       supportedFeatures,
-      finalizedFeatures.features.map(kv => (kv._1, kv._2.asInstanceOf[java.lang.Short])).asJava,
-      finalizedFeatures.epoch,
+      finalizedFeatures.finalizedFeatures(),
+      finalizedFeatures.finalizedFeaturesEpoch(),
       controllerApiVersions.orNull,
       listenerType,
       enableUnstableLastVersion,
@@ -158,7 +153,5 @@ class DefaultApiVersionManager(
     )
   }
 
-  override def apiVersionResponse(throttleTimeMs: Int, finalizedFeatures: Map[String, java.lang.Short], finalizedFeatureEpoch: Long): ApiVersionsResponse = {
-    throw new UnsupportedOperationException("This method is not supported in DefaultApiVersionManager, use apiVersionResponse(throttleTimeMs) instead")
-  }
+  override def features: Features = metadataCache.features()
 }
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 22e14e96b0a..2bf29c32d97 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -17,7 +17,6 @@
 
 package kafka.server
 
-import kafka.cluster.Broker.ServerInfo
 import kafka.cluster.EndPoint
 import kafka.coordinator.group.GroupCoordinatorAdapter
 import kafka.coordinator.transaction.{ProducerIdManager, TransactionCoordinator}
@@ -48,6 +47,7 @@ import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.common.ApiMessageAndVersion
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
+import org.apache.kafka.server.network.{EndpointReadyFutures, KafkaAuthorizerServerInfo}
 import org.apache.kafka.server.util.{Deadline, FutureUtils, KafkaScheduler}
 import org.apache.kafka.storage.internals.log.LogDirFailureChannel
 
@@ -366,25 +366,10 @@ class BrokerServer(
           config.interBrokerListenerName.value() + ". Found listener(s): " +
         endpoints.asScala.map(ep => ep.listenerName().orElse("(none)")).mkString(", "))
       }
-      val authorizerInfo = ServerInfo(new ClusterResource(clusterId),
-        config.nodeId,
-        endpoints,
-        interBrokerListener,
-        config.earlyStartListeners.map(_.value()).asJava)
 
       // Create and initialize an authorizer if one is configured.
       authorizer = config.createNewAuthorizer()
       authorizer.foreach(_.configure(config.originals))
-      val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = authorizer match {
-        case Some(authZ) =>
-          authZ.start(authorizerInfo).asScala.map { case (ep, cs) =>
-            ep -> cs.toCompletableFuture
-          }
-        case None =>
-          authorizerInfo.endpoints.asScala.map { ep =>
-            ep -> CompletableFuture.completedFuture[Void](null)
-          }.toMap
-      }
 
       val fetchManager = new FetchManager(Time.SYSTEM,
         new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
@@ -504,6 +489,17 @@ class BrokerServer(
 
       // Enable inbound TCP connections. Each endpoint will be started only once its matching
       // authorizer future is completed.
+      val endpointReadyFutures = {
+        val builder = new EndpointReadyFutures.Builder()
+        builder.build(authorizer.asJava,
+          new KafkaAuthorizerServerInfo(
+            new ClusterResource(clusterId),
+            config.nodeId,
+            endpoints,
+            interBrokerListener,
+            config.earlyStartListeners.map(_.value()).asJava))
+      }
+      val authorizerFutures = endpointReadyFutures.futures().asScala.toMap
       val enableRequestProcessingFuture = socketServer.enableRequestProcessing(authorizerFutures)
 
       // Block here until all the authorizer futures are complete.
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala
index ba3da5c47c7..1be517e25d7 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -441,23 +441,14 @@ class ControllerApis(val requestChannel: RequestChannel,
     if (apiVersionRequest.hasUnsupportedRequestVersion) {
       requestHelper.sendResponseMaybeThrottle(request,
         requestThrottleMs => apiVersionRequest.getErrorResponse(requestThrottleMs, UNSUPPORTED_VERSION.exception))
-      CompletableFuture.completedFuture[Unit](())
     } else if (!apiVersionRequest.isValid) {
       requestHelper.sendResponseMaybeThrottle(request,
         requestThrottleMs => apiVersionRequest.getErrorResponse(requestThrottleMs, INVALID_REQUEST.exception))
-      CompletableFuture.completedFuture[Unit](())
     } else {
-      val context = new ControllerRequestContext(request.context.header.data, request.context.principal, OptionalLong.empty())
-      controller.finalizedFeatures(context).handle { (result, exception) =>
-        requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
-          if (exception != null) {
-            apiVersionRequest.getErrorResponse(requestThrottleMs, exception)
-          } else {
-            apiVersionManager.apiVersionResponse(requestThrottleMs, result.featureMap().asScala.toMap, result.epoch())
-          }
-        })
-      }
+      requestHelper.sendResponseMaybeThrottle(request,
+        requestThrottleMs => apiVersionManager.apiVersionResponse(requestThrottleMs))
     }
+    CompletableFuture.completedFuture[Unit](())
   }
 
   def authorizeAlterResource(requestContext: RequestContext,
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index b45445e1b0d..36d8f5eca1e 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -17,7 +17,6 @@
 
 package kafka.server
 
-import kafka.cluster.Broker.ServerInfo
 import kafka.metrics.LinuxIoMetricsCollector
 import kafka.migration.MigrationPropagator
 import kafka.network.{DataPlaneAcceptor, SocketServer}
@@ -43,10 +42,12 @@ import org.apache.kafka.metadata.KafkaConfigSchema
 import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
 import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
 import org.apache.kafka.metadata.migration.{KRaftMigrationDriver, LegacyPropagator}
+import org.apache.kafka.metadata.publisher.FeaturesPublisher
 import org.apache.kafka.raft.RaftConfig
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.common.ApiMessageAndVersion
 import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
+import org.apache.kafka.server.network.{EndpointReadyFutures, KafkaAuthorizerServerInfo}
 import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy}
 import org.apache.kafka.server.util.{Deadline, FutureUtils}
 
@@ -115,6 +116,7 @@ class ControllerServer(
   var migrationSupport: Option[ControllerMigrationSupport] = None
   def kafkaYammerMetrics: KafkaYammerMetrics = KafkaYammerMetrics.INSTANCE
   val metadataPublishers: util.List[MetadataPublisher] = new util.ArrayList[MetadataPublisher]()
+  val featuresPublisher = new FeaturesPublisher()
 
   private def maybeChangeStatus(from: ProcessStatus, to: ProcessStatus): Boolean = {
     lock.lock()
@@ -153,30 +155,22 @@ class ControllerServer(
       authorizer = config.createNewAuthorizer()
       authorizer.foreach(_.configure(config.originals))
 
-      val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = authorizer match {
-        case Some(authZ) =>
-          // It would be nice to remove some of the broker-specific assumptions from
-          // AuthorizerServerInfo, such as the assumption that there is an inter-broker
-          // listener, or that ID is named brokerId.
-          val controllerAuthorizerInfo = ServerInfo(
+      val endpointReadyFutures = {
+        val builder = new EndpointReadyFutures.Builder()
+        builder.build(authorizer.asJava,
+          new KafkaAuthorizerServerInfo(
             new ClusterResource(clusterId),
             config.nodeId,
             javaListeners,
             javaListeners.get(0),
-            config.earlyStartListeners.map(_.value()).asJava)
-          authZ.start(controllerAuthorizerInfo).asScala.map { case (ep, cs) =>
-            ep -> cs.toCompletableFuture
-          }.toMap
-        case None =>
-          javaListeners.asScala.map {
-            ep => ep -> CompletableFuture.completedFuture[Void](null)
-          }.toMap
+            config.earlyStartListeners.map(_.value()).asJava))
       }
 
       val apiVersionManager = new SimpleApiVersionManager(
         ListenerType.CONTROLLER,
         config.unstableApiVersionsEnabled,
-        config.migrationEnabled
+        config.migrationEnabled,
+        () => featuresPublisher.features()
       )
 
       tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
@@ -302,6 +296,8 @@ class ControllerServer(
         s"${DataPlaneAcceptor.MetricPrefix}RequestHandlerAvgIdlePercent",
         DataPlaneAcceptor.ThreadPrefix)
 
+      val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = endpointReadyFutures.futures().asScala.toMap
+
       /**
        * Enable the controller endpoint(s). If we are using an authorizer which stores
        * ACLs in the metadata log, such as StandardAuthorizer, we will be able to start
@@ -326,6 +322,9 @@ class ControllerServer(
       // register this instance for dynamic config changes to the KafkaConfig
       config.dynamicConfig.addReconfigurables(this)
 
+      // Set up the metadata features publisher.
+      metadataPublishers.add(featuresPublisher)
+
       // Set up the dynamic config publisher. This runs even in combined mode, since the broker
       // has its own separate dynamic configuration object.
       metadataPublishers.add(new DynamicConfigPublisher(
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 342b23cec4b..ecd64c17b91 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -22,16 +22,10 @@ import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
 import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.{Cluster, Node, TopicPartition, Uuid}
-import org.apache.kafka.server.common.MetadataVersion
+import org.apache.kafka.server.common.{Features, MetadataVersion}
 
 import java.util
 
-case class FinalizedFeaturesAndEpoch(features: Map[String, Short], epoch: Long) {
-  override def toString(): String = {
-    s"FinalizedFeaturesAndEpoch(features=$features, epoch=$epoch)"
-  }
-}
-
 /**
  * Used to represent the controller id cached in the metadata cache of the broker. This trait is
  * extended to represent if the controller is KRaft controller or Zk controller.
@@ -44,7 +38,6 @@ case class ZkCachedControllerId(id: Int) extends CachedControllerId
 case class KRaftCachedControllerId(id: Int) extends CachedControllerId
 
 trait MetadataCache {
-
   /**
   * Return topic metadata for a given set of topics and listener. See KafkaApis#handleTopicMetadataRequest for details
    * on the use of the two boolean flags.
@@ -113,9 +106,9 @@ trait MetadataCache {
 
   def metadataVersion(): MetadataVersion
 
-  def features(): FinalizedFeaturesAndEpoch
-
   def getRandomAliveBrokerId: Option[Int]
+
+  def features(): Features
 }
 
 object MetadataCache {
diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
index 300bb3e5cab..5e3ed11022b 100644
--- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
@@ -18,7 +18,7 @@
 package kafka.server.metadata
 
 import kafka.controller.StateChangeLogger
-import kafka.server.{CachedControllerId, FinalizedFeaturesAndEpoch, KRaftCachedControllerId, MetadataCache}
+import kafka.server.{CachedControllerId, KRaftCachedControllerId, MetadataCache}
 import kafka.utils.Logging
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition, MetadataResponseTopic}
@@ -37,7 +37,7 @@ import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.message.{DescribeClientQuotasRequestData, DescribeClientQuotasResponseData}
 import org.apache.kafka.common.message.{DescribeUserScramCredentialsRequestData, DescribeUserScramCredentialsResponseData}
 import org.apache.kafka.metadata.{PartitionRegistration, Replicas}
-import org.apache.kafka.server.common.MetadataVersion
+import org.apache.kafka.server.common.{Features, MetadataVersion}
 
 import scala.collection.{Seq, Set, mutable}
 import scala.jdk.CollectionConverters._
@@ -393,15 +393,11 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
 
   override def metadataVersion(): MetadataVersion = _currentImage.features().metadataVersion()
 
-  override def features(): FinalizedFeaturesAndEpoch = {
+  override def features(): Features = {
     val image = _currentImage
-    val features = image.features().finalizedVersions().asScala.map {
-      case (name: String, level: java.lang.Short) => name -> Short2short(level)
-    }
-    features.put(MetadataVersion.FEATURE_NAME, image.features().metadataVersion().featureLevel())
-
-    FinalizedFeaturesAndEpoch(
-      features.toMap,
-      image.highestOffsetAndEpoch().offset)
+    new Features(image.features().metadataVersion(),
+      image.features().finalizedVersions(),
+      image.highestOffsetAndEpoch().offset,
+      true)
   }
 }
diff --git a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
index feaaf1c43f1..9159b791880 100755
--- a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
@@ -27,7 +27,7 @@ import scala.jdk.CollectionConverters._
 import kafka.cluster.{Broker, EndPoint}
 import kafka.api._
 import kafka.controller.StateChangeLogger
-import kafka.server.{BrokerFeatures, CachedControllerId, FinalizedFeaturesAndEpoch, KRaftCachedControllerId, MetadataCache, ZkCachedControllerId}
+import kafka.server.{BrokerFeatures, CachedControllerId, KRaftCachedControllerId, MetadataCache, ZkCachedControllerId}
 import kafka.utils.CoreUtils._
 import kafka.utils.Logging
 import kafka.utils.Implicits._
@@ -40,7 +40,7 @@ import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{ApiVersionsResponse, MetadataResponse, UpdateMetadataRequest}
 import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.server.common.MetadataVersion
+import org.apache.kafka.server.common.{Features, MetadataVersion}
 
 import java.util.concurrent.{ThreadLocalRandom, TimeUnit}
 import scala.concurrent.TimeoutException
@@ -53,7 +53,7 @@ class FeatureCacheUpdateException(message: String) extends RuntimeException(mess
 trait ZkFinalizedFeatureCache {
   def waitUntilFeatureEpochOrThrow(minExpectedEpoch: Long, timeoutMs: Long): Unit
 
-  def getFeatureOption: Option[FinalizedFeaturesAndEpoch]
+  def getFeatureOption: Option[Features]
 }
 
 /**
@@ -83,7 +83,7 @@ class ZkMetadataCache(
   private val stateChangeLogger = new StateChangeLogger(brokerId, inControllerContext = false, None)
 
   // Features are updated via ZK notification (see FinalizedFeatureChangeListener)
-  @volatile private var featuresAndEpoch: Option[FinalizedFeaturesAndEpoch] = Option.empty
+  @volatile private var _features: Option[Features] = Option.empty
   private val featureLock = new ReentrantLock()
   private val featureCond = featureLock.newCondition()
 
@@ -488,11 +488,12 @@ class ZkMetadataCache(
 
   override def metadataVersion(): MetadataVersion = metadataVersion
 
-  override def features(): FinalizedFeaturesAndEpoch = {
-    featuresAndEpoch match {
-      case Some(features) => features
-      case None => FinalizedFeaturesAndEpoch(Map.empty, ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH)
-    }
+  override def features(): Features = _features match {
+    case Some(features) => features
+    case None => new Features(metadataVersion,
+      Collections.emptyMap(),
+      ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
+      false)
   }
 
   /**
@@ -509,14 +510,18 @@ class ZkMetadataCache(
    *                         not modified.
    */
   def updateFeaturesOrThrow(latestFeatures: Map[String, Short], latestEpoch: Long): Unit = {
-    val latest = FinalizedFeaturesAndEpoch(latestFeatures, latestEpoch)
-    val existing = featuresAndEpoch.map(item => item.toString()).getOrElse("<empty>")
-    if (featuresAndEpoch.isDefined && featuresAndEpoch.get.epoch > latest.epoch) {
+    val latest = new Features(metadataVersion,
+      latestFeatures.map(kv => (kv._1, kv._2.asInstanceOf[java.lang.Short])).asJava,
+      latestEpoch,
+      false)
+    val existing = _features
+    if (existing.isDefined && existing.get.finalizedFeaturesEpoch() > latest.finalizedFeaturesEpoch()) {
       val errorMsg = s"FinalizedFeatureCache update failed due to invalid epoch in new $latest." +
         s" The existing cache contents are $existing."
       throw new FeatureCacheUpdateException(errorMsg)
     } else {
-      val incompatibleFeatures = brokerFeatures.incompatibleFeatures(latest.features)
+      val incompatibleFeatures = brokerFeatures.incompatibleFeatures(
+        latest.finalizedFeatures().asScala.map(kv => (kv._1, kv._2.toShort)).toMap)
       if (incompatibleFeatures.nonEmpty) {
         val errorMsg = "FinalizedFeatureCache update failed since feature 
compatibility" +
           s" checks failed! Supported ${brokerFeatures.supportedFeatures} has 
incompatibilities" +
@@ -525,7 +530,7 @@ class ZkMetadataCache(
       } else {
         val logMsg = s"Updated cache from existing $existing to latest 
$latest."
         inLock(featureLock) {
-          featuresAndEpoch = Some(latest)
+          _features = Some(latest)
           featureCond.signalAll()
         }
         info(logMsg)
@@ -533,13 +538,12 @@ class ZkMetadataCache(
     }
   }
 
-
   /**
    * Clears all existing finalized features and epoch from the cache.
    */
   def clearFeatures(): Unit = {
     inLock(featureLock) {
-      featuresAndEpoch = None
+      _features = None
       featureCond.signalAll()
     }
   }
@@ -565,12 +569,12 @@ class ZkMetadataCache(
     }
     val waitEndTimeNanos = System.nanoTime() + (timeoutMs * 1000000)
     inLock(featureLock) {
-      while (!(featuresAndEpoch.isDefined && featuresAndEpoch.get.epoch >= minExpectedEpoch)) {
+      while (!(_features.isDefined && _features.get.finalizedFeaturesEpoch() >= minExpectedEpoch)) {
         val nowNanos = System.nanoTime()
         if (nowNanos > waitEndTimeNanos) {
           throw new TimeoutException(
             s"Timed out after waiting for ${timeoutMs}ms for required 
condition to be met." +
-              s" Current epoch: ${featuresAndEpoch.map(fe => 
fe.epoch).getOrElse("<none>")}.")
+              s" Current epoch: ${_features.map(fe => 
fe.finalizedFeaturesEpoch()).getOrElse("<none>")}.")
         }
         val sleepTimeMs = max(1L, (waitEndTimeNanos - nowNanos) / 1000000)
         featureCond.await(sleepTimeMs, TimeUnit.MILLISECONDS)
@@ -578,10 +582,5 @@ class ZkMetadataCache(
     }
   }
 
-  /**
-   * @return   the latest known FinalizedFeaturesAndEpoch or empty if not defined in the cache.
-   */
-  def getFeatureOption: Option[FinalizedFeaturesAndEpoch] = {
-    featuresAndEpoch
-  }
+  override def getFeatureOption: Option[Features] = _features
 }
diff --git a/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala b/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala
index b379efeb858..a9b471b1622 100644
--- a/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala
@@ -64,7 +64,7 @@ class TestRaftRequestHandler(
   }
 
   private def handleApiVersions(request: RequestChannel.Request): Unit = {
-    requestChannel.sendResponse(request, apiVersionManager.apiVersionResponse(throttleTimeMs = 0, Map.empty, -1), None)
+    requestChannel.sendResponse(request, apiVersionManager.apiVersionResponse(throttleTimeMs = 0), None)
   }
 
   private def handleVote(request: RequestChannel.Request): Unit = {
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index be45bae578d..1026c528473 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -37,6 +37,7 @@ import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{TopicPartition, Uuid, protocol}
 import org.apache.kafka.raft.errors.NotLeaderException
 import org.apache.kafka.raft.{Batch, BatchReader, LeaderAndEpoch, RaftClient, RaftConfig}
+import org.apache.kafka.server.common.{Features, MetadataVersion}
 import org.apache.kafka.server.common.serialization.RecordSerde
 import org.apache.kafka.server.fault.ProcessTerminatingFaultHandler
 import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils, ShutdownableThread}
@@ -74,7 +75,11 @@ class TestRaftServer(
     tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
     credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
 
-    val apiVersionManager = new SimpleApiVersionManager(ListenerType.CONTROLLER, true, false)
+    val apiVersionManager = new SimpleApiVersionManager(
+      ListenerType.CONTROLLER,
+      true,
+      false,
+      () => Features.fromKRaftVersion(MetadataVersion.MINIMUM_KRAFT_VERSION))
     socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager)
 
     val metaProperties = MetaProperties(
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 5e9e84fd465..58ba9d3af7f 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -24,7 +24,7 @@ import java.nio.channels.{SelectionKey, SocketChannel}
 import java.nio.charset.StandardCharsets
 import java.util
 import java.util.concurrent.{CompletableFuture, ConcurrentLinkedQueue, ExecutionException, Executors, TimeUnit}
-import java.util.{Properties, Random}
+import java.util.{Collections, Properties, Random}
 import com.fasterxml.jackson.databind.node.{JsonNodeFactory, ObjectNode, TextNode}
 import com.yammer.metrics.core.{Gauge, Meter}
 
@@ -46,6 +46,7 @@ import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.security.scram.internals.ScramMechanism
 import org.apache.kafka.common.utils.{AppInfoParser, LogContext, MockTime, Time, Utils}
+import org.apache.kafka.server.common.{Features, MetadataVersion}
 import org.apache.kafka.test.{TestSslUtils, TestUtils => JTestUtils}
 import org.apache.log4j.Level
 import org.junit.jupiter.api.Assertions._
@@ -77,7 +78,8 @@ class SocketServerTest {
   // Clean-up any metrics left around by previous tests
   TestUtils.clearYammerMetrics()
 
-  private val apiVersionManager = new SimpleApiVersionManager(ListenerType.ZK_BROKER, true, false)
+  private val apiVersionManager = new SimpleApiVersionManager(ListenerType.ZK_BROKER, true, false,
+    () => new Features(MetadataVersion.latest(), Collections.emptyMap[String, java.lang.Short], 0, true))
   val server = new SocketServer(config, metrics, Time.SYSTEM, credentialProvider, apiVersionManager)
   server.enableRequestProcessing(Map.empty).get(1, TimeUnit.MINUTES)
   val sockets = new ArrayBuffer[Socket]
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala
index bcc84443f16..6c9c3a70ca6 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala
@@ -39,7 +39,7 @@ class ApiVersionManagerTest {
     val versionManager = new DefaultApiVersionManager(
       listenerType = apiScope,
       forwardingManager = None,
-      features = brokerFeatures,
+      brokerFeatures = brokerFeatures,
       metadataCache = metadataCache,
       enableUnstableLastVersion = true
     )
@@ -57,7 +57,7 @@ class ApiVersionManagerTest {
     val versionManager = new DefaultApiVersionManager(
       listenerType = apiScope,
       forwardingManager = None,
-      features = brokerFeatures,
+      brokerFeatures = brokerFeatures,
       metadataCache = metadataCache,
       enableUnstableLastVersion = false
     )
@@ -86,7 +86,7 @@ class ApiVersionManagerTest {
     val versionManager = new DefaultApiVersionManager(
       listenerType = ListenerType.ZK_BROKER,
       forwardingManager = Some(forwardingManager),
-      features = brokerFeatures,
+      brokerFeatures = brokerFeatures,
       metadataCache = metadataCache,
       enableUnstableLastVersion = true
     )
@@ -107,7 +107,7 @@ class ApiVersionManagerTest {
       val versionManager = new DefaultApiVersionManager(
         listenerType = ListenerType.BROKER,
         forwardingManager = forwardingManagerOpt,
-        features = brokerFeatures,
+        brokerFeatures = brokerFeatures,
         metadataCache = metadataCache,
         enableUnstableLastVersion = true
       )
@@ -129,7 +129,7 @@ class ApiVersionManagerTest {
     val versionManager = new DefaultApiVersionManager(
       listenerType = ListenerType.ZK_BROKER,
       forwardingManager = Some(forwardingManager),
-      features = brokerFeatures,
+      brokerFeatures = brokerFeatures,
       metadataCache = metadataCache,
       enableUnstableLastVersion = true
     )
@@ -148,7 +148,7 @@ class ApiVersionManagerTest {
     val versionManager = new DefaultApiVersionManager(
       listenerType = ListenerType.ZK_BROKER,
       forwardingManager = None,
-      features = brokerFeatures,
+      brokerFeatures = brokerFeatures,
       metadataCache = metadataCache,
       enableUnstableLastVersion = true
     )
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index 5243a21f1be..bd2a306719b 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -46,12 +46,12 @@ import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
+import org.apache.kafka.common.utils.MockTime
 import org.apache.kafka.common.{ElectionType, Uuid}
 import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT
 import org.apache.kafka.controller.{Controller, ControllerRequestContext, ResultOrError}
 import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult, Authorizer}
-import org.apache.kafka.server.common.{ApiMessageAndVersion, ProducerIdsBlock}
-import org.apache.kafka.server.util.MockTime
+import org.apache.kafka.server.common.{ApiMessageAndVersion, Features, MetadataVersion, ProducerIdsBlock}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
@@ -155,7 +155,11 @@ class ControllerApisTest {
       new KafkaConfig(props),
       MetaProperties("JgxuGe9URy-E-ceaL04lEw", nodeId = nodeId),
       Seq.empty,
-      new SimpleApiVersionManager(ListenerType.CONTROLLER, true, false)
+      new SimpleApiVersionManager(
+        ListenerType.CONTROLLER,
+        true,
+        false,
+        () => Features.fromKRaftVersion(MetadataVersion.latest()))
     )
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/FinalizedFeatureCacheTest.scala b/core/src/test/scala/unit/kafka/server/FinalizedFeatureCacheTest.scala
index 5eb562fb29f..b94bfb16c7e 100644
--- a/core/src/test/scala/unit/kafka/server/FinalizedFeatureCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FinalizedFeatureCacheTest.scala
@@ -32,6 +32,10 @@ class FinalizedFeatureCacheTest {
     assertTrue(new ZkMetadataCache(1, MetadataVersion.IBP_2_8_IV1, BrokerFeatures.createDefault()).getFeatureOption.isEmpty)
   }
 
+  def asJava(input: Map[String, Short]): java.util.Map[String, java.lang.Short] = {
+    input.map(kv => kv._1 -> kv._2.asInstanceOf[java.lang.Short]).asJava
+  }
+
   @Test
   def testUpdateOrThrowFailedDueToInvalidEpoch(): Unit = {
     val supportedFeatures = Map[String, SupportedVersionRange](
@@ -44,15 +48,15 @@ class FinalizedFeatureCacheTest {
     val cache = new ZkMetadataCache(1, MetadataVersion.IBP_2_8_IV1, brokerFeatures)
     cache.updateFeaturesOrThrow(finalizedFeatures, 10)
     assertTrue(cache.getFeatureOption.isDefined)
-    assertEquals(finalizedFeatures, cache.getFeatureOption.get.features)
-    assertEquals(10, cache.getFeatureOption.get.epoch)
+    assertEquals(asJava(finalizedFeatures), cache.getFeatureOption.get.finalizedFeatures())
+    assertEquals(10, cache.getFeatureOption.get.finalizedFeaturesEpoch())
 
     assertThrows(classOf[FeatureCacheUpdateException], () => cache.updateFeaturesOrThrow(finalizedFeatures, 9))
 
     // Check that the failed updateOrThrow call did not make any mutations.
     assertTrue(cache.getFeatureOption.isDefined)
-    assertEquals(finalizedFeatures, cache.getFeatureOption.get.features)
-    assertEquals(10, cache.getFeatureOption.get.epoch)
+    assertEquals(asJava(finalizedFeatures), cache.getFeatureOption.get.finalizedFeatures())
+    assertEquals(10, cache.getFeatureOption.get.finalizedFeaturesEpoch())
   }
 
   @Test
@@ -83,8 +87,8 @@ class FinalizedFeatureCacheTest {
     val cache = new ZkMetadataCache(1, MetadataVersion.IBP_2_8_IV1, brokerFeatures)
     cache.updateFeaturesOrThrow(finalizedFeatures, 12)
     assertTrue(cache.getFeatureOption.isDefined)
-    assertEquals(finalizedFeatures,  cache.getFeatureOption.get.features)
-    assertEquals(12, cache.getFeatureOption.get.epoch)
+    assertEquals(asJava(finalizedFeatures),  cache.getFeatureOption.get.finalizedFeatures())
+    assertEquals(12, cache.getFeatureOption.get.finalizedFeaturesEpoch())
   }
 
   @Test
@@ -99,8 +103,8 @@ class FinalizedFeatureCacheTest {
     val cache = new ZkMetadataCache(1, MetadataVersion.IBP_2_8_IV1, brokerFeatures)
     cache.updateFeaturesOrThrow(finalizedFeatures, 12)
     assertTrue(cache.getFeatureOption.isDefined)
-    assertEquals(finalizedFeatures, cache.getFeatureOption.get.features)
-    assertEquals(12, cache.getFeatureOption.get.epoch)
+    assertEquals(asJava(finalizedFeatures), cache.getFeatureOption.get.finalizedFeatures())
+    assertEquals(12, cache.getFeatureOption.get.finalizedFeaturesEpoch())
 
     cache.clearFeatures()
     assertTrue(cache.getFeatureOption.isEmpty)
diff --git a/core/src/test/scala/unit/kafka/server/FinalizedFeatureChangeListenerTest.scala b/core/src/test/scala/unit/kafka/server/FinalizedFeatureChangeListenerTest.scala
index 67313ba3c26..844cd528c43 100644
--- a/core/src/test/scala/unit/kafka/server/FinalizedFeatureChangeListenerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FinalizedFeatureChangeListenerTest.scala
@@ -21,6 +21,7 @@ import kafka.server.metadata.ZkMetadataCache
 import kafka.utils.TestUtils
 import kafka.zk.{FeatureZNode, FeatureZNodeStatus, ZkVersion}
 import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
+import org.apache.kafka.server.common.{Features => JFeatures}
 import org.apache.kafka.common.utils.Exit
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.common.MetadataVersion.IBP_3_2_IV0
@@ -32,6 +33,15 @@ import java.util.concurrent.{CountDownLatch, TimeoutException}
 import scala.jdk.CollectionConverters._
 
 class FinalizedFeatureChangeListenerTest extends QuorumTestHarness {
+  case class FinalizedFeaturesAndEpoch(features: Map[String, Short], epoch: Long) {
+    override def toString(): String = {
+      s"FinalizedFeaturesAndEpoch(features=$features, epoch=$epoch)"
+    }
+  }
+
+  def asJava(input: Map[String, Short]): java.util.Map[String, java.lang.Short] = {
+    input.map(kv => kv._1 -> kv._2.asInstanceOf[java.lang.Short]).asJava
+  }
 
   private def createBrokerFeatures(): BrokerFeatures = {
     val supportedFeaturesMap = Map[String, SupportedVersionRange](
@@ -64,8 +74,8 @@ class FinalizedFeatureChangeListenerTest extends QuorumTestHarness {
       val mayBeNewCacheContent = cache.getFeatureOption
       assertFalse(mayBeNewCacheContent.isEmpty)
       val newCacheContent = mayBeNewCacheContent.get
-      assertEquals(expectedCacheContent.get.features, newCacheContent.features)
-      assertEquals(expectedCacheContent.get.epoch, newCacheContent.epoch)
+      assertEquals(asJava(expectedCacheContent.get.features), newCacheContent.finalizedFeatures())
+      assertEquals(expectedCacheContent.get.epoch, newCacheContent.finalizedFeaturesEpoch())
     } else {
       val mayBeNewCacheContent = cache.getFeatureOption
       assertTrue(mayBeNewCacheContent.isEmpty)
@@ -94,7 +104,9 @@ class FinalizedFeatureChangeListenerTest extends QuorumTestHarness {
       assertTrue(updatedVersion > initialFinalizedFeatures.epoch)
 
       cache.waitUntilFeatureEpochOrThrow(updatedVersion, JTestUtils.DEFAULT_MAX_WAIT_MS)
-      assertEquals(FinalizedFeaturesAndEpoch(finalizedFeatures, updatedVersion), cache.getFeatureOption.get)
+      assertEquals(new JFeatures(MetadataVersion.IBP_2_8_IV1,
+        asJava(finalizedFeatures), updatedVersion, false),
+          cache.getFeatureOption.get)
       assertTrue(listener.isListenerInitiated)
     }
 
@@ -248,7 +260,8 @@ class FinalizedFeatureChangeListenerTest extends QuorumTestHarness {
         listener.isListenerDead &&
         // Make sure the cache contents are as expected, and, the incompatible features were not
         // applied.
-        cache.getFeatureOption.get.equals(initialFinalizedFeatures)
+          cache.getFeatureOption.get.equals(new JFeatures(MetadataVersion.IBP_2_8_IV1,
+            asJava(initialFinalizedFeatures.features), initialFinalizedFeatures.epoch, false))
       }, "Timed out waiting for listener death and FinalizedFeatureCache to be 
updated")
     } finally {
       Exit.resetExitProcedure()
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 40023dd9859..fd95b74ad9f 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -93,7 +93,7 @@ import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartiti
 import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
 import org.apache.kafka.common.message.OffsetDeleteResponseData.{OffsetDeleteResponsePartition, OffsetDeleteResponsePartitionCollection, OffsetDeleteResponseTopic, OffsetDeleteResponseTopicCollection}
 import org.apache.kafka.coordinator.group.GroupCoordinator
-import org.apache.kafka.server.common.MetadataVersion
+import org.apache.kafka.server.common.{Features, MetadataVersion}
 import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0, IBP_2_2_IV1}
 import org.apache.kafka.server.util.MockTime
 import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchParams, FetchPartitionData}
@@ -185,7 +185,13 @@ class KafkaApisTest {
     } else {
       ApiKeys.apisForListener(listenerType).asScala.toSet
     }
-    val apiVersionManager = new SimpleApiVersionManager(listenerType, enabledApis, BrokerFeatures.defaultSupportedFeatures(), true, false)
+    val apiVersionManager = new SimpleApiVersionManager(
+      listenerType,
+      enabledApis,
+      BrokerFeatures.defaultSupportedFeatures(),
+      true,
+      false,
+      () => new Features(MetadataVersion.latest(), Collections.emptyMap[String, java.lang.Short], 0, raftSupport))
 
     new KafkaApis(
       requestChannel = requestChannel,
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
index 25e182b895c..13409a57044 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
@@ -59,6 +59,8 @@ import org.apache.kafka.coordinator.group.GroupCoordinator;
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.server.common.Features;
+import org.apache.kafka.server.common.MetadataVersion;
 import org.mockito.Mockito;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -197,7 +199,11 @@ public class KRaftMetadataRequestBenchmark {
                 setClusterId("clusterId").
                 setTime(Time.SYSTEM).
                 setTokenManager(null).
-                setApiVersionManager(new SimpleApiVersionManager(ApiMessageType.ListenerType.BROKER, false, false)).
+                setApiVersionManager(new SimpleApiVersionManager(
+                        ApiMessageType.ListenerType.BROKER,
+                        false,
+                        false,
+                        () -> Features.fromKRaftVersion(MetadataVersion.latest()))).
                 build();
     }
 
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
index 1685105e9d1..5f0bcec62f0 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
@@ -60,6 +60,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.coordinator.group.GroupCoordinator;
+import org.apache.kafka.server.common.Features;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.mockito.Mockito;
 import org.openjdk.jmh.annotations.Benchmark;
@@ -199,7 +200,11 @@ public class MetadataRequestBenchmark {
             setClusterId("clusterId").
             setTime(Time.SYSTEM).
             setTokenManager(null).
-            setApiVersionManager(new SimpleApiVersionManager(ApiMessageType.ListenerType.ZK_BROKER, false, false)).
+            setApiVersionManager(new SimpleApiVersionManager(
+                    ApiMessageType.ListenerType.ZK_BROKER,
+                    false,
+                    false,
+                    () -> Features.fromKRaftVersion(MetadataVersion.latest()))).
             build();
     }
 
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/publisher/FeaturesPublisher.java b/metadata/src/main/java/org/apache/kafka/metadata/publisher/FeaturesPublisher.java
new file mode 100644
index 00000000000..8be90ec87f5
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/publisher/FeaturesPublisher.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.publisher;
+
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.loader.LoaderManifest;
+import org.apache.kafka.image.publisher.MetadataPublisher;
+import org.apache.kafka.server.common.Features;
+
+import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_KRAFT_VERSION;
+
+
+public class FeaturesPublisher implements MetadataPublisher {
+    private volatile Features features = Features.fromKRaftVersion(MINIMUM_KRAFT_VERSION);
+
+    public Features features() {
+        return features;
+    }
+
+    @Override
+    public String name() {
+        return "FeaturesPublisher";
+    }
+
+    @Override
+    public void onMetadataUpdate(
+        MetadataDelta delta,
+        MetadataImage newImage,
+        LoaderManifest manifest
+    ) {
+        if (delta.featuresDelta() != null) {
+            features = new Features(newImage.features().metadataVersion(),
+                    newImage.features().finalizedVersions(),
+                    newImage.provenance().lastContainedOffset(),
+                    true);
+        }
+    }
+}
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/Features.java b/server-common/src/main/java/org/apache/kafka/server/common/Features.java
new file mode 100644
index 00000000000..ba411048063
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/server/common/Features.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME;
+
+public final class Features {
+    private final MetadataVersion version;
+    private final Map<String, Short> finalizedFeatures;
+    private final long finalizedFeaturesEpoch;
+
+    public static Features fromKRaftVersion(MetadataVersion version) {
+        return new Features(version, Collections.emptyMap(), -1, true);
+    }
+
+    public Features(
+        MetadataVersion version,
+        Map<String, Short> finalizedFeatures,
+        long finalizedFeaturesEpoch,
+        boolean kraftMode
+    ) {
+        this.version = version;
+        this.finalizedFeatures = new HashMap<>(finalizedFeatures);
+        this.finalizedFeaturesEpoch = finalizedFeaturesEpoch;
+        // In KRaft mode, we always include the metadata version in the features map.
+        // In ZK mode, we never include it.
+        if (kraftMode) {
+            this.finalizedFeatures.put(FEATURE_NAME, version.featureLevel());
+        } else {
+            this.finalizedFeatures.remove(FEATURE_NAME);
+        }
+    }
+
+    public MetadataVersion metadataVersion() {
+        return version;
+    }
+
+    public Map<String, Short> finalizedFeatures() {
+        return finalizedFeatures;
+    }
+
+    public long finalizedFeaturesEpoch() {
+        return finalizedFeaturesEpoch;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || !(o.getClass().equals(Features.class))) return false;
+        Features other = (Features) o;
+        return version == other.version &&
+            finalizedFeatures.equals(other.finalizedFeatures) &&
+                finalizedFeaturesEpoch == other.finalizedFeaturesEpoch;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(version, finalizedFeatures, finalizedFeaturesEpoch);
+    }
+
+    @Override
+    public String toString() {
+        return "Features" +
+                "(version=" + version +
+                ", finalizedFeatures=" + finalizedFeatures +
+                ", finalizedFeaturesEpoch=" + finalizedFeaturesEpoch +
+                ")";
+    }
+}
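
A quick usage sketch (not part of the patch) showing how the kraftMode flag decides whether the metadata version appears in the finalized features map; the behavior in the comments follows directly from the constructor logic above, and FeaturesDemo is just an illustrative wrapper.

    import java.util.Collections;
    import org.apache.kafka.server.common.Features;
    import org.apache.kafka.server.common.MetadataVersion;

    public class FeaturesDemo {
        public static void main(String[] args) {
            // KRaft mode: "metadata.version" is always added to the map.
            Features kraft = new Features(MetadataVersion.MINIMUM_KRAFT_VERSION,
                    Collections.singletonMap("foo", (short) 2), 123L, true);
            System.out.println(kraft.finalizedFeatures()); // metadata.version and foo

            // ZK mode: "metadata.version" is stripped, even if the caller passed it.
            Features zk = new Features(MetadataVersion.MINIMUM_KRAFT_VERSION,
                    Collections.singletonMap("foo", (short) 2), 123L, false);
            System.out.println(zk.finalizedFeatures()); // only foo
        }
    }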
diff --git a/server-common/src/main/java/org/apache/kafka/server/network/EndpointReadyFutures.java b/server-common/src/main/java/org/apache/kafka/server/network/EndpointReadyFutures.java
new file mode 100644
index 00000000000..1841cba9f19
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/server/network/EndpointReadyFutures.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.network;
+
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.server.authorizer.Authorizer;
+import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.TreeSet;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+/**
+ * Manages a set of per-endpoint futures.
+ */
+public class EndpointReadyFutures {
+    public static class Builder {
+        private LogContext logContext = null;
+        private final Map<Endpoint, List<EndpointCompletionStage>> endpointStages = new HashMap<>();
+        private final List<EndpointCompletionStage> stages = new ArrayList<>();
+
+        /**
+         * Add a readiness future that will block all endpoints.
+         *
+         * @param name          The future name.
+         * @param future        The future object.
+         *
+         * @return              This builder object.
+         */
+        public Builder addReadinessFuture(
+            String name,
+            CompletableFuture<?> future
+        ) {
+            stages.add(new EndpointCompletionStage(name, future));
+            return this;
+        }
+
+        /**
+         * Add readiness futures for individual endpoints.
+         *
+         * @param name          The future name.
+         * @param newFutures    A map from endpoints to futures.
+         *
+         * @return              This builder object.
+         */
+        public Builder addReadinessFutures(
+            String name,
+            Map<Endpoint, ? extends CompletionStage<?>> newFutures
+        ) {
+            newFutures.forEach((endpoint, future) -> {
+                endpointStages.computeIfAbsent(endpoint, __ -> new ArrayList<>()).
+                    add(new EndpointCompletionStage(name, future));
+            });
+            return this;
+        }
+
+        /**
+         * Build the EndpointReadyFutures object.
+         *
+         * @param authorizer    The authorizer to use, if any. Will be started.
+         * @param info          Server information to be passed to the authorizer.
+         *
+         * @return              The new futures object.
+         */
+        public EndpointReadyFutures build(
+            Optional<Authorizer> authorizer,
+            AuthorizerServerInfo info
+        ) {
+            if (authorizer.isPresent()) {
+                return build(authorizer.get().start(info), info);
+            } else {
+                return build(Collections.emptyMap(), info);
+            }
+        }
+
+        EndpointReadyFutures build(
+            Map<Endpoint, ? extends CompletionStage<?>> authorizerStartFutures,
+            AuthorizerServerInfo info
+        ) {
+            if (logContext == null) logContext = new LogContext();
+            Map<Endpoint, CompletionStage<?>> effectiveStartFutures =
+                    new HashMap<>(authorizerStartFutures);
+            for (Endpoint endpoint : info.endpoints()) {
+                if (!effectiveStartFutures.containsKey(endpoint)) {
+                    CompletableFuture<Void> completedFuture = CompletableFuture.completedFuture(null);
+                    effectiveStartFutures.put(endpoint, completedFuture);
+                }
+            }
+            if (info.endpoints().size() != effectiveStartFutures.size()) {
+                List<String> notInInfo = new ArrayList<>();
+                for (Endpoint endpoint : effectiveStartFutures.keySet()) {
+                    if (!info.endpoints().contains(endpoint)) {
+                        notInInfo.add(endpoint.listenerName().orElse("[none]"));
+                    }
+                }
+                throw new RuntimeException("Found authorizer futures that weren't included " +
+                        "in AuthorizerServerInfo: " + notInInfo);
+            }
+            addReadinessFutures("authorizerStart", effectiveStartFutures);
+            stages.forEach(stage -> {
+                Map<Endpoint, CompletionStage<?>> newReadinessFutures = new HashMap<>();
+                info.endpoints().forEach(endpoint -> {
+                    newReadinessFutures.put(endpoint, stage.future);
+                });
+                addReadinessFutures(stage.name, newReadinessFutures);
+            });
+            return new EndpointReadyFutures(logContext,
+                    endpointStages);
+        }
+    }
+
+    static class EndpointCompletionStage {
+        final String name;
+        final CompletionStage<?> future;
+
+        EndpointCompletionStage(String name, CompletionStage<?> future) {
+            this.name = name;
+            this.future = future;
+        }
+    }
+
+    class EndpointReadyFuture {
+        final String endpointName;
+        final TreeSet<String> incomplete;
+        final CompletableFuture<Void> future;
+
+        EndpointReadyFuture(Endpoint endpoint, Collection<String> stageNames) {
+            this.endpointName = endpoint.listenerName().orElse("UNNAMED");
+            this.incomplete = new TreeSet<>(stageNames);
+            this.future = new CompletableFuture<>();
+        }
+
+        void completeStage(String stageName) {
+            boolean done = false;
+            synchronized (EndpointReadyFuture.this) {
+                if (incomplete.remove(stageName)) {
+                    if (incomplete.isEmpty()) {
+                        done = true;
+                    } else {
+                        log.info("{} completed for endpoint {}. Still waiting for {}.",
+                                stageName, endpointName, incomplete);
+                    }
+                }
+            }
+            if (done) {
+                if (future.complete(null)) {
+                    log.info("{} completed for endpoint {}. Endpoint is now READY.",
+                            stageName, endpointName);
+                }
+            }
+        }
+
+        void failStage(String what, Throwable exception) {
+            if (future.completeExceptionally(exception)) {
+                synchronized (EndpointReadyFuture.this) {
+                    incomplete.clear();
+                }
+                log.warn("Endpoint {} will never become ready because we encountered an {} exception",
+                        endpointName, what, exception);
+            }
+        }
+    }
+
+    private final Logger log;
+
+    private final Map<Endpoint, CompletableFuture<Void>> futures;
+
+    private EndpointReadyFutures(
+        LogContext logContext,
+        Map<Endpoint, List<EndpointCompletionStage>> endpointStages
+    ) {
+        this.log = logContext.logger(EndpointReadyFutures.class);
+        Map<Endpoint, CompletableFuture<Void>> newFutures = new HashMap<>();
+        endpointStages.forEach((endpoint, stages) -> {
+            List<String> stageNames = new ArrayList<>();
+            stages.forEach(stage -> stageNames.add(stage.name));
+            EndpointReadyFuture readyFuture = new EndpointReadyFuture(endpoint, stageNames);
+            newFutures.put(endpoint, readyFuture.future);
+            stages.forEach(stage -> {
+                stage.future.whenComplete((__, exception) -> {
+                    if (exception != null) {
+                        readyFuture.failStage(stage.name, exception);
+                    } else {
+                        readyFuture.completeStage(stage.name);
+                    }
+                });
+            });
+        });
+        this.futures = Collections.unmodifiableMap(newFutures);
+    }
+
+    public Map<Endpoint, CompletableFuture<Void>> futures() {
+        return futures;
+    }
+}
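
A wiring sketch (not part of the patch) of how a server might use the builder: build() starts the authorizer when one is present, merges its per-endpoint start futures with any stages added here, and each endpoint's combined future completes only once all of its stages have. The metadataLoad stage name and the enableRequestProcessing callback are stand-ins for the real wiring in BrokerServer and ControllerServer.

    import java.util.Optional;
    import java.util.concurrent.CompletableFuture;
    import org.apache.kafka.server.authorizer.Authorizer;
    import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
    import org.apache.kafka.server.network.EndpointReadyFutures;

    public class ReadinessWiringSketch {
        static void wire(Optional<Authorizer> authorizer,
                         AuthorizerServerInfo serverInfo,
                         Runnable enableRequestProcessing) {
            CompletableFuture<Void> metadataReady = new CompletableFuture<>();
            // One shared stage for all endpoints, plus the authorizer's own
            // per-endpoint start futures contributed by build().
            EndpointReadyFutures readyFutures = new EndpointReadyFutures.Builder()
                    .addReadinessFuture("metadataLoad", metadataReady)
                    .build(authorizer, serverInfo);
            // Unblock each endpoint only when every stage registered for it
            // has completed.
            readyFutures.futures().forEach((endpoint, future) ->
                    future.thenRun(enableRequestProcessing));
            // Elsewhere, once metadata is loaded: metadataReady.complete(null);
        }
    }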
diff --git a/server-common/src/main/java/org/apache/kafka/server/network/KafkaAuthorizerServerInfo.java b/server-common/src/main/java/org/apache/kafka/server/network/KafkaAuthorizerServerInfo.java
new file mode 100644
index 00000000000..1c81379e8e2
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/server/network/KafkaAuthorizerServerInfo.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.network;
+
+import org.apache.kafka.common.ClusterResource;
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Objects;
+
+
+/**
+ * Runtime broker configuration metadata provided to authorizers during start up.
+ */
+public final class KafkaAuthorizerServerInfo implements AuthorizerServerInfo {
+    private final ClusterResource clusterResource;
+    private final int brokerId;
+    private final Collection<Endpoint> endpoints;
+    private final Endpoint interbrokerEndpoint;
+    private final Collection<String> earlyStartListeners;
+
+    public KafkaAuthorizerServerInfo(
+        ClusterResource clusterResource,
+        int brokerId,
+        Collection<Endpoint> endpoints,
+        Endpoint interbrokerEndpoint,
+        Collection<String> earlyStartListeners
+    ) {
+        this.clusterResource = clusterResource;
+        this.brokerId = brokerId;
+        this.endpoints = Collections.unmodifiableCollection(new ArrayList<>(endpoints));
+        this.interbrokerEndpoint = interbrokerEndpoint;
+        this.earlyStartListeners = Collections.unmodifiableCollection(new ArrayList<>(earlyStartListeners));
+    }
+
+    @Override
+    public ClusterResource clusterResource() {
+        return clusterResource;
+    }
+
+    @Override
+    public int brokerId() {
+        return brokerId;
+    }
+
+    @Override
+    public Collection<Endpoint> endpoints() {
+        return endpoints;
+    }
+
+    @Override
+    public Endpoint interBrokerEndpoint() {
+        return interbrokerEndpoint;
+    }
+
+    @Override
+    public Collection<String> earlyStartListeners() {
+        return earlyStartListeners;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || (!(o.getClass().equals(KafkaAuthorizerServerInfo.class)))) return false;
+        KafkaAuthorizerServerInfo other = (KafkaAuthorizerServerInfo) o;
+        return clusterResource.equals(other.clusterResource) &&
+                brokerId == other.brokerId &&
+                endpoints.equals(other.endpoints) &&
+                interbrokerEndpoint.equals(other.interbrokerEndpoint) &&
+                earlyStartListeners.equals(other.earlyStartListeners);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(clusterResource,
+                brokerId,
+                endpoints,
+                interbrokerEndpoint,
+                earlyStartListeners);
+    }
+
+    @Override
+    public String toString() {
+        return "KafkaAuthorizerServerInfo(" +
+                "clusterResource=" + clusterResource +
+                ", brokerId=" + brokerId +
+                ", endpoints=" + endpoints +
+                ", earlyStartListeners=" + earlyStartListeners +
+                ")";
+    }
+}
diff --git a/server-common/src/test/java/org/apache/kafka/server/common/FeaturesTest.java b/server-common/src/test/java/org/apache/kafka/server/common/FeaturesTest.java
new file mode 100644
index 00000000000..c3d8e0f0319
--- /dev/null
+++ b/server-common/src/test/java/org/apache/kafka/server/common/FeaturesTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.common;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME;
+import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_KRAFT_VERSION;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+class FeaturesTest {
+    @Test
+    public void testKRaftModeFeatures() {
+        Features features = new Features(MINIMUM_KRAFT_VERSION,
+                Collections.singletonMap("foo", (short) 2), 123, true);
+        assertEquals(MINIMUM_KRAFT_VERSION.featureLevel(),
+                features.finalizedFeatures().get(FEATURE_NAME));
+        assertEquals((short) 2,
+                features.finalizedFeatures().get("foo"));
+        assertEquals(2, features.finalizedFeatures().size());
+    }
+
+    @Test
+    public void testZkModeFeatures() {
+        Features features = new Features(MINIMUM_KRAFT_VERSION,
+                Collections.singletonMap("foo", (short) 2), 123, false);
+        assertNull(features.finalizedFeatures().get(FEATURE_NAME));
+        assertEquals((short) 2,
+                features.finalizedFeatures().get("foo"));
+        assertEquals(1, features.finalizedFeatures().size());
+    }
+}
diff --git a/server-common/src/test/java/org/apache/kafka/server/network/EndpointReadyFuturesTest.java b/server-common/src/test/java/org/apache/kafka/server/network/EndpointReadyFuturesTest.java
new file mode 100644
index 00000000000..2d76e5df37d
--- /dev/null
+++ b/server-common/src/test/java/org/apache/kafka/server/network/EndpointReadyFuturesTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.network;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+import org.apache.kafka.common.ClusterResource;
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+final public class EndpointReadyFuturesTest {
+    private static final Endpoint EXTERNAL =
+            new Endpoint("EXTERNAL", SecurityProtocol.SSL, "127.0.0.1", 9092);
+
+    private static final Endpoint INTERNAL =
+            new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "127.0.0.1", 
9093);
+
+    private static final KafkaAuthorizerServerInfo INFO = new KafkaAuthorizerServerInfo(
+        new ClusterResource("S6-01LPiQOCBhhFIunQUcQ"),
+        1,
+        Arrays.asList(EXTERNAL, INTERNAL),
+        INTERNAL,
+        Arrays.asList("INTERNAL"));
+
+    static void assertComplete(
+            EndpointReadyFutures readyFutures,
+            Endpoint... endpoints
+    ) {
+        for (Endpoint endpoint : endpoints) {
+            String name = endpoint.listenerName().get();
+            CompletableFuture<Void> future = readyFutures.futures().get(endpoint);
+            assertNotNull(future, "Unable to find future for " + name);
+            assertTrue(future.isDone(), "Future for " + name + " is not done.");
+            assertFalse(future.isCompletedExceptionally(),
+                    "Future for " + name + " is completed exceptionally.");
+        }
+    }
+
+    static void assertIncomplete(
+            EndpointReadyFutures readyFutures,
+            Endpoint... endpoints
+    ) {
+        for (Endpoint endpoint : endpoints) {
+            CompletableFuture<Void> future = readyFutures.futures().get(endpoint);
+            assertNotNull(future, "Unable to find future for " + endpoint);
+            assertFalse(future.isDone(), "Future for " + endpoint + " is done.");
+        }
+    }
+
+    static void assertException(
+            EndpointReadyFutures readyFutures,
+            Throwable throwable,
+            Endpoint... endpoints
+    ) {
+        for (Endpoint endpoint : endpoints) {
+            CompletableFuture<Void> future = readyFutures.futures().get(endpoint);
+            assertNotNull(future, "Unable to find future for " + endpoint);
+            assertTrue(future.isCompletedExceptionally(),
+                    "Future for " + endpoint + " is not completed 
exceptionally.");
+            Throwable cause = assertThrows(CompletionException.class,
+                () -> future.getNow(null)).getCause();
+            assertNotNull(cause, "Unable to find CompletionException cause for " + endpoint);
+            assertEquals(throwable.getClass(), cause.getClass());
+            assertEquals(throwable.getMessage(), cause.getMessage());
+        }
+    }
+
+    @Test
+    public void testImmediateCompletion() {
+        EndpointReadyFutures readyFutures = new EndpointReadyFutures.Builder().
+                build(Optional.empty(), INFO);
+        assertEquals(new HashSet<>(Arrays.asList(EXTERNAL, INTERNAL)),
+                readyFutures.futures().keySet());
+        assertComplete(readyFutures, EXTERNAL, INTERNAL);
+    }
+
+    @Test
+    public void testAddReadinessFuture() {
+        CompletableFuture<Void> foo = new CompletableFuture<>();
+        EndpointReadyFutures readyFutures = new EndpointReadyFutures.Builder().
+                addReadinessFuture("foo", foo).
+                build(Optional.empty(), INFO);
+        assertEquals(new HashSet<>(Arrays.asList(EXTERNAL, INTERNAL)),
+                readyFutures.futures().keySet());
+        assertIncomplete(readyFutures, EXTERNAL, INTERNAL);
+        foo.complete(null);
+        assertComplete(readyFutures, EXTERNAL, INTERNAL);
+    }
+
+    @Test
+    public void testAddMultipleReadinessFutures() {
+        CompletableFuture<Void> foo = new CompletableFuture<>();
+        CompletableFuture<Void> bar = new CompletableFuture<>();
+        EndpointReadyFutures readyFutures = new EndpointReadyFutures.Builder().
+                addReadinessFuture("foo", foo).
+                addReadinessFuture("bar", bar).
+                build(Optional.empty(), INFO);
+        assertEquals(new HashSet<>(Arrays.asList(EXTERNAL, INTERNAL)),
+                readyFutures.futures().keySet());
+        assertIncomplete(readyFutures, EXTERNAL, INTERNAL);
+        foo.complete(null);
+        assertIncomplete(readyFutures, EXTERNAL, INTERNAL);
+        bar.complete(null);
+        assertComplete(readyFutures, EXTERNAL, INTERNAL);
+    }
+
+    @Test
+    public void testAddReadinessFutures() {
+        Map<Endpoint, CompletableFuture<Void>> bazFutures = new HashMap<>();
+        bazFutures.put(EXTERNAL, new CompletableFuture<>());
+        bazFutures.put(INTERNAL, new CompletableFuture<>());
+        EndpointReadyFutures readyFutures = new EndpointReadyFutures.Builder().
+                addReadinessFutures("baz", bazFutures).
+                build(Optional.empty(), INFO);
+        assertEquals(new HashSet<>(Arrays.asList(EXTERNAL, INTERNAL)),
+                readyFutures.futures().keySet());
+        assertIncomplete(readyFutures, EXTERNAL, INTERNAL);
+        bazFutures.get(EXTERNAL).complete(null);
+        assertComplete(readyFutures, EXTERNAL);
+        assertIncomplete(readyFutures, INTERNAL);
+        bazFutures.get(INTERNAL).complete(null);
+        assertComplete(readyFutures, EXTERNAL, INTERNAL);
+    }
+
+    @Test
+    public void testFailedReadinessFuture() {
+        CompletableFuture<Void> foo = new CompletableFuture<>();
+        CompletableFuture<Void> bar = new CompletableFuture<>();
+        EndpointReadyFutures readyFutures = new EndpointReadyFutures.Builder().
+                addReadinessFuture("foo", foo).
+                addReadinessFuture("bar", bar).
+                build(Optional.empty(), INFO);
+        assertEquals(new HashSet<>(Arrays.asList(EXTERNAL, INTERNAL)),
+                readyFutures.futures().keySet());
+        assertIncomplete(readyFutures, EXTERNAL, INTERNAL);
+        foo.complete(null);
+        assertIncomplete(readyFutures, EXTERNAL, INTERNAL);
+        bar.completeExceptionally(new RuntimeException("Failed."));
+        assertException(readyFutures, new RuntimeException("Failed."),
+                EXTERNAL, INTERNAL);
+    }
+}
