This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch 4.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 5c0b8bef0788243a57f583a9683ef6e000cbd261 Author: Colin Patrick McCabe <[email protected]> AuthorDate: Fri Mar 7 13:46:46 2025 -0800 KAFKA-18920: The kcontrollers must set kraft.version in ApiVersionsResponse (#19127) The kafka controllers need to set kraft.version in their ApiVersionsResponse messages according to the current kraft.version reported by the Raft layer. Instead, currently they always set it to 0. Also remove FeatureControlManager.latestFinalizedFeatures. It is not needed and it does a lot of copying. Reviewers: Jun Rao <[email protected]>, Chia-Ping Tsai <[email protected]> --- .../main/scala/kafka/server/ControllerServer.scala | 6 +++-- .../kafka/server/KRaftClusterTest.scala | 29 ++++++++++++++++++++++ .../kafka/controller/FeatureControlManager.java | 11 +------- .../kafka/server/common/FinalizedFeatures.java | 13 ++++++++++ 4 files changed, 47 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index 65f9f9191d6..1933a55dfeb 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -41,7 +41,7 @@ import org.apache.kafka.raft.QuorumConfig import org.apache.kafka.security.CredentialProvider import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.config.ServerLogConfigs.{ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG} -import org.apache.kafka.server.common.{ApiMessageAndVersion, NodeToControllerChannelManager} +import org.apache.kafka.server.common.{ApiMessageAndVersion, KRaftVersion, NodeToControllerChannelManager} import org.apache.kafka.server.config.ConfigType import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector} import org.apache.kafka.server.network.{EndpointReadyFutures, KafkaAuthorizerServerInfo} @@ -152,7 +152,9 @@ class ControllerServer( val apiVersionManager = new SimpleApiVersionManager( ListenerType.CONTROLLER, config.unstableApiVersionsEnabled, - () => featuresPublisher.features() + () => featuresPublisher.features().setFinalizedLevel( + KRaftVersion.FEATURE_NAME, + raftManager.client.kraftVersion().featureLevel()) ) tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames) diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala index 563ff163fb5..17a75080ba1 100644 --- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala +++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala @@ -1006,6 +1006,35 @@ class KRaftClusterTest { } } + @ParameterizedTest + @ValueSource(booleans = Array(false, true)) + def testDescribeKRaftVersion(usingBootstrapControlers: Boolean): Unit = { + val cluster = new KafkaClusterTestKit.Builder( + new TestKitNodes.Builder(). + setNumBrokerNodes(1). + setNumControllerNodes(1). + setFeature(KRaftVersion.FEATURE_NAME, 1.toShort).build()).build() + try { + cluster.format() + cluster.startup() + cluster.waitForReadyBrokers() + val admin = Admin.create(cluster.newClientPropertiesBuilder(). + setUsingBootstrapControllers(usingBootstrapControlers). + build()) + try { + val featureMetadata = admin.describeFeatures().featureMetadata().get() + assertEquals(new SupportedVersionRange(0, 1), + featureMetadata.supportedFeatures().get(KRaftVersion.FEATURE_NAME)) + assertEquals(new FinalizedVersionRange(1.toShort, 1.toShort), + featureMetadata.finalizedFeatures().get(KRaftVersion.FEATURE_NAME)) + } finally { + admin.close() + } + } finally { + cluster.close() + } + } + @Test def testRemoteLogManagerInstantiation(): Unit = { val cluster = new KafkaClusterTestKit.Builder( diff --git a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java index 40eb23ce639..5bbe3b9f148 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java @@ -359,15 +359,6 @@ public class FeatureControlManager { return new FinalizedControllerFeatures(features, epoch); } - FinalizedControllerFeatures latestFinalizedFeatures() { - Map<String, Short> features = new HashMap<>(); - features.put(MetadataVersion.FEATURE_NAME, metadataVersion.get().featureLevel()); - for (Entry<String, Short> entry : finalizedVersions.entrySet()) { - features.put(entry.getKey(), entry.getValue()); - } - return new FinalizedControllerFeatures(features, -1); - } - public void replay(FeatureLevelRecord record) { VersionRange range = quorumFeatures.localSupportedFeature(record.name()); if (!range.contains(record.featureLevel())) { @@ -395,7 +386,7 @@ public class FeatureControlManager { } boolean isElrFeatureEnabled() { - return latestFinalizedFeatures().versionOrDefault(EligibleLeaderReplicasVersion.FEATURE_NAME, (short) 0) >= + return finalizedVersions.getOrDefault(EligibleLeaderReplicasVersion.FEATURE_NAME, (short) 0) >= EligibleLeaderReplicasVersion.ELRV_1.featureLevel(); } } diff --git a/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java b/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java index de78a3a72a8..853435182e3 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java @@ -82,4 +82,17 @@ public final class FinalizedFeatures { ", finalizedFeaturesEpoch=" + finalizedFeaturesEpoch + ")"; } + + public FinalizedFeatures setFinalizedLevel(String key, short level) { + if (level == (short) 0) { + return this; + } else { + Map<String, Short> newFinalizedFeatures = new HashMap<>(finalizedFeatures); + newFinalizedFeatures.put(key, level); + return new FinalizedFeatures( + metadataVersion, + newFinalizedFeatures, + finalizedFeaturesEpoch); + } + } }
