This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 10a0905628c KAFKA-17564 Move BrokerFeatures to server module (#17228)
10a0905628c is described below

commit 10a0905628c65c1fa8df2d0c1c7d1a808074cf08
Author: Ken Huang <[email protected]>
AuthorDate: Mon Oct 7 15:16:48 2024 +0800

    KAFKA-17564 Move BrokerFeatures to server module (#17228)
    
    Reviewers: TengYao Chi <[email protected]>, Chia-Ping Tsai 
<[email protected]>
---
 .../scala/kafka/controller/KafkaController.scala   |   7 +-
 .../scala/kafka/server/ApiVersionManager.scala     |   2 +-
 .../main/scala/kafka/server/BrokerFeatures.scala   | 155 ---------------------
 .../src/main/scala/kafka/server/BrokerServer.scala |   4 +-
 core/src/main/scala/kafka/server/KafkaServer.scala |   4 +-
 .../main/scala/kafka/server/MetadataCache.scala    |   1 +
 .../kafka/server/metadata/ZkMetadataCache.scala    |   7 +-
 .../unit/kafka/server/ApiVersionManagerTest.scala  |   1 +
 .../unit/kafka/server/BrokerFeaturesTest.scala     | 117 ----------------
 .../kafka/server/BrokerLifecycleManagerTest.scala  |   3 +-
 .../kafka/server/FinalizedFeatureCacheTest.scala   |  13 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |   2 +-
 .../server/ReplicaAlterLogDirsThreadTest.scala     |   2 +-
 .../kafka/server/ReplicaFetcherThreadTest.scala    |   1 +
 .../jmh/fetcher/ReplicaFetcherThreadBenchmark.java |   2 +-
 .../jmh/metadata/MetadataRequestBenchmark.java     |   2 +-
 .../apache/kafka/jmh/server/CheckpointBench.java   |   2 +-
 .../kafka/jmh/server/PartitionCreationBench.java   |   2 +-
 .../org/apache/kafka/server/BrokerFeatures.java    | 155 +++++++++++++++++++++
 .../apache/kafka/server/BrokerFeaturesTest.java    | 130 +++++++++++++++++
 20 files changed, 314 insertions(+), 298 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala 
b/core/src/main/scala/kafka/controller/KafkaController.scala
index 0f9d65ebb1c..5d886a30401 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -43,6 +43,7 @@ import 
org.apache.kafka.common.requests.{AbstractControlRequest, ApiError, Leade
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState}
 import org.apache.kafka.metadata.migration.ZkMigrationState
+import org.apache.kafka.server.BrokerFeatures
 import org.apache.kafka.server.common.{AdminOperationException, 
ProducerIdsBlock}
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.util.KafkaScheduler
@@ -431,7 +432,7 @@ class KafkaController(val config: KafkaConfig,
       val newVersion = createFeatureZNode(
         FeatureZNode(config.interBrokerProtocolVersion,
           FeatureZNodeStatus.Enabled,
-          brokerFeatures.defaultFinalizedFeatures
+          brokerFeatures.defaultFinalizedFeatures.asScala.map { case (k, v) => 
(k, v.shortValue()) }
         ))
       featureCache.waitUntilFeatureEpochOrThrow(newVersion, 
config.zkConnectionTimeoutMs)
     } else {
@@ -1601,7 +1602,7 @@ class KafkaController(val config: KafkaConfig,
           latestFinalizedFeatures =>
             BrokerFeatures.hasIncompatibleFeatures(broker.features,
               latestFinalizedFeatures.finalizedFeatures().asScala.
-                map(kv => (kv._1, kv._2.toShort)).toMap))
+                map(kv => (kv._1, kv._2.toShort: 
java.lang.Short)).toMap.asJava))
     }
   }
 
@@ -1983,7 +1984,7 @@ class KafkaController(val config: KafkaConfig,
           s" versionLevel:${update.versionLevel} is lower than the" +
           s" supported minVersion:${supportedVersionRange.min}."))
       } else {
-        val newFinalizedFeature = Utils.mkMap(Utils.mkEntry(update.feature, 
newVersion)).asScala.toMap
+        val newFinalizedFeature = Utils.mkMap(Utils.mkEntry(update.feature, 
newVersion: java.lang.Short))
         val numIncompatibleBrokers = 
controllerContext.liveOrShuttingDownBrokers.count(broker => {
           BrokerFeatures.hasIncompatibleFeatures(broker.features, 
newFinalizedFeature)
         })
diff --git a/core/src/main/scala/kafka/server/ApiVersionManager.scala 
b/core/src/main/scala/kafka/server/ApiVersionManager.scala
index aa8fc4a35d6..588fe99aea1 100644
--- a/core/src/main/scala/kafka/server/ApiVersionManager.scala
+++ b/core/src/main/scala/kafka/server/ApiVersionManager.scala
@@ -21,7 +21,7 @@ import 
org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.requests.ApiVersionsResponse
 import org.apache.kafka.network.metrics.RequestChannelMetrics
-import org.apache.kafka.server.ClientMetricsManager
+import org.apache.kafka.server.{BrokerFeatures, ClientMetricsManager}
 import org.apache.kafka.server.common.FinalizedFeatures
 
 import scala.collection.mutable
diff --git a/core/src/main/scala/kafka/server/BrokerFeatures.scala 
b/core/src/main/scala/kafka/server/BrokerFeatures.scala
deleted file mode 100644
index 6a8ddcb19c7..00000000000
--- a/core/src/main/scala/kafka/server/BrokerFeatures.scala
+++ /dev/null
@@ -1,155 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import kafka.utils.Logging
-import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
-import org.apache.kafka.metadata.VersionRange
-import org.apache.kafka.server.common.Features.PRODUCTION_FEATURES
-import org.apache.kafka.server.common.MetadataVersion
-
-import java.util
-import scala.jdk.CollectionConverters._
-
-/**
- * A class that encapsulates the latest features supported by the Broker and 
also provides APIs to
- * check for incompatibilities between the features supported by the Broker 
and finalized features.
- * This class is immutable in production. It provides few APIs to mutate state 
only for the purpose
- * of testing.
- */
-class BrokerFeatures private (@volatile var supportedFeatures: 
Features[SupportedVersionRange]) {
-  // For testing only.
-  def setSupportedFeatures(newFeatures: Features[SupportedVersionRange]): Unit 
= {
-    val combined = new util.HashMap[String, 
SupportedVersionRange](supportedFeatures.features())
-    combined.putAll(newFeatures.features())
-    supportedFeatures = Features.supportedFeatures(combined)
-  }
-
-  /**
-   * Returns the default finalized features that a new Kafka cluster with IBP 
config >= IBP_2_7_IV0
-   * needs to be bootstrapped with.
-   */
-  def defaultFinalizedFeatures: Map[String, Short] = {
-    supportedFeatures.features.asScala.map {
-      case(name, versionRange) =>
-        if (name.equals("kraft.version")) {
-          (name, 0.toShort)
-        } else {
-          (name, versionRange.max)
-        }
-    }.toMap
-  }
-
-  /**
-   * Returns the set of feature names found to be incompatible.
-   * A feature incompatibility is a version mismatch between the latest 
feature supported by the
-   * Broker, and a provided finalized feature. This can happen because a 
provided finalized
-   * feature:
-   *  1) Does not exist in the Broker (i.e. it is unknown to the Broker).
-   *           [OR]
-   *  2) Exists but the FinalizedVersionRange does not match with the 
SupportedVersionRange
-   *     of the supported feature.
-   *
-   * @param finalized   The finalized features against which incompatibilities 
need to be checked for.
-   *
-   * @return            The subset of input features which are incompatible. 
If the returned object
-   *                    is empty, it means there were no feature 
incompatibilities found.
-   */
-  def incompatibleFeatures(finalized: Map[String, Short]): Map[String, Short] 
= {
-    BrokerFeatures.incompatibleFeatures(supportedFeatures, finalized, 
logIncompatibilities = true)
-  }
-}
-
-object BrokerFeatures extends Logging {
-
-  def createDefault(unstableFeatureVersionsEnabled: Boolean): BrokerFeatures = 
{
-    new 
BrokerFeatures(defaultSupportedFeatures(unstableFeatureVersionsEnabled))
-  }
-
-  def createDefaultFeatureMap(features: BrokerFeatures): Map[String, 
VersionRange] = {
-    features.supportedFeatures.features.asScala.map {
-      case (name, versionRange) =>
-        (name, VersionRange.of(versionRange.min, versionRange.max))
-    }.toMap
-  }
-
-  def defaultSupportedFeatures(unstableFeatureVersionsEnabled: Boolean): 
Features[SupportedVersionRange] = {
-    val features = new util.HashMap[String, SupportedVersionRange]()
-      features.put(MetadataVersion.FEATURE_NAME,
-        new SupportedVersionRange(
-          MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(),
-          if (unstableFeatureVersionsEnabled) {
-            MetadataVersion.latestTesting.featureLevel
-          } else {
-            MetadataVersion.latestProduction.featureLevel
-          }))
-    PRODUCTION_FEATURES.forEach {
-      feature =>
-        val maxVersion = if (unstableFeatureVersionsEnabled)
-          feature.latestTesting
-        else
-          feature.latestProduction
-        if (maxVersion > 0) {
-          features.put(feature.featureName, new 
SupportedVersionRange(feature.minimumProduction(), maxVersion))
-        }
-    }
-    Features.supportedFeatures(features)
-  }
-
-  def createEmpty(): BrokerFeatures = {
-    new BrokerFeatures(Features.emptySupportedFeatures())
-  }
-
-  /**
-   * Returns true if any of the provided finalized features are incompatible 
with the provided
-   * supported features.
-   *
-   * @param supportedFeatures   The supported features to be compared
-   * @param finalizedFeatures   The finalized features to be compared
-   *
-   * @return                    - True if there are any feature 
incompatibilities found.
-   *                            - False otherwise.
-   */
-  def hasIncompatibleFeatures(supportedFeatures: 
Features[SupportedVersionRange],
-                              finalizedFeatures: Map[String, Short]): Boolean 
= {
-    incompatibleFeatures(supportedFeatures, finalizedFeatures, 
logIncompatibilities = false).nonEmpty
-  }
-
-  private def incompatibleFeatures(supportedFeatures: 
Features[SupportedVersionRange],
-                                   finalizedFeatures: Map[String, Short],
-                                   logIncompatibilities: Boolean): Map[String, 
Short] = {
-    val incompatibleFeaturesInfo = finalizedFeatures.map {
-      case (feature, versionLevels) =>
-        val supportedVersions = supportedFeatures.get(feature)
-        if (supportedVersions == null) {
-          (feature, versionLevels, "{feature=%s, reason='Unsupported 
feature'}".format(feature))
-        } else if (supportedVersions.isIncompatibleWith(versionLevels)) {
-          (feature, versionLevels, "{feature=%s, reason='%s is incompatible 
with %s'}".format(
-            feature, versionLevels, supportedVersions))
-        } else {
-          (feature, versionLevels, null)
-        }
-    }.filter{ case(_, _, errorReason) => errorReason != null}.toList
-
-    if (logIncompatibilities && incompatibleFeaturesInfo.nonEmpty) {
-      warn("Feature incompatibilities seen: " +
-           incompatibleFeaturesInfo.map { case(_, _, errorReason) => 
errorReason }.mkString(", "))
-    }
-    incompatibleFeaturesInfo.map { case(feature, versionLevels, _) => 
(feature, versionLevels) }.toMap
-  }
-}
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 0d88a50bec4..2d2ca1fc384 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -43,7 +43,7 @@ import org.apache.kafka.coordinator.share.{ShareCoordinator, 
ShareCoordinatorRec
 import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, 
MetadataPublisher}
 import org.apache.kafka.metadata.{BrokerState, ListenerInfo}
 import org.apache.kafka.security.CredentialProvider
-import org.apache.kafka.server.{AssignmentsManager, ClientMetricsManager, 
NodeToControllerChannelManager}
+import org.apache.kafka.server.{AssignmentsManager, BrokerFeatures, 
ClientMetricsManager, NodeToControllerChannelManager}
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.common.{ApiMessageAndVersion, 
DirectoryEventHandler, TopicIdPartition}
 import org.apache.kafka.server.config.ConfigType
@@ -378,7 +378,7 @@ class BrokerServer(
         ConfigType.CLIENT_METRICS -> new 
ClientMetricsConfigHandler(clientMetricsManager),
         ConfigType.GROUP -> new GroupConfigHandler(groupCoordinator))
 
-      val featuresRemapped = 
BrokerFeatures.createDefaultFeatureMap(brokerFeatures).asJava
+      val featuresRemapped = 
BrokerFeatures.createDefaultFeatureMap(brokerFeatures)
 
       val brokerLifecycleChannelManager = new 
NodeToControllerChannelManagerImpl(
         controllerNodeProvider,
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index af0534237a4..bbac5a5ed30 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -54,7 +54,7 @@ import org.apache.kafka.metadata.{BrokerState, 
MetadataRecordSerde, VersionRange
 import org.apache.kafka.raft.QuorumConfig
 import org.apache.kafka.raft.Endpoints
 import org.apache.kafka.security.CredentialProvider
-import org.apache.kafka.server.NodeToControllerChannelManager
+import org.apache.kafka.server.{BrokerFeatures, NodeToControllerChannelManager}
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.common.MetadataVersion._
 import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
@@ -472,7 +472,7 @@ class KafkaServer(
               setSecurityProtocol(ep.securityProtocol.id))
           }
 
-          val features = 
BrokerFeatures.createDefaultFeatureMap(BrokerFeatures.createDefault(config.unstableFeatureVersionsEnabled))
+          val features = 
BrokerFeatures.createDefaultFeatureMap(BrokerFeatures.createDefault(config.unstableFeatureVersionsEnabled)).asScala
 
           // Even though ZK brokers don't use "metadata.version" feature, we 
need to overwrite it with our IBP as part of registration
           // so the KRaft controller can verify that all brokers are on the 
same IBP before starting the migration.
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala 
b/core/src/main/scala/kafka/server/MetadataCache.scala
index 68afc0b3d35..4b14f04483e 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -22,6 +22,7 @@ import org.apache.kafka.admin.BrokerMetadata
 import org.apache.kafka.common.message.{MetadataResponseData, 
UpdateMetadataRequestData}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.{Cluster, Node, TopicPartition, Uuid}
+import org.apache.kafka.server.BrokerFeatures
 import org.apache.kafka.server.common.{FinalizedFeatures, KRaftVersion, 
MetadataVersion}
 
 import java.util
diff --git a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala 
b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
index ca41ebda7f3..3205a24aa44 100755
--- a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
@@ -24,7 +24,7 @@ import scala.collection.{Seq, Set, mutable}
 import scala.jdk.CollectionConverters._
 import kafka.cluster.{Broker, EndPoint}
 import kafka.controller.StateChangeLogger
-import kafka.server.{BrokerFeatures, CachedControllerId, 
KRaftCachedControllerId, MetadataCache, ZkCachedControllerId}
+import kafka.server.{CachedControllerId, KRaftCachedControllerId, 
MetadataCache, ZkCachedControllerId}
 import kafka.utils.CoreUtils._
 import kafka.utils.Logging
 import org.apache.kafka.admin.BrokerMetadata
@@ -39,6 +39,7 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{AbstractControlRequest, 
ApiVersionsResponse, MetadataResponse, UpdateMetadataRequest}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.metadata.LeaderAndIsr
+import org.apache.kafka.server.BrokerFeatures
 import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion}
 
 import java.util.concurrent.{ThreadLocalRandom, TimeUnit}
@@ -649,8 +650,8 @@ class ZkMetadataCache(
       throw new FeatureCacheUpdateException(errorMsg)
     } else {
       val incompatibleFeatures = brokerFeatures.incompatibleFeatures(
-        latest.finalizedFeatures().asScala.map(kv => (kv._1, 
kv._2.toShort)).toMap)
-      if (incompatibleFeatures.nonEmpty) {
+        latest.finalizedFeatures().asScala.map(kv => (kv._1, kv._2.toShort: 
java.lang.Short)).toMap.asJava)
+      if (!incompatibleFeatures.isEmpty) {
         val errorMsg = "FinalizedFeatureCache update failed since feature 
compatibility" +
           s" checks failed! Supported ${brokerFeatures.supportedFeatures} has 
incompatibilities" +
           s" with the latest $latest."
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala
index ae0dcb94a4f..a7a18085bbb 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala
@@ -20,6 +20,7 @@ import kafka.server.metadata.ZkMetadataCache
 import org.apache.kafka.clients.NodeApiVersions
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.server.BrokerFeatures
 import org.apache.kafka.server.common.MetadataVersion
 import org.junit.jupiter.api.{Disabled, Test}
 import org.junit.jupiter.api.Assertions._
diff --git a/core/src/test/scala/unit/kafka/server/BrokerFeaturesTest.scala 
b/core/src/test/scala/unit/kafka/server/BrokerFeaturesTest.scala
deleted file mode 100644
index 0bcac054cf4..00000000000
--- a/core/src/test/scala/unit/kafka/server/BrokerFeaturesTest.scala
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
-import org.apache.kafka.server.common.{MetadataVersion, Features => 
ServerFeatures}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, 
assertNotEquals, assertTrue}
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
-
-import scala.jdk.CollectionConverters._
-
-class BrokerFeaturesTest {
-
-  @Test
-  def testEmpty(): Unit = {
-    assertTrue(BrokerFeatures.createEmpty().supportedFeatures.empty)
-  }
-
-  @Test
-  def testIncompatibilitiesDueToAbsentFeature(): Unit = {
-    val brokerFeatures = BrokerFeatures.createDefault(true)
-    val supportedFeatures = Features.supportedFeatures(Map[String, 
SupportedVersionRange](
-      "test_feature_1" -> new SupportedVersionRange(1, 4),
-      "test_feature_2" -> new SupportedVersionRange(1, 3)).asJava)
-    brokerFeatures.setSupportedFeatures(supportedFeatures)
-
-    val compatibleFeatures = Map[String, Short]("test_feature_1" -> 4)
-    val inCompatibleFeatures = Map[String, Short]("test_feature_3" -> 4)
-    val features = compatibleFeatures++inCompatibleFeatures
-    val finalizedFeatures = features
-
-    assertEquals(inCompatibleFeatures,
-      brokerFeatures.incompatibleFeatures(finalizedFeatures))
-    assertTrue(BrokerFeatures.hasIncompatibleFeatures(supportedFeatures, 
finalizedFeatures))
-  }
-
-  @Test
-  def testIncompatibilitiesDueToIncompatibleFeature(): Unit = {
-    val brokerFeatures = BrokerFeatures.createDefault(true)
-    val supportedFeatures = Features.supportedFeatures(Map[String, 
SupportedVersionRange](
-      "test_feature_1" -> new SupportedVersionRange(1, 4),
-      "test_feature_2" -> new SupportedVersionRange(1, 3)).asJava)
-    brokerFeatures.setSupportedFeatures(supportedFeatures)
-
-    val compatibleFeatures = Map[String, Short]("test_feature_1" -> 3)
-    val inCompatibleFeatures = Map[String, Short]("test_feature_2" -> 4)
-    val features = compatibleFeatures++inCompatibleFeatures
-    val finalizedFeatures = features
-
-    assertEquals(
-      inCompatibleFeatures,
-      brokerFeatures.incompatibleFeatures(finalizedFeatures))
-    assertTrue(BrokerFeatures.hasIncompatibleFeatures(supportedFeatures, 
finalizedFeatures))
-  }
-
-  @Test
-  def testCompatibleFeatures(): Unit = {
-    val brokerFeatures = BrokerFeatures.createDefault(true)
-    val supportedFeatures = Features.supportedFeatures(Map[String, 
SupportedVersionRange](
-      "test_feature_1" -> new SupportedVersionRange(1, 4),
-      "test_feature_2" -> new SupportedVersionRange(1, 3)).asJava)
-    brokerFeatures.setSupportedFeatures(supportedFeatures)
-
-    val compatibleFeatures = Map[String, Short](
-      "test_feature_1" -> 3,
-      "test_feature_2" -> 3)
-    val finalizedFeatures = compatibleFeatures
-    assertTrue(brokerFeatures.incompatibleFeatures(finalizedFeatures).isEmpty)
-    assertFalse(BrokerFeatures.hasIncompatibleFeatures(supportedFeatures, 
finalizedFeatures))
-  }
-
-  @Test
-  def testDefaultFinalizedFeatures(): Unit = {
-    val brokerFeatures = BrokerFeatures.createDefault(true)
-    val supportedFeatures = Features.supportedFeatures(Map[String, 
SupportedVersionRange](
-      "test_feature_1" -> new SupportedVersionRange(1, 4),
-      "test_feature_2" -> new SupportedVersionRange(1, 3),
-      "test_feature_3" -> new SupportedVersionRange(3, 7)).asJava)
-    brokerFeatures.setSupportedFeatures(supportedFeatures)
-
-    val expectedFeatures = Map[String, Short](
-      MetadataVersion.FEATURE_NAME -> 
MetadataVersion.latestTesting().featureLevel(),
-      ServerFeatures.TRANSACTION_VERSION.featureName() -> 
ServerFeatures.TRANSACTION_VERSION.latestTesting(),
-      ServerFeatures.GROUP_VERSION.featureName() -> 
ServerFeatures.GROUP_VERSION.latestTesting(),
-      "kraft.version" -> 0,
-      "test_feature_1" -> 4,
-      "test_feature_2" -> 3,
-      "test_feature_3" -> 7)
-    assertEquals(expectedFeatures, brokerFeatures.defaultFinalizedFeatures)
-  }
-
-  @ParameterizedTest
-  @ValueSource(booleans = Array(true, false))
-  def ensureDefaultSupportedFeaturesRangeMaxNotZero(unstableVersionsEnabled: 
Boolean): Unit = {
-    val brokerFeatures = BrokerFeatures.createDefault(unstableVersionsEnabled)
-    brokerFeatures.supportedFeatures.features().values().forEach { 
supportedVersionRange =>
-      assertNotEquals(0, supportedVersionRange.max())
-    }
-  }
-}
diff --git 
a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
index 8e88eea3c46..0f5d3f027e5 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
@@ -26,6 +26,7 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, 
BrokerHeartbeatRequest, BrokerHeartbeatResponse, BrokerRegistrationRequest, 
BrokerRegistrationResponse}
 import org.apache.kafka.metadata.{BrokerState, VersionRange}
 import org.apache.kafka.raft.QuorumConfig
+import org.apache.kafka.server.BrokerFeatures
 import org.apache.kafka.server.common.{Features, KRaftVersion, MetadataVersion}
 import org.apache.kafka.server.common.MetadataVersion.{IBP_3_8_IV0, 
IBP_3_9_IV0}
 import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, 
ServerLogConfigs, ZkConfigs}
@@ -120,7 +121,7 @@ class BrokerLifecycleManagerTest {
     manager = new BrokerLifecycleManager(context.config, context.time, 
"successful-registration-", isZkBroker = false, 
Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
     val controllerNode = new Node(3000, "localhost", 8021)
     context.controllerNodeProvider.node.set(controllerNode)
-    val features = 
BrokerFeatures.createDefaultFeatureMap(BrokerFeatures.createDefault(true))
+    val features = 
BrokerFeatures.createDefaultFeatureMap(BrokerFeatures.createDefault(true)).asScala
 
     // Even though ZK brokers don't use "metadata.version" feature, we need to 
overwrite it with our IBP as part of registration
     // so the KRaft controller can verify that all brokers are on the same IBP 
before starting the migration.
diff --git 
a/core/src/test/scala/unit/kafka/server/FinalizedFeatureCacheTest.scala 
b/core/src/test/scala/unit/kafka/server/FinalizedFeatureCacheTest.scala
index 98f6fa3bc8e..d98bcd23b25 100644
--- a/core/src/test/scala/unit/kafka/server/FinalizedFeatureCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FinalizedFeatureCacheTest.scala
@@ -19,6 +19,7 @@ package kafka.server
 
 import kafka.server.metadata.{FeatureCacheUpdateException, ZkMetadataCache}
 import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
+import org.apache.kafka.server.BrokerFeatures
 import org.apache.kafka.server.common.MetadataVersion
 import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, 
assertTrue}
 import org.junit.jupiter.api.Test
@@ -40,8 +41,7 @@ class FinalizedFeatureCacheTest {
   def testUpdateOrThrowFailedDueToInvalidEpoch(): Unit = {
     val supportedFeatures = Map[String, SupportedVersionRange](
       "feature_1" -> new SupportedVersionRange(1, 4))
-    val brokerFeatures = BrokerFeatures.createDefault(true)
-    
brokerFeatures.setSupportedFeatures(Features.supportedFeatures(supportedFeatures.asJava))
+    val brokerFeatures = BrokerFeatures.createDefault(true, 
Features.supportedFeatures(supportedFeatures.asJava))
 
     val finalizedFeatures = Map[String, Short]("feature_1" -> 4)
 
@@ -63,8 +63,7 @@ class FinalizedFeatureCacheTest {
   def testUpdateOrThrowFailedDueToInvalidFeatures(): Unit = {
     val supportedFeatures =
       Map[String, SupportedVersionRange]("feature_1" -> new 
SupportedVersionRange(1, 1))
-    val brokerFeatures = BrokerFeatures.createDefault(true)
-    
brokerFeatures.setSupportedFeatures(Features.supportedFeatures(supportedFeatures.asJava))
+    val brokerFeatures = BrokerFeatures.createDefault(true, 
Features.supportedFeatures(supportedFeatures.asJava))
 
     val finalizedFeatures = Map[String, Short]("feature_1" -> 2)
 
@@ -79,8 +78,7 @@ class FinalizedFeatureCacheTest {
   def testUpdateOrThrowSuccess(): Unit = {
     val supportedFeatures =
       Map[String, SupportedVersionRange]("feature_1" -> new 
SupportedVersionRange(1, 4))
-    val brokerFeatures = BrokerFeatures.createDefault(true)
-    
brokerFeatures.setSupportedFeatures(Features.supportedFeatures(supportedFeatures.asJava))
+    val brokerFeatures = BrokerFeatures.createDefault(true, 
Features.supportedFeatures(supportedFeatures.asJava))
 
     val finalizedFeatures = Map[String, Short]("feature_1" -> 3)
 
@@ -95,8 +93,7 @@ class FinalizedFeatureCacheTest {
   def testClear(): Unit = {
     val supportedFeatures =
       Map[String, SupportedVersionRange]("feature_1" -> new 
SupportedVersionRange(1, 4))
-    val brokerFeatures = BrokerFeatures.createDefault(true)
-    
brokerFeatures.setSupportedFeatures(Features.supportedFeatures(supportedFeatures.asJava))
+    val brokerFeatures = BrokerFeatures.createDefault(true, 
Features.supportedFeatures(supportedFeatures.asJava))
 
     val finalizedFeatures = Map[String, Short]("feature_1" -> 3)
 
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 2674a314238..d9451804e4d 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -82,7 +82,7 @@ import org.apache.kafka.metadata.LeaderAndIsr
 import org.apache.kafka.network.metrics.{RequestChannelMetrics, RequestMetrics}
 import org.apache.kafka.raft.QuorumConfig
 import org.apache.kafka.security.authorizer.AclEntry
-import org.apache.kafka.server.ClientMetricsManager
+import org.apache.kafka.server.{BrokerFeatures, ClientMetricsManager}
 import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, 
Authorizer}
 import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0, 
IBP_2_2_IV1}
 import org.apache.kafka.server.common.{FeatureVersion, FinalizedFeatures, 
GroupVersion, KRaftVersion, MetadataVersion, RequestLocal, TransactionVersion}
diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
index 30fdb666185..e27f224931c 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
@@ -31,7 +31,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.requests.{FetchRequest, UpdateMetadataRequest}
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
-import org.apache.kafka.server.common
+import org.apache.kafka.server.{BrokerFeatures, common}
 import org.apache.kafka.server.common.{DirectoryEventHandler, MetadataVersion, 
OffsetAndEpoch}
 import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, 
FetchPartitionData}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index 661a638aa88..c5756ec61f6 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -35,6 +35,7 @@ import org.apache.kafka.common.record.{CompressionType, 
MemoryRecords, RecordBat
 import 
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH,
 UNDEFINED_EPOCH_OFFSET}
 import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, 
UpdateMetadataRequest}
 import org.apache.kafka.common.utils.{LogContext, Time}
+import org.apache.kafka.server.BrokerFeatures
 import org.apache.kafka.server.config.ReplicationConfigs
 import org.apache.kafka.server.common.{MetadataVersion, OffsetAndEpoch}
 import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index 2f8f762145f..242d65556eb 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -24,7 +24,6 @@ import kafka.cluster.Partition;
 import kafka.log.LogManager;
 import kafka.server.AlterPartitionManager;
 import kafka.server.BrokerBlockingSender;
-import kafka.server.BrokerFeatures;
 import kafka.server.FailedPartitions;
 import kafka.server.InitialFetchState;
 import kafka.server.KafkaConfig;
@@ -63,6 +62,7 @@ import org.apache.kafka.common.requests.UpdateMetadataRequest;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.BrokerFeatures;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.common.OffsetAndEpoch;
 import org.apache.kafka.server.util.KafkaScheduler;
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
index 0f8d2db4e2f..9a5fadd828e 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
@@ -21,7 +21,6 @@ import kafka.controller.KafkaController;
 import kafka.coordinator.transaction.TransactionCoordinator;
 import kafka.network.RequestChannel;
 import kafka.server.AutoTopicCreationManager;
-import kafka.server.BrokerFeatures;
 import kafka.server.ClientQuotaManager;
 import kafka.server.ClientRequestQuotaManager;
 import kafka.server.ControllerMutationQuotaManager;
@@ -60,6 +59,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.coordinator.group.GroupCoordinator;
 import org.apache.kafka.network.RequestConvertToJson;
 import org.apache.kafka.network.metrics.RequestChannelMetrics;
+import org.apache.kafka.server.BrokerFeatures;
 import org.apache.kafka.server.common.FinalizedFeatures;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.config.ServerConfigs;
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
index 1edbca20eaa..5de1fd9a459 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
@@ -19,7 +19,6 @@ package org.apache.kafka.jmh.server;
 import kafka.cluster.Partition;
 import kafka.log.LogManager;
 import kafka.server.AlterPartitionManager;
-import kafka.server.BrokerFeatures;
 import kafka.server.KafkaConfig;
 import kafka.server.MetadataCache;
 import kafka.server.QuotaFactory;
@@ -32,6 +31,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.BrokerFeatures;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.config.ServerLogConfigs;
 import org.apache.kafka.server.util.KafkaScheduler;
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
index c2411124bb7..f572c43250c 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
@@ -19,7 +19,6 @@ package org.apache.kafka.jmh.server;
 import kafka.cluster.Partition;
 import kafka.log.LogManager;
 import kafka.server.AlterPartitionManager;
-import kafka.server.BrokerFeatures;
 import kafka.server.KafkaConfig;
 import kafka.server.MetadataCache;
 import kafka.server.QuotaFactory;
@@ -37,6 +36,7 @@ import 
org.apache.kafka.common.message.LeaderAndIsrRequestData;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.BrokerFeatures;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.util.KafkaScheduler;
 import org.apache.kafka.server.util.Scheduler;
diff --git a/server/src/main/java/org/apache/kafka/server/BrokerFeatures.java 
b/server/src/main/java/org/apache/kafka/server/BrokerFeatures.java
new file mode 100644
index 00000000000..95777effe5e
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/BrokerFeatures.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.feature.Features;
+import org.apache.kafka.common.feature.SupportedVersionRange;
+import org.apache.kafka.metadata.VersionRange;
+import org.apache.kafka.server.common.KRaftVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.server.common.Features.PRODUCTION_FEATURES;
+
+/**
+ * A class that encapsulates the latest features supported by the Broker and 
also provides APIs to
+ * check for incompatibilities between the features supported by the Broker 
and finalized features.
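+ * Instances are immutable: the supported features are assigned once in the
+ * constructor. The createDefault overload that accepts extra features exists
+ * only so that tests can extend the default supported set.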
+ */
+public class BrokerFeatures {
+
+    private final Features<SupportedVersionRange> supportedFeatures;
+    private static final Logger log = 
LoggerFactory.getLogger(BrokerFeatures.class);
+
+    private BrokerFeatures(Features<SupportedVersionRange> supportedFeatures) {
+        this.supportedFeatures = supportedFeatures;
+    }
+
+    public static BrokerFeatures createDefault(boolean 
unstableFeatureVersionsEnabled) {
+        return new 
BrokerFeatures(defaultSupportedFeatures(unstableFeatureVersionsEnabled));
+    }
+    
+    // only for testing
+    public static BrokerFeatures createDefault(boolean 
unstableFeatureVersionsEnabled, Features<SupportedVersionRange> newFeatures) {
+        Map<String, SupportedVersionRange> combined = new 
HashMap<>(defaultSupportedFeatures(unstableFeatureVersionsEnabled).features());
+        combined.putAll(newFeatures.features());
+        return new BrokerFeatures(Features.supportedFeatures(combined));
+    }
+
+    public static Map<String, VersionRange> 
createDefaultFeatureMap(BrokerFeatures features) {
+        return features.supportedFeatures.features()
+                .entrySet()
+                .stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> 
VersionRange.of(e.getValue().min(), e.getValue().max())));
+    }
+
+    public static Features<SupportedVersionRange> 
defaultSupportedFeatures(boolean unstableFeatureVersionsEnabled) {
+        Map<String, SupportedVersionRange> features = new HashMap<>();
+        features.put(MetadataVersion.FEATURE_NAME,
+                new SupportedVersionRange(
+                        MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(),
+                        unstableFeatureVersionsEnabled ? 
MetadataVersion.latestTesting().featureLevel()
+                                : 
MetadataVersion.latestProduction().featureLevel()));
+        PRODUCTION_FEATURES.forEach(feature -> {
+            int maxVersion = unstableFeatureVersionsEnabled ? 
feature.latestTesting() : feature.latestProduction();
+            if (maxVersion > 0) {
+                features.put(feature.featureName(), new 
SupportedVersionRange(feature.minimumProduction(), (short) maxVersion));
+            }
+        });
+        return Features.supportedFeatures(features);
+    }
+
+    public static BrokerFeatures createEmpty() {
+        return new BrokerFeatures(Features.emptySupportedFeatures());
+    }
+
+    /**
+     * Returns true if any of the provided finalized features are incompatible 
with the provided
+     * supported features.
+     *
+     * @param supportedFeatures The supported features to be compared
+     * @param finalizedFeatures The finalized features to be compared
+     * @return - True if there are any feature incompatibilities found.
+     * - False otherwise.
+     */
+    public static boolean 
hasIncompatibleFeatures(Features<SupportedVersionRange> supportedFeatures,
+                                                  Map<String, Short> 
finalizedFeatures) {
+        return !incompatibleFeatures(supportedFeatures, finalizedFeatures, 
false).isEmpty();
+    }
+
+    /**
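+     * Returns the default finalized features that a new Kafka cluster with
+     * IBP config >= IBP_2_7_IV0 needs to be bootstrapped with: each supported
+     * feature at its maximum supported level, except for the feature named
+     * KRaftVersion.FEATURE_NAME, which is finalized at level 0.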
+     */
+    public Map<String, Short> defaultFinalizedFeatures() {
+        return supportedFeatures.features().entrySet()
+                .stream()
+                .collect(Collectors.toMap(Map.Entry::getKey,
+                        e -> e.getKey().equals(KRaftVersion.FEATURE_NAME) ? 0 
: e.getValue().max()));
+    }
+
+    /**
+     * Returns the set of feature names found to be incompatible.
+     * A feature incompatibility is a version mismatch between the latest 
feature supported by the
+     * Broker, and a provided finalized feature. This can happen because a 
provided finalized
+     * feature:
+     * 1) Does not exist in the Broker (i.e. it is unknown to the Broker).
+     * [OR]
+     * 2) Exists but the finalized version level is incompatible with the
+     * SupportedVersionRange of the supported feature.
+     *
+     * @param finalized The finalized features against which incompatibilities 
need to be checked for.
+     * @return The subset of input features which are incompatible. If the 
returned object
+     * is empty, it means there were no feature incompatibilities found.
+     */
+    public Map<String, Short> incompatibleFeatures(Map<String, Short> 
finalized) {
+        return BrokerFeatures.incompatibleFeatures(supportedFeatures, 
finalized, true);
+    }
+
+    public Features<SupportedVersionRange> supportedFeatures() {
+        return supportedFeatures;
+    }
+
+    private static Map<String, Short> 
incompatibleFeatures(Features<SupportedVersionRange> supportedFeatures,
+                                                           Map<String, Short> 
finalizedFeatures,
+                                                           boolean 
logIncompatibilities) {
+        Map<String, Short> incompatibleFeaturesInfo = new HashMap<>();
+        finalizedFeatures.forEach((feature, versionLevels) -> {
+            SupportedVersionRange supportedVersions = 
supportedFeatures.get(feature);
+            if (supportedVersions == null) {
+                incompatibleFeaturesInfo.put(feature, versionLevels);
+                if (logIncompatibilities) {
+                    log.warn("Feature incompatibilities seen: {feature={}, 
reason='Unknown feature'}", feature);
+                }
+            } else if (supportedVersions.isIncompatibleWith(versionLevels)) {
+                incompatibleFeaturesInfo.put(feature, versionLevels);
+                if (logIncompatibilities) {
+                    log.warn("Feature incompatibilities seen: {feature={}, 
reason='{} is incompatible with {}'}",
+                            feature, versionLevels, supportedVersions);
+                }
+            }
+        });
+        return incompatibleFeaturesInfo;
+    }
+}
diff --git 
a/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java 
b/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java
new file mode 100644
index 00000000000..6ce2b3a7e65
--- /dev/null
+++ b/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.feature.Features;
+import org.apache.kafka.common.feature.SupportedVersionRange;
+import org.apache.kafka.server.common.MetadataVersion;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.kafka.server.common.Features.GROUP_VERSION;
+import static org.apache.kafka.server.common.Features.TRANSACTION_VERSION;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class BrokerFeaturesTest {
+
+    @Test
+    public void testEmpty() {
+        assertTrue(BrokerFeatures.createEmpty().supportedFeatures().empty());
+    }
+
+    /**
+     * A finalized feature whose name is not in the broker's supported set must
+     * be reported as incompatible, while a known feature finalized at a level
+     * inside its supported range must not be reported.
+     */
+    @Test
+    public void testIncompatibilitiesDueToAbsentFeature() {
+        Map<String, SupportedVersionRange> newFeatures = new HashMap<>();
+        newFeatures.put("test_feature_1", new SupportedVersionRange((short) 1, (short) 4));
+        newFeatures.put("test_feature_2", new SupportedVersionRange((short) 1, (short) 3));
+        Features<SupportedVersionRange> supportedFeatures = Features.supportedFeatures(newFeatures);
+        BrokerFeatures brokerFeatures = BrokerFeatures.createDefault(true, supportedFeatures);
+
+        Map<String, Short> compatibleFeatures = new HashMap<>();
+        compatibleFeatures.put("test_feature_1", (short) 4);
+        // "test_feature_3" is deliberately NOT part of newFeatures above, so the
+        // lookup of its supported range yields null and the unknown-feature
+        // branch is exercised. Using "test_feature_2" at level 4 here would only
+        // repeat the out-of-range case of
+        // testIncompatibilitiesDueToIncompatibleFeature.
+        Map<String, Short> inCompatibleFeatures = new HashMap<>();
+        inCompatibleFeatures.put("test_feature_3", (short) 4);
+
+        Map<String, Short> finalizedFeatures = new HashMap<>(compatibleFeatures);
+        finalizedFeatures.putAll(inCompatibleFeatures);
+
+        assertEquals(inCompatibleFeatures,
+                brokerFeatures.incompatibleFeatures(finalizedFeatures));
+        assertTrue(BrokerFeatures.hasIncompatibleFeatures(supportedFeatures, finalizedFeatures));
+    }
+
+    @Test
+    public void testIncompatibilitiesDueToIncompatibleFeature() {
+        Map<String, SupportedVersionRange> newFeatures = new HashMap<>();
+        newFeatures.put("test_feature_1", new SupportedVersionRange((short) 1, 
(short) 4));
+        newFeatures.put("test_feature_2", new SupportedVersionRange((short) 1, 
(short) 3));
+        Features<SupportedVersionRange> supportedFeatures = 
Features.supportedFeatures(newFeatures);
+        BrokerFeatures brokerFeatures = BrokerFeatures.createDefault(true, 
supportedFeatures);
+
+        Map<String, Short> compatibleFeatures = new HashMap<>();
+        compatibleFeatures.put("test_feature_1", (short) 3);
+        Map<String, Short> inCompatibleFeatures = new HashMap<>();
+        inCompatibleFeatures.put("test_feature_2", (short) 4);
+        Map<String, Short> finalizedFeatures = new 
HashMap<>(compatibleFeatures);
+        finalizedFeatures.putAll(inCompatibleFeatures);
+
+        assertEquals(inCompatibleFeatures, 
brokerFeatures.incompatibleFeatures(finalizedFeatures));
+        assertTrue(BrokerFeatures.hasIncompatibleFeatures(supportedFeatures, 
finalizedFeatures));
+    }
+
+    @Test
+    public void testCompatibleFeatures() {
+        Map<String, SupportedVersionRange> newFeatures = new HashMap<>();
+        newFeatures.put("test_feature_1", new SupportedVersionRange((short) 1, 
(short) 4));
+        newFeatures.put("test_feature_2", new SupportedVersionRange((short) 1, 
(short) 3));
+        Features<SupportedVersionRange> supportedFeatures = 
Features.supportedFeatures(newFeatures);
+        BrokerFeatures brokerFeatures = BrokerFeatures.createDefault(true, 
supportedFeatures);
+
+        Map<String, Short> compatibleFeatures = new HashMap<>();
+        compatibleFeatures.put("test_feature_1", (short) 3);
+        compatibleFeatures.put("test_feature_2", (short) 3);
+        Map<String, Short> finalizedFeatures = new 
HashMap<>(compatibleFeatures);
+
+        
assertTrue(brokerFeatures.incompatibleFeatures(finalizedFeatures).isEmpty());
+        assertFalse(BrokerFeatures.hasIncompatibleFeatures(supportedFeatures, 
finalizedFeatures));
+    }
+
+    @Test
+    public void testDefaultFinalizedFeatures() {
+        Map<String, SupportedVersionRange> newFeatures = new HashMap<>();
+        newFeatures.put("test_feature_1", new SupportedVersionRange((short) 1, 
(short) 4));
+        newFeatures.put("test_feature_2", new SupportedVersionRange((short) 1, 
(short) 3));
+        newFeatures.put("test_feature_3", new SupportedVersionRange((short) 3, 
(short) 7));
+        Features<SupportedVersionRange> supportedFeatures = 
Features.supportedFeatures(newFeatures);
+        BrokerFeatures brokerFeatures = BrokerFeatures.createDefault(true, 
supportedFeatures);
+
+        Map<String, Short> expectedFeatures = new HashMap<>();
+        expectedFeatures.put(MetadataVersion.FEATURE_NAME, 
MetadataVersion.latestTesting().featureLevel());
+        expectedFeatures.put(TRANSACTION_VERSION.featureName(), 
TRANSACTION_VERSION.latestTesting());
+        expectedFeatures.put(GROUP_VERSION.featureName(), 
GROUP_VERSION.latestTesting());
+        expectedFeatures.put("kraft.version", (short) 0);
+        expectedFeatures.put("test_feature_1", (short) 4);
+        expectedFeatures.put("test_feature_2", (short) 3);
+        expectedFeatures.put("test_feature_3", (short) 7);
+
+        assertEquals(expectedFeatures, 
brokerFeatures.defaultFinalizedFeatures());
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void ensureDefaultSupportedFeaturesRangeMaxNotZero(boolean 
unstableVersionsEnabled) {
+        BrokerFeatures brokerFeatures = 
BrokerFeatures.createDefault(unstableVersionsEnabled);
+        brokerFeatures.supportedFeatures().features()
+                .values()
+                .forEach(supportedVersionRange -> assertNotEquals(0, 
supportedVersionRange.max()));
+    }
+}

Reply via email to