This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 5c0b8bef0788243a57f583a9683ef6e000cbd261
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Fri Mar 7 13:46:46 2025 -0800

    KAFKA-18920: The kcontrollers must set kraft.version in ApiVersionsResponse (#19127)
    
    The Kafka controllers must set kraft.version in their
    ApiVersionsResponse messages to the current kraft.version reported
    by the Raft layer. Currently, they always set it to 0.
    
    Also remove FeatureControlManager.latestFinalizedFeatures. It is not
    needed, and every call copies all finalized features into a new map.
    
    Reviewers: Jun Rao <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../main/scala/kafka/server/ControllerServer.scala |  6 +++--
 .../kafka/server/KRaftClusterTest.scala            | 29 ++++++++++++++++++++++
 .../kafka/controller/FeatureControlManager.java    | 11 +-------
 .../kafka/server/common/FinalizedFeatures.java     | 13 ++++++++++
 4 files changed, 47 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala 
b/core/src/main/scala/kafka/server/ControllerServer.scala
index 65f9f9191d6..1933a55dfeb 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -41,7 +41,7 @@ import org.apache.kafka.raft.QuorumConfig
 import org.apache.kafka.security.CredentialProvider
 import org.apache.kafka.server.authorizer.Authorizer
 import 
org.apache.kafka.server.config.ServerLogConfigs.{ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG,
 CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG}
-import org.apache.kafka.server.common.{ApiMessageAndVersion, 
NodeToControllerChannelManager}
+import org.apache.kafka.server.common.{ApiMessageAndVersion, KRaftVersion, 
NodeToControllerChannelManager}
 import org.apache.kafka.server.config.ConfigType
 import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, 
LinuxIoMetricsCollector}
 import org.apache.kafka.server.network.{EndpointReadyFutures, 
KafkaAuthorizerServerInfo}
@@ -152,7 +152,9 @@ class ControllerServer(
       val apiVersionManager = new SimpleApiVersionManager(
         ListenerType.CONTROLLER,
         config.unstableApiVersionsEnabled,
-        () => featuresPublisher.features()
+        () => featuresPublisher.features().setFinalizedLevel(
+          KRaftVersion.FEATURE_NAME,
+          raftManager.client.kraftVersion().featureLevel())
       )
 
       tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
diff --git 
a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala 
b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index 563ff163fb5..17a75080ba1 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -1006,6 +1006,35 @@ class KRaftClusterTest {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = Array(false, true))
+  def testDescribeKRaftVersion(usingBootstrapControlers: Boolean): Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumBrokerNodes(1).
+        setNumControllerNodes(1).
+        setFeature(KRaftVersion.FEATURE_NAME, 1.toShort).build()).build()
+    try {
+      cluster.format()
+      cluster.startup()
+      cluster.waitForReadyBrokers()
+      val admin = Admin.create(cluster.newClientPropertiesBuilder().
+        setUsingBootstrapControllers(usingBootstrapControlers).
+        build())
+      try {
+        val featureMetadata = admin.describeFeatures().featureMetadata().get()
+        assertEquals(new SupportedVersionRange(0, 1),
+          featureMetadata.supportedFeatures().get(KRaftVersion.FEATURE_NAME))
+        assertEquals(new FinalizedVersionRange(1.toShort, 1.toShort),
+          featureMetadata.finalizedFeatures().get(KRaftVersion.FEATURE_NAME))
+      } finally {
+        admin.close()
+      }
+    } finally {
+      cluster.close()
+    }
+  }
+
   @Test
   def testRemoteLogManagerInstantiation(): Unit = {
     val cluster = new KafkaClusterTestKit.Builder(
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java 
b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
index 40eb23ce639..5bbe3b9f148 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
@@ -359,15 +359,6 @@ public class FeatureControlManager {
         return new FinalizedControllerFeatures(features, epoch);
     }
 
-    FinalizedControllerFeatures latestFinalizedFeatures() {
-        Map<String, Short> features = new HashMap<>();
-        features.put(MetadataVersion.FEATURE_NAME, 
metadataVersion.get().featureLevel());
-        for (Entry<String, Short> entry : finalizedVersions.entrySet()) {
-            features.put(entry.getKey(), entry.getValue());
-        }
-        return new FinalizedControllerFeatures(features, -1);
-    }
-
     public void replay(FeatureLevelRecord record) {
         VersionRange range = 
quorumFeatures.localSupportedFeature(record.name());
         if (!range.contains(record.featureLevel())) {
@@ -395,7 +386,7 @@ public class FeatureControlManager {
     }
 
     boolean isElrFeatureEnabled() {
-        return 
latestFinalizedFeatures().versionOrDefault(EligibleLeaderReplicasVersion.FEATURE_NAME,
 (short) 0) >=
+        return 
finalizedVersions.getOrDefault(EligibleLeaderReplicasVersion.FEATURE_NAME, 
(short) 0) >=
             EligibleLeaderReplicasVersion.ELRV_1.featureLevel();
     }
 }
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java
 
b/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java
index de78a3a72a8..853435182e3 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java
@@ -82,4 +82,17 @@ public final class FinalizedFeatures {
                 ", finalizedFeaturesEpoch=" + finalizedFeaturesEpoch +
                 ")";
     }
+
+    public FinalizedFeatures setFinalizedLevel(String key, short level) {
+        if (level == (short) 0) {
+            return this;
+        } else {
+            Map<String, Short> newFinalizedFeatures = new 
HashMap<>(finalizedFeatures);
+            newFinalizedFeatures.put(key, level);
+            return new FinalizedFeatures(
+                metadataVersion,
+                newFinalizedFeatures,
+                finalizedFeaturesEpoch);
+        }
+    }
 }

Reply via email to