This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 10a0905628c KAFKA-17564 Move BrokerFeatures to server module (#17228)
10a0905628c is described below
commit 10a0905628c65c1fa8df2d0c1c7d1a808074cf08
Author: Ken Huang <[email protected]>
AuthorDate: Mon Oct 7 15:16:48 2024 +0800
KAFKA-17564 Move BrokerFeatures to server module (#17228)
Reviewers: TengYao Chi <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../scala/kafka/controller/KafkaController.scala | 7 +-
.../scala/kafka/server/ApiVersionManager.scala | 2 +-
.../main/scala/kafka/server/BrokerFeatures.scala | 155 ---------------------
.../src/main/scala/kafka/server/BrokerServer.scala | 4 +-
core/src/main/scala/kafka/server/KafkaServer.scala | 4 +-
.../main/scala/kafka/server/MetadataCache.scala | 1 +
.../kafka/server/metadata/ZkMetadataCache.scala | 7 +-
.../unit/kafka/server/ApiVersionManagerTest.scala | 1 +
.../unit/kafka/server/BrokerFeaturesTest.scala | 117 ----------------
.../kafka/server/BrokerLifecycleManagerTest.scala | 3 +-
.../kafka/server/FinalizedFeatureCacheTest.scala | 13 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 2 +-
.../server/ReplicaAlterLogDirsThreadTest.scala | 2 +-
.../kafka/server/ReplicaFetcherThreadTest.scala | 1 +
.../jmh/fetcher/ReplicaFetcherThreadBenchmark.java | 2 +-
.../jmh/metadata/MetadataRequestBenchmark.java | 2 +-
.../apache/kafka/jmh/server/CheckpointBench.java | 2 +-
.../kafka/jmh/server/PartitionCreationBench.java | 2 +-
.../org/apache/kafka/server/BrokerFeatures.java | 155 +++++++++++++++++++++
.../apache/kafka/server/BrokerFeaturesTest.java | 130 +++++++++++++++++
20 files changed, 314 insertions(+), 298 deletions(-)
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala
b/core/src/main/scala/kafka/controller/KafkaController.scala
index 0f9d65ebb1c..5d886a30401 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -43,6 +43,7 @@ import
org.apache.kafka.common.requests.{AbstractControlRequest, ApiError, Leade
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState}
import org.apache.kafka.metadata.migration.ZkMigrationState
+import org.apache.kafka.server.BrokerFeatures
import org.apache.kafka.server.common.{AdminOperationException,
ProducerIdsBlock}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.KafkaScheduler
@@ -431,7 +432,7 @@ class KafkaController(val config: KafkaConfig,
val newVersion = createFeatureZNode(
FeatureZNode(config.interBrokerProtocolVersion,
FeatureZNodeStatus.Enabled,
- brokerFeatures.defaultFinalizedFeatures
+ brokerFeatures.defaultFinalizedFeatures.asScala.map { case (k, v) =>
(k, v.shortValue()) }
))
featureCache.waitUntilFeatureEpochOrThrow(newVersion,
config.zkConnectionTimeoutMs)
} else {
@@ -1601,7 +1602,7 @@ class KafkaController(val config: KafkaConfig,
latestFinalizedFeatures =>
BrokerFeatures.hasIncompatibleFeatures(broker.features,
latestFinalizedFeatures.finalizedFeatures().asScala.
- map(kv => (kv._1, kv._2.toShort)).toMap))
+ map(kv => (kv._1, kv._2.toShort:
java.lang.Short)).toMap.asJava))
}
}
@@ -1983,7 +1984,7 @@ class KafkaController(val config: KafkaConfig,
s" versionLevel:${update.versionLevel} is lower than the" +
s" supported minVersion:${supportedVersionRange.min}."))
} else {
- val newFinalizedFeature = Utils.mkMap(Utils.mkEntry(update.feature,
newVersion)).asScala.toMap
+ val newFinalizedFeature = Utils.mkMap(Utils.mkEntry(update.feature,
newVersion: java.lang.Short))
val numIncompatibleBrokers =
controllerContext.liveOrShuttingDownBrokers.count(broker => {
BrokerFeatures.hasIncompatibleFeatures(broker.features,
newFinalizedFeature)
})
diff --git a/core/src/main/scala/kafka/server/ApiVersionManager.scala
b/core/src/main/scala/kafka/server/ApiVersionManager.scala
index aa8fc4a35d6..588fe99aea1 100644
--- a/core/src/main/scala/kafka/server/ApiVersionManager.scala
+++ b/core/src/main/scala/kafka/server/ApiVersionManager.scala
@@ -21,7 +21,7 @@ import
org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.ApiVersionsResponse
import org.apache.kafka.network.metrics.RequestChannelMetrics
-import org.apache.kafka.server.ClientMetricsManager
+import org.apache.kafka.server.{BrokerFeatures, ClientMetricsManager}
import org.apache.kafka.server.common.FinalizedFeatures
import scala.collection.mutable
diff --git a/core/src/main/scala/kafka/server/BrokerFeatures.scala
b/core/src/main/scala/kafka/server/BrokerFeatures.scala
deleted file mode 100644
index 6a8ddcb19c7..00000000000
--- a/core/src/main/scala/kafka/server/BrokerFeatures.scala
+++ /dev/null
@@ -1,155 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import kafka.utils.Logging
-import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
-import org.apache.kafka.metadata.VersionRange
-import org.apache.kafka.server.common.Features.PRODUCTION_FEATURES
-import org.apache.kafka.server.common.MetadataVersion
-
-import java.util
-import scala.jdk.CollectionConverters._
-
-/**
- * A class that encapsulates the latest features supported by the Broker and
also provides APIs to
- * check for incompatibilities between the features supported by the Broker
and finalized features.
- * This class is immutable in production. It provides few APIs to mutate state
only for the purpose
- * of testing.
- */
-class BrokerFeatures private (@volatile var supportedFeatures:
Features[SupportedVersionRange]) {
- // For testing only.
- def setSupportedFeatures(newFeatures: Features[SupportedVersionRange]): Unit
= {
- val combined = new util.HashMap[String,
SupportedVersionRange](supportedFeatures.features())
- combined.putAll(newFeatures.features())
- supportedFeatures = Features.supportedFeatures(combined)
- }
-
- /**
- * Returns the default finalized features that a new Kafka cluster with IBP
config >= IBP_2_7_IV0
- * needs to be bootstrapped with.
- */
- def defaultFinalizedFeatures: Map[String, Short] = {
- supportedFeatures.features.asScala.map {
- case(name, versionRange) =>
- if (name.equals("kraft.version")) {
- (name, 0.toShort)
- } else {
- (name, versionRange.max)
- }
- }.toMap
- }
-
- /**
- * Returns the set of feature names found to be incompatible.
- * A feature incompatibility is a version mismatch between the latest
feature supported by the
- * Broker, and a provided finalized feature. This can happen because a
provided finalized
- * feature:
- * 1) Does not exist in the Broker (i.e. it is unknown to the Broker).
- * [OR]
- * 2) Exists but the FinalizedVersionRange does not match with the
SupportedVersionRange
- * of the supported feature.
- *
- * @param finalized The finalized features against which incompatibilities
need to be checked for.
- *
- * @return The subset of input features which are incompatible.
If the returned object
- * is empty, it means there were no feature
incompatibilities found.
- */
- def incompatibleFeatures(finalized: Map[String, Short]): Map[String, Short]
= {
- BrokerFeatures.incompatibleFeatures(supportedFeatures, finalized,
logIncompatibilities = true)
- }
-}
-
-object BrokerFeatures extends Logging {
-
- def createDefault(unstableFeatureVersionsEnabled: Boolean): BrokerFeatures =
{
- new
BrokerFeatures(defaultSupportedFeatures(unstableFeatureVersionsEnabled))
- }
-
- def createDefaultFeatureMap(features: BrokerFeatures): Map[String,
VersionRange] = {
- features.supportedFeatures.features.asScala.map {
- case (name, versionRange) =>
- (name, VersionRange.of(versionRange.min, versionRange.max))
- }.toMap
- }
-
- def defaultSupportedFeatures(unstableFeatureVersionsEnabled: Boolean):
Features[SupportedVersionRange] = {
- val features = new util.HashMap[String, SupportedVersionRange]()
- features.put(MetadataVersion.FEATURE_NAME,
- new SupportedVersionRange(
- MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(),
- if (unstableFeatureVersionsEnabled) {
- MetadataVersion.latestTesting.featureLevel
- } else {
- MetadataVersion.latestProduction.featureLevel
- }))
- PRODUCTION_FEATURES.forEach {
- feature =>
- val maxVersion = if (unstableFeatureVersionsEnabled)
- feature.latestTesting
- else
- feature.latestProduction
- if (maxVersion > 0) {
- features.put(feature.featureName, new
SupportedVersionRange(feature.minimumProduction(), maxVersion))
- }
- }
- Features.supportedFeatures(features)
- }
-
- def createEmpty(): BrokerFeatures = {
- new BrokerFeatures(Features.emptySupportedFeatures())
- }
-
- /**
- * Returns true if any of the provided finalized features are incompatible
with the provided
- * supported features.
- *
- * @param supportedFeatures The supported features to be compared
- * @param finalizedFeatures The finalized features to be compared
- *
- * @return - True if there are any feature
incompatibilities found.
- * - False otherwise.
- */
- def hasIncompatibleFeatures(supportedFeatures:
Features[SupportedVersionRange],
- finalizedFeatures: Map[String, Short]): Boolean
= {
- incompatibleFeatures(supportedFeatures, finalizedFeatures,
logIncompatibilities = false).nonEmpty
- }
-
- private def incompatibleFeatures(supportedFeatures:
Features[SupportedVersionRange],
- finalizedFeatures: Map[String, Short],
- logIncompatibilities: Boolean): Map[String,
Short] = {
- val incompatibleFeaturesInfo = finalizedFeatures.map {
- case (feature, versionLevels) =>
- val supportedVersions = supportedFeatures.get(feature)
- if (supportedVersions == null) {
- (feature, versionLevels, "{feature=%s, reason='Unsupported
feature'}".format(feature))
- } else if (supportedVersions.isIncompatibleWith(versionLevels)) {
- (feature, versionLevels, "{feature=%s, reason='%s is incompatible
with %s'}".format(
- feature, versionLevels, supportedVersions))
- } else {
- (feature, versionLevels, null)
- }
- }.filter{ case(_, _, errorReason) => errorReason != null}.toList
-
- if (logIncompatibilities && incompatibleFeaturesInfo.nonEmpty) {
- warn("Feature incompatibilities seen: " +
- incompatibleFeaturesInfo.map { case(_, _, errorReason) =>
errorReason }.mkString(", "))
- }
- incompatibleFeaturesInfo.map { case(feature, versionLevels, _) =>
(feature, versionLevels) }.toMap
- }
-}
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 0d88a50bec4..2d2ca1fc384 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -43,7 +43,7 @@ import org.apache.kafka.coordinator.share.{ShareCoordinator,
ShareCoordinatorRec
import org.apache.kafka.image.publisher.{BrokerRegistrationTracker,
MetadataPublisher}
import org.apache.kafka.metadata.{BrokerState, ListenerInfo}
import org.apache.kafka.security.CredentialProvider
-import org.apache.kafka.server.{AssignmentsManager, ClientMetricsManager,
NodeToControllerChannelManager}
+import org.apache.kafka.server.{AssignmentsManager, BrokerFeatures,
ClientMetricsManager, NodeToControllerChannelManager}
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.{ApiMessageAndVersion,
DirectoryEventHandler, TopicIdPartition}
import org.apache.kafka.server.config.ConfigType
@@ -378,7 +378,7 @@ class BrokerServer(
ConfigType.CLIENT_METRICS -> new
ClientMetricsConfigHandler(clientMetricsManager),
ConfigType.GROUP -> new GroupConfigHandler(groupCoordinator))
- val featuresRemapped =
BrokerFeatures.createDefaultFeatureMap(brokerFeatures).asJava
+ val featuresRemapped =
BrokerFeatures.createDefaultFeatureMap(brokerFeatures)
val brokerLifecycleChannelManager = new
NodeToControllerChannelManagerImpl(
controllerNodeProvider,
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala
b/core/src/main/scala/kafka/server/KafkaServer.scala
index af0534237a4..bbac5a5ed30 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -54,7 +54,7 @@ import org.apache.kafka.metadata.{BrokerState,
MetadataRecordSerde, VersionRange
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.raft.Endpoints
import org.apache.kafka.security.CredentialProvider
-import org.apache.kafka.server.NodeToControllerChannelManager
+import org.apache.kafka.server.{BrokerFeatures, NodeToControllerChannelManager}
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.MetadataVersion._
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
@@ -472,7 +472,7 @@ class KafkaServer(
setSecurityProtocol(ep.securityProtocol.id))
}
- val features =
BrokerFeatures.createDefaultFeatureMap(BrokerFeatures.createDefault(config.unstableFeatureVersionsEnabled))
+ val features =
BrokerFeatures.createDefaultFeatureMap(BrokerFeatures.createDefault(config.unstableFeatureVersionsEnabled)).asScala
// Even though ZK brokers don't use "metadata.version" feature, we
need to overwrite it with our IBP as part of registration
// so the KRaft controller can verify that all brokers are on the
same IBP before starting the migration.
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala
b/core/src/main/scala/kafka/server/MetadataCache.scala
index 68afc0b3d35..4b14f04483e 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -22,6 +22,7 @@ import org.apache.kafka.admin.BrokerMetadata
import org.apache.kafka.common.message.{MetadataResponseData,
UpdateMetadataRequestData}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.{Cluster, Node, TopicPartition, Uuid}
+import org.apache.kafka.server.BrokerFeatures
import org.apache.kafka.server.common.{FinalizedFeatures, KRaftVersion,
MetadataVersion}
import java.util
diff --git a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
index ca41ebda7f3..3205a24aa44 100755
--- a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
@@ -24,7 +24,7 @@ import scala.collection.{Seq, Set, mutable}
import scala.jdk.CollectionConverters._
import kafka.cluster.{Broker, EndPoint}
import kafka.controller.StateChangeLogger
-import kafka.server.{BrokerFeatures, CachedControllerId,
KRaftCachedControllerId, MetadataCache, ZkCachedControllerId}
+import kafka.server.{CachedControllerId, KRaftCachedControllerId,
MetadataCache, ZkCachedControllerId}
import kafka.utils.CoreUtils._
import kafka.utils.Logging
import org.apache.kafka.admin.BrokerMetadata
@@ -39,6 +39,7 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{AbstractControlRequest,
ApiVersionsResponse, MetadataResponse, UpdateMetadataRequest}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.metadata.LeaderAndIsr
+import org.apache.kafka.server.BrokerFeatures
import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion}
import java.util.concurrent.{ThreadLocalRandom, TimeUnit}
@@ -649,8 +650,8 @@ class ZkMetadataCache(
throw new FeatureCacheUpdateException(errorMsg)
} else {
val incompatibleFeatures = brokerFeatures.incompatibleFeatures(
- latest.finalizedFeatures().asScala.map(kv => (kv._1,
kv._2.toShort)).toMap)
- if (incompatibleFeatures.nonEmpty) {
+ latest.finalizedFeatures().asScala.map(kv => (kv._1, kv._2.toShort:
java.lang.Short)).toMap.asJava)
+ if (!incompatibleFeatures.isEmpty) {
val errorMsg = "FinalizedFeatureCache update failed since feature
compatibility" +
s" checks failed! Supported ${brokerFeatures.supportedFeatures} has
incompatibilities" +
s" with the latest $latest."
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala
index ae0dcb94a4f..a7a18085bbb 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala
@@ -20,6 +20,7 @@ import kafka.server.metadata.ZkMetadataCache
import org.apache.kafka.clients.NodeApiVersions
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.server.BrokerFeatures
import org.apache.kafka.server.common.MetadataVersion
import org.junit.jupiter.api.{Disabled, Test}
import org.junit.jupiter.api.Assertions._
diff --git a/core/src/test/scala/unit/kafka/server/BrokerFeaturesTest.scala
b/core/src/test/scala/unit/kafka/server/BrokerFeaturesTest.scala
deleted file mode 100644
index 0bcac054cf4..00000000000
--- a/core/src/test/scala/unit/kafka/server/BrokerFeaturesTest.scala
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
-import org.apache.kafka.server.common.{MetadataVersion, Features =>
ServerFeatures}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse,
assertNotEquals, assertTrue}
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
-
-import scala.jdk.CollectionConverters._
-
-class BrokerFeaturesTest {
-
- @Test
- def testEmpty(): Unit = {
- assertTrue(BrokerFeatures.createEmpty().supportedFeatures.empty)
- }
-
- @Test
- def testIncompatibilitiesDueToAbsentFeature(): Unit = {
- val brokerFeatures = BrokerFeatures.createDefault(true)
- val supportedFeatures = Features.supportedFeatures(Map[String,
SupportedVersionRange](
- "test_feature_1" -> new SupportedVersionRange(1, 4),
- "test_feature_2" -> new SupportedVersionRange(1, 3)).asJava)
- brokerFeatures.setSupportedFeatures(supportedFeatures)
-
- val compatibleFeatures = Map[String, Short]("test_feature_1" -> 4)
- val inCompatibleFeatures = Map[String, Short]("test_feature_3" -> 4)
- val features = compatibleFeatures++inCompatibleFeatures
- val finalizedFeatures = features
-
- assertEquals(inCompatibleFeatures,
- brokerFeatures.incompatibleFeatures(finalizedFeatures))
- assertTrue(BrokerFeatures.hasIncompatibleFeatures(supportedFeatures,
finalizedFeatures))
- }
-
- @Test
- def testIncompatibilitiesDueToIncompatibleFeature(): Unit = {
- val brokerFeatures = BrokerFeatures.createDefault(true)
- val supportedFeatures = Features.supportedFeatures(Map[String,
SupportedVersionRange](
- "test_feature_1" -> new SupportedVersionRange(1, 4),
- "test_feature_2" -> new SupportedVersionRange(1, 3)).asJava)
- brokerFeatures.setSupportedFeatures(supportedFeatures)
-
- val compatibleFeatures = Map[String, Short]("test_feature_1" -> 3)
- val inCompatibleFeatures = Map[String, Short]("test_feature_2" -> 4)
- val features = compatibleFeatures++inCompatibleFeatures
- val finalizedFeatures = features
-
- assertEquals(
- inCompatibleFeatures,
- brokerFeatures.incompatibleFeatures(finalizedFeatures))
- assertTrue(BrokerFeatures.hasIncompatibleFeatures(supportedFeatures,
finalizedFeatures))
- }
-
- @Test
- def testCompatibleFeatures(): Unit = {
- val brokerFeatures = BrokerFeatures.createDefault(true)
- val supportedFeatures = Features.supportedFeatures(Map[String,
SupportedVersionRange](
- "test_feature_1" -> new SupportedVersionRange(1, 4),
- "test_feature_2" -> new SupportedVersionRange(1, 3)).asJava)
- brokerFeatures.setSupportedFeatures(supportedFeatures)
-
- val compatibleFeatures = Map[String, Short](
- "test_feature_1" -> 3,
- "test_feature_2" -> 3)
- val finalizedFeatures = compatibleFeatures
- assertTrue(brokerFeatures.incompatibleFeatures(finalizedFeatures).isEmpty)
- assertFalse(BrokerFeatures.hasIncompatibleFeatures(supportedFeatures,
finalizedFeatures))
- }
-
- @Test
- def testDefaultFinalizedFeatures(): Unit = {
- val brokerFeatures = BrokerFeatures.createDefault(true)
- val supportedFeatures = Features.supportedFeatures(Map[String,
SupportedVersionRange](
- "test_feature_1" -> new SupportedVersionRange(1, 4),
- "test_feature_2" -> new SupportedVersionRange(1, 3),
- "test_feature_3" -> new SupportedVersionRange(3, 7)).asJava)
- brokerFeatures.setSupportedFeatures(supportedFeatures)
-
- val expectedFeatures = Map[String, Short](
- MetadataVersion.FEATURE_NAME ->
MetadataVersion.latestTesting().featureLevel(),
- ServerFeatures.TRANSACTION_VERSION.featureName() ->
ServerFeatures.TRANSACTION_VERSION.latestTesting(),
- ServerFeatures.GROUP_VERSION.featureName() ->
ServerFeatures.GROUP_VERSION.latestTesting(),
- "kraft.version" -> 0,
- "test_feature_1" -> 4,
- "test_feature_2" -> 3,
- "test_feature_3" -> 7)
- assertEquals(expectedFeatures, brokerFeatures.defaultFinalizedFeatures)
- }
-
- @ParameterizedTest
- @ValueSource(booleans = Array(true, false))
- def ensureDefaultSupportedFeaturesRangeMaxNotZero(unstableVersionsEnabled:
Boolean): Unit = {
- val brokerFeatures = BrokerFeatures.createDefault(unstableVersionsEnabled)
- brokerFeatures.supportedFeatures.features().values().forEach {
supportedVersionRange =>
- assertNotEquals(0, supportedVersionRange.max())
- }
- }
-}
diff --git
a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
index 8e88eea3c46..0f5d3f027e5 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
@@ -26,6 +26,7 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse,
BrokerHeartbeatRequest, BrokerHeartbeatResponse, BrokerRegistrationRequest,
BrokerRegistrationResponse}
import org.apache.kafka.metadata.{BrokerState, VersionRange}
import org.apache.kafka.raft.QuorumConfig
+import org.apache.kafka.server.BrokerFeatures
import org.apache.kafka.server.common.{Features, KRaftVersion, MetadataVersion}
import org.apache.kafka.server.common.MetadataVersion.{IBP_3_8_IV0,
IBP_3_9_IV0}
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs,
ServerLogConfigs, ZkConfigs}
@@ -120,7 +121,7 @@ class BrokerLifecycleManagerTest {
manager = new BrokerLifecycleManager(context.config, context.time,
"successful-registration-", isZkBroker = false,
Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
val controllerNode = new Node(3000, "localhost", 8021)
context.controllerNodeProvider.node.set(controllerNode)
- val features =
BrokerFeatures.createDefaultFeatureMap(BrokerFeatures.createDefault(true))
+ val features =
BrokerFeatures.createDefaultFeatureMap(BrokerFeatures.createDefault(true)).asScala
// Even though ZK brokers don't use "metadata.version" feature, we need to
overwrite it with our IBP as part of registration
// so the KRaft controller can verify that all brokers are on the same IBP
before starting the migration.
diff --git
a/core/src/test/scala/unit/kafka/server/FinalizedFeatureCacheTest.scala
b/core/src/test/scala/unit/kafka/server/FinalizedFeatureCacheTest.scala
index 98f6fa3bc8e..d98bcd23b25 100644
--- a/core/src/test/scala/unit/kafka/server/FinalizedFeatureCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FinalizedFeatureCacheTest.scala
@@ -19,6 +19,7 @@ package kafka.server
import kafka.server.metadata.{FeatureCacheUpdateException, ZkMetadataCache}
import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
+import org.apache.kafka.server.BrokerFeatures
import org.apache.kafka.server.common.MetadataVersion
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows,
assertTrue}
import org.junit.jupiter.api.Test
@@ -40,8 +41,7 @@ class FinalizedFeatureCacheTest {
def testUpdateOrThrowFailedDueToInvalidEpoch(): Unit = {
val supportedFeatures = Map[String, SupportedVersionRange](
"feature_1" -> new SupportedVersionRange(1, 4))
- val brokerFeatures = BrokerFeatures.createDefault(true)
-
brokerFeatures.setSupportedFeatures(Features.supportedFeatures(supportedFeatures.asJava))
+ val brokerFeatures = BrokerFeatures.createDefault(true,
Features.supportedFeatures(supportedFeatures.asJava))
val finalizedFeatures = Map[String, Short]("feature_1" -> 4)
@@ -63,8 +63,7 @@ class FinalizedFeatureCacheTest {
def testUpdateOrThrowFailedDueToInvalidFeatures(): Unit = {
val supportedFeatures =
Map[String, SupportedVersionRange]("feature_1" -> new
SupportedVersionRange(1, 1))
- val brokerFeatures = BrokerFeatures.createDefault(true)
-
brokerFeatures.setSupportedFeatures(Features.supportedFeatures(supportedFeatures.asJava))
+ val brokerFeatures = BrokerFeatures.createDefault(true,
Features.supportedFeatures(supportedFeatures.asJava))
val finalizedFeatures = Map[String, Short]("feature_1" -> 2)
@@ -79,8 +78,7 @@ class FinalizedFeatureCacheTest {
def testUpdateOrThrowSuccess(): Unit = {
val supportedFeatures =
Map[String, SupportedVersionRange]("feature_1" -> new
SupportedVersionRange(1, 4))
- val brokerFeatures = BrokerFeatures.createDefault(true)
-
brokerFeatures.setSupportedFeatures(Features.supportedFeatures(supportedFeatures.asJava))
+ val brokerFeatures = BrokerFeatures.createDefault(true,
Features.supportedFeatures(supportedFeatures.asJava))
val finalizedFeatures = Map[String, Short]("feature_1" -> 3)
@@ -95,8 +93,7 @@ class FinalizedFeatureCacheTest {
def testClear(): Unit = {
val supportedFeatures =
Map[String, SupportedVersionRange]("feature_1" -> new
SupportedVersionRange(1, 4))
- val brokerFeatures = BrokerFeatures.createDefault(true)
-
brokerFeatures.setSupportedFeatures(Features.supportedFeatures(supportedFeatures.asJava))
+ val brokerFeatures = BrokerFeatures.createDefault(true,
Features.supportedFeatures(supportedFeatures.asJava))
val finalizedFeatures = Map[String, Short]("feature_1" -> 3)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 2674a314238..d9451804e4d 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -82,7 +82,7 @@ import org.apache.kafka.metadata.LeaderAndIsr
import org.apache.kafka.network.metrics.{RequestChannelMetrics, RequestMetrics}
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.security.authorizer.AclEntry
-import org.apache.kafka.server.ClientMetricsManager
+import org.apache.kafka.server.{BrokerFeatures, ClientMetricsManager}
import org.apache.kafka.server.authorizer.{Action, AuthorizationResult,
Authorizer}
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0,
IBP_2_2_IV1}
import org.apache.kafka.server.common.{FeatureVersion, FinalizedFeatures,
GroupVersion, KRaftVersion, MetadataVersion, RequestLocal, TransactionVersion}
diff --git
a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
index 30fdb666185..e27f224931c 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
@@ -31,7 +31,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.requests.{FetchRequest, UpdateMetadataRequest}
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
-import org.apache.kafka.server.common
+import org.apache.kafka.server.{BrokerFeatures, common}
import org.apache.kafka.server.common.{DirectoryEventHandler, MetadataVersion,
OffsetAndEpoch}
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams,
FetchPartitionData}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
diff --git
a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index 661a638aa88..c5756ec61f6 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -35,6 +35,7 @@ import org.apache.kafka.common.record.{CompressionType,
MemoryRecords, RecordBat
import
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH,
UNDEFINED_EPOCH_OFFSET}
import org.apache.kafka.common.requests.{FetchRequest, FetchResponse,
UpdateMetadataRequest}
import org.apache.kafka.common.utils.{LogContext, Time}
+import org.apache.kafka.server.BrokerFeatures
import org.apache.kafka.server.config.ReplicationConfigs
import org.apache.kafka.server.common.{MetadataVersion, OffsetAndEpoch}
import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index 2f8f762145f..242d65556eb 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -24,7 +24,6 @@ import kafka.cluster.Partition;
import kafka.log.LogManager;
import kafka.server.AlterPartitionManager;
import kafka.server.BrokerBlockingSender;
-import kafka.server.BrokerFeatures;
import kafka.server.FailedPartitions;
import kafka.server.InitialFetchState;
import kafka.server.KafkaConfig;
@@ -63,6 +62,7 @@ import org.apache.kafka.common.requests.UpdateMetadataRequest;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.BrokerFeatures;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.OffsetAndEpoch;
import org.apache.kafka.server.util.KafkaScheduler;
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
index 0f8d2db4e2f..9a5fadd828e 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
@@ -21,7 +21,6 @@ import kafka.controller.KafkaController;
import kafka.coordinator.transaction.TransactionCoordinator;
import kafka.network.RequestChannel;
import kafka.server.AutoTopicCreationManager;
-import kafka.server.BrokerFeatures;
import kafka.server.ClientQuotaManager;
import kafka.server.ClientRequestQuotaManager;
import kafka.server.ControllerMutationQuotaManager;
@@ -60,6 +59,7 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupCoordinator;
import org.apache.kafka.network.RequestConvertToJson;
import org.apache.kafka.network.metrics.RequestChannelMetrics;
+import org.apache.kafka.server.BrokerFeatures;
import org.apache.kafka.server.common.FinalizedFeatures;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.config.ServerConfigs;
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
index 1edbca20eaa..5de1fd9a459 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
@@ -19,7 +19,6 @@ package org.apache.kafka.jmh.server;
import kafka.cluster.Partition;
import kafka.log.LogManager;
import kafka.server.AlterPartitionManager;
-import kafka.server.BrokerFeatures;
import kafka.server.KafkaConfig;
import kafka.server.MetadataCache;
import kafka.server.QuotaFactory;
@@ -32,6 +31,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.BrokerFeatures;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.apache.kafka.server.util.KafkaScheduler;
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
index c2411124bb7..f572c43250c 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
@@ -19,7 +19,6 @@ package org.apache.kafka.jmh.server;
import kafka.cluster.Partition;
import kafka.log.LogManager;
import kafka.server.AlterPartitionManager;
-import kafka.server.BrokerFeatures;
import kafka.server.KafkaConfig;
import kafka.server.MetadataCache;
import kafka.server.QuotaFactory;
@@ -37,6 +36,7 @@ import
org.apache.kafka.common.message.LeaderAndIsrRequestData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.BrokerFeatures;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.util.KafkaScheduler;
import org.apache.kafka.server.util.Scheduler;
diff --git a/server/src/main/java/org/apache/kafka/server/BrokerFeatures.java
b/server/src/main/java/org/apache/kafka/server/BrokerFeatures.java
new file mode 100644
index 00000000000..95777effe5e
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/BrokerFeatures.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.feature.Features;
+import org.apache.kafka.common.feature.SupportedVersionRange;
+import org.apache.kafka.metadata.VersionRange;
+import org.apache.kafka.server.common.KRaftVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.server.common.Features.PRODUCTION_FEATURES;
+
+/**
+ * A class that encapsulates the latest features supported by the Broker and
also provides APIs to
+ * check for incompatibilities between the features supported by the Broker
and finalized features.
+ * This class is immutable in production. It provides few APIs to mutate state
only for the purpose
+ * of testing.
+ */
+public class BrokerFeatures {
+
+ private final Features<SupportedVersionRange> supportedFeatures;
+ private static final Logger log =
LoggerFactory.getLogger(BrokerFeatures.class);
+
+ private BrokerFeatures(Features<SupportedVersionRange> supportedFeatures) {
+ this.supportedFeatures = supportedFeatures;
+ }
+
+ public static BrokerFeatures createDefault(boolean
unstableFeatureVersionsEnabled) {
+ return new
BrokerFeatures(defaultSupportedFeatures(unstableFeatureVersionsEnabled));
+ }
+
+ // only for testing
+ public static BrokerFeatures createDefault(boolean
unstableFeatureVersionsEnabled, Features<SupportedVersionRange> newFeatures) {
+ Map<String, SupportedVersionRange> combined = new
HashMap<>(defaultSupportedFeatures(unstableFeatureVersionsEnabled).features());
+ combined.putAll(newFeatures.features());
+ return new BrokerFeatures(Features.supportedFeatures(combined));
+ }
+
+ public static Map<String, VersionRange>
createDefaultFeatureMap(BrokerFeatures features) {
+ return features.supportedFeatures.features()
+ .entrySet()
+ .stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e ->
VersionRange.of(e.getValue().min(), e.getValue().max())));
+ }
+
+ public static Features<SupportedVersionRange>
defaultSupportedFeatures(boolean unstableFeatureVersionsEnabled) {
+ Map<String, SupportedVersionRange> features = new HashMap<>();
+ features.put(MetadataVersion.FEATURE_NAME,
+ new SupportedVersionRange(
+ MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(),
+ unstableFeatureVersionsEnabled ?
MetadataVersion.latestTesting().featureLevel()
+ :
MetadataVersion.latestProduction().featureLevel()));
+ PRODUCTION_FEATURES.forEach(feature -> {
+ int maxVersion = unstableFeatureVersionsEnabled ?
feature.latestTesting() : feature.latestProduction();
+ if (maxVersion > 0) {
+ features.put(feature.featureName(), new
SupportedVersionRange(feature.minimumProduction(), (short) maxVersion));
+ }
+ });
+ return Features.supportedFeatures(features);
+ }
+
+ public static BrokerFeatures createEmpty() {
+ return new BrokerFeatures(Features.emptySupportedFeatures());
+ }
+
+ /**
+ * Returns true if any of the provided finalized features are incompatible
with the provided
+ * supported features.
+ *
+ * @param supportedFeatures The supported features to be compared
+ * @param finalizedFeatures The finalized features to be compared
+ * @return - True if there are any feature incompatibilities found.
+ * - False otherwise.
+ */
+ public static boolean
hasIncompatibleFeatures(Features<SupportedVersionRange> supportedFeatures,
+ Map<String, Short>
finalizedFeatures) {
+ return !incompatibleFeatures(supportedFeatures, finalizedFeatures,
false).isEmpty();
+ }
+
+ /**
+ * Returns the default finalized features that a new Kafka cluster with
IBP config >= IBP_2_7_IV0
+ * needs to be bootstrapped with.
+ */
+ public Map<String, Short> defaultFinalizedFeatures() {
+ return supportedFeatures.features().entrySet()
+ .stream()
+ .collect(Collectors.toMap(Map.Entry::getKey,
+ e -> e.getKey().equals(KRaftVersion.FEATURE_NAME) ? 0
: e.getValue().max()));
+ }
+
+ /**
+ * Returns the set of feature names found to be incompatible.
+ * A feature incompatibility is a version mismatch between the latest
feature supported by the
+ * Broker, and a provided finalized feature. This can happen because a
provided finalized
+ * feature:
+ * 1) Does not exist in the Broker (i.e. it is unknown to the Broker).
+ * [OR]
+ * 2) Exists but the FinalizedVersionRange does not match with the
SupportedVersionRange
+ * of the supported feature.
+ *
+ * @param finalized The finalized features against which incompatibilities
need to be checked for.
+ * @return The subset of input features which are incompatible. If the
returned object
+ * is empty, it means there were no feature incompatibilities found.
+ */
+ public Map<String, Short> incompatibleFeatures(Map<String, Short>
finalized) {
+ return BrokerFeatures.incompatibleFeatures(supportedFeatures,
finalized, true);
+ }
+
+ public Features<SupportedVersionRange> supportedFeatures() {
+ return supportedFeatures;
+ }
+
+ private static Map<String, Short>
incompatibleFeatures(Features<SupportedVersionRange> supportedFeatures,
+ Map<String, Short>
finalizedFeatures,
+ boolean
logIncompatibilities) {
+ Map<String, Short> incompatibleFeaturesInfo = new HashMap<>();
+ finalizedFeatures.forEach((feature, versionLevels) -> {
+ SupportedVersionRange supportedVersions =
supportedFeatures.get(feature);
+ if (supportedVersions == null) {
+ incompatibleFeaturesInfo.put(feature, versionLevels);
+ if (logIncompatibilities) {
+ log.warn("Feature incompatibilities seen: {feature={},
reason='Unknown feature'}", feature);
+ }
+ } else if (supportedVersions.isIncompatibleWith(versionLevels)) {
+ incompatibleFeaturesInfo.put(feature, versionLevels);
+ if (logIncompatibilities) {
+ log.warn("Feature incompatibilities seen: {feature={},
reason='{} is incompatible with {}'}",
+ feature, versionLevels, supportedVersions);
+ }
+ }
+ });
+ return incompatibleFeaturesInfo;
+ }
+}
diff --git
a/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java
b/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java
new file mode 100644
index 00000000000..6ce2b3a7e65
--- /dev/null
+++ b/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.feature.Features;
+import org.apache.kafka.common.feature.SupportedVersionRange;
+import org.apache.kafka.server.common.MetadataVersion;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.kafka.server.common.Features.GROUP_VERSION;
+import static org.apache.kafka.server.common.Features.TRANSACTION_VERSION;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class BrokerFeaturesTest {
+
+ @Test
+ public void testEmpty() {
+ assertTrue(BrokerFeatures.createEmpty().supportedFeatures().empty());
+ }
+
+ @Test
+ public void testIncompatibilitiesDueToAbsentFeature() {
+ Map<String, SupportedVersionRange> newFeatures = new HashMap<>();
+ newFeatures.put("test_feature_1", new SupportedVersionRange((short) 1,
(short) 4));
+ newFeatures.put("test_feature_2", new SupportedVersionRange((short) 1,
(short) 3));
+ Features<SupportedVersionRange> supportedFeatures =
Features.supportedFeatures(newFeatures);
+ BrokerFeatures brokerFeatures = BrokerFeatures.createDefault(true,
supportedFeatures);
+
+ Map<String, Short> compatibleFeatures = new HashMap<>();
+ compatibleFeatures.put("test_feature_1", (short) 4);
+ Map<String, Short> inCompatibleFeatures = new HashMap<>();
+ inCompatibleFeatures.put("test_feature_2", (short) 4);
+
+ Map<String, Short> finalizedFeatures = new
HashMap<>(compatibleFeatures);
+ finalizedFeatures.putAll(inCompatibleFeatures);
+
+ assertEquals(inCompatibleFeatures,
+ brokerFeatures.incompatibleFeatures(finalizedFeatures));
+ assertTrue(BrokerFeatures.hasIncompatibleFeatures(supportedFeatures,
finalizedFeatures));
+ }
+
+ @Test
+ public void testIncompatibilitiesDueToIncompatibleFeature() {
+ Map<String, SupportedVersionRange> newFeatures = new HashMap<>();
+ newFeatures.put("test_feature_1", new SupportedVersionRange((short) 1,
(short) 4));
+ newFeatures.put("test_feature_2", new SupportedVersionRange((short) 1,
(short) 3));
+ Features<SupportedVersionRange> supportedFeatures =
Features.supportedFeatures(newFeatures);
+ BrokerFeatures brokerFeatures = BrokerFeatures.createDefault(true,
supportedFeatures);
+
+ Map<String, Short> compatibleFeatures = new HashMap<>();
+ compatibleFeatures.put("test_feature_1", (short) 3);
+ Map<String, Short> inCompatibleFeatures = new HashMap<>();
+ inCompatibleFeatures.put("test_feature_2", (short) 4);
+ Map<String, Short> finalizedFeatures = new
HashMap<>(compatibleFeatures);
+ finalizedFeatures.putAll(inCompatibleFeatures);
+
+ assertEquals(inCompatibleFeatures,
brokerFeatures.incompatibleFeatures(finalizedFeatures));
+ assertTrue(BrokerFeatures.hasIncompatibleFeatures(supportedFeatures,
finalizedFeatures));
+ }
+
+ @Test
+ public void testCompatibleFeatures() {
+ Map<String, SupportedVersionRange> newFeatures = new HashMap<>();
+ newFeatures.put("test_feature_1", new SupportedVersionRange((short) 1,
(short) 4));
+ newFeatures.put("test_feature_2", new SupportedVersionRange((short) 1,
(short) 3));
+ Features<SupportedVersionRange> supportedFeatures =
Features.supportedFeatures(newFeatures);
+ BrokerFeatures brokerFeatures = BrokerFeatures.createDefault(true,
supportedFeatures);
+
+ Map<String, Short> compatibleFeatures = new HashMap<>();
+ compatibleFeatures.put("test_feature_1", (short) 3);
+ compatibleFeatures.put("test_feature_2", (short) 3);
+ Map<String, Short> finalizedFeatures = new
HashMap<>(compatibleFeatures);
+
+
assertTrue(brokerFeatures.incompatibleFeatures(finalizedFeatures).isEmpty());
+ assertFalse(BrokerFeatures.hasIncompatibleFeatures(supportedFeatures,
finalizedFeatures));
+ }
+
+ @Test
+ public void testDefaultFinalizedFeatures() {
+ Map<String, SupportedVersionRange> newFeatures = new HashMap<>();
+ newFeatures.put("test_feature_1", new SupportedVersionRange((short) 1,
(short) 4));
+ newFeatures.put("test_feature_2", new SupportedVersionRange((short) 1,
(short) 3));
+ newFeatures.put("test_feature_3", new SupportedVersionRange((short) 3,
(short) 7));
+ Features<SupportedVersionRange> supportedFeatures =
Features.supportedFeatures(newFeatures);
+ BrokerFeatures brokerFeatures = BrokerFeatures.createDefault(true,
supportedFeatures);
+
+ Map<String, Short> expectedFeatures = new HashMap<>();
+ expectedFeatures.put(MetadataVersion.FEATURE_NAME,
MetadataVersion.latestTesting().featureLevel());
+ expectedFeatures.put(TRANSACTION_VERSION.featureName(),
TRANSACTION_VERSION.latestTesting());
+ expectedFeatures.put(GROUP_VERSION.featureName(),
GROUP_VERSION.latestTesting());
+ expectedFeatures.put("kraft.version", (short) 0);
+ expectedFeatures.put("test_feature_1", (short) 4);
+ expectedFeatures.put("test_feature_2", (short) 3);
+ expectedFeatures.put("test_feature_3", (short) 7);
+
+ assertEquals(expectedFeatures,
brokerFeatures.defaultFinalizedFeatures());
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void ensureDefaultSupportedFeaturesRangeMaxNotZero(boolean
unstableVersionsEnabled) {
+ BrokerFeatures brokerFeatures =
BrokerFeatures.createDefault(unstableVersionsEnabled);
+ brokerFeatures.supportedFeatures().features()
+ .values()
+ .forEach(supportedVersionRange -> assertNotEquals(0,
supportedVersionRange.max()));
+ }
+}