This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6322bd165c0 KAFKA-18786 add nodeId to DescribeFeaturesOptions (#20674)
6322bd165c0 is described below

commit 6322bd165c0df276bad808d5c72d211f02bb8a09
Author: PoAn Yang <[email protected]>
AuthorDate: Tue Dec 2 16:58:48 2025 +0800

    KAFKA-18786 add nodeId to DescribeFeaturesOptions (#20674)
    
    Allow `kafka-features.sh` to describe features from a specific node.
    
    * Add `nodeId` to `DescribeFeaturesOptions`.
    * Add `--node-id` to `kafka-features.sh --describe`.
    
    KIP: https://cwiki.apache.org/confluence/x/5gnXF
    
    Reviewers: TaiJuWu <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../kafka/clients/admin/DescribeFeaturesTest.java  | 136 ++++++++++++++++++++
 .../clients/admin/DescribeFeaturesOptions.java     |  19 +++
 .../kafka/clients/admin/KafkaAdminClient.java      |   4 +-
 .../kafka/clients/admin/KafkaAdminClientTest.java  |  27 ++++
 docs/upgrade.html                                  |   4 +
 .../org/apache/kafka/tools/FeatureCommand.java     |  21 ++-
 .../org/apache/kafka/tools/FeatureCommandTest.java | 142 ++++++++++++++++++++-
 7 files changed, 347 insertions(+), 6 deletions(-)

diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/DescribeFeaturesTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/DescribeFeaturesTest.java
new file mode 100644
index 00000000000..6d1fe1903cc
--- /dev/null
+++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/DescribeFeaturesTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
+import org.apache.kafka.server.common.Feature;
+import org.apache.kafka.server.common.GroupVersion;
+import org.apache.kafka.server.common.KRaftVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.common.ShareVersion;
+import org.apache.kafka.server.common.StreamsVersion;
+import org.apache.kafka.server.common.TransactionVersion;
+
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class DescribeFeaturesTest {
+
+    @ClusterTest(
+        types = {Type.KRAFT},
+        metadataVersion = MetadataVersion.IBP_3_7_IV0,
+        controllers = 2,
+        brokers = 2,
+        serverProperties = {
+            @ClusterConfigProperty(
+                id = 3000,
+                key = "unstable.api.versions.enable",
+                value = "true"
+                ),
+            @ClusterConfigProperty(
+                id = 3001,
+                key = "unstable.api.versions.enable",
+                value = "false"
+                ),
+            @ClusterConfigProperty(
+                id = 0,
+                key = "unstable.feature.versions.enable",
+                value = "true"
+                ),
+            @ClusterConfigProperty(
+                id = 1,
+                key = "unstable.feature.versions.enable",
+                value = "false"
+                )
+        }
+    )
+    public void testUnstableApiVersions(ClusterInstance clusterInstance) {
+        // Test unstable.api.versions.enable on controller nodes
+        try (Admin admin = clusterInstance.admin(Map.of(), true)) {
+            // unstable.api.versions.enable is true on node 3000
+            assertFeatures(admin, 3000, true, clusterInstance.config().metadataVersion());
+
+            // unstable.api.versions.enable is false on node 3001
+            assertFeatures(admin, 3001, false, clusterInstance.config().metadataVersion());
+        }
+
+        // Test unstable.feature.versions.enable on broker nodes
+        try (Admin admin = clusterInstance.admin()) {
+            // unstable.feature.versions.enable is true on node 0
+            assertFeatures(admin, 0, true, clusterInstance.config().metadataVersion());
+
+            // unstable.feature.versions.enable is false on node 1
+            assertFeatures(admin, 1, false, clusterInstance.config().metadataVersion());
+        }
+    }
+
+    @ClusterTest(types = {Type.KRAFT})
+    public void testSendRequestToWrongNodeType(ClusterInstance clusterInstance) {
+        try (Admin admin = clusterInstance.admin()) {
+            // use bootstrap-servers to send request to controller
+            assertThrows(
+                ExecutionException.class,
+                () -> admin.describeFeatures(new DescribeFeaturesOptions().nodeId(3000).timeoutMs(1000)).featureMetadata().get());
+        }
+
+        try (Admin admin = clusterInstance.admin(Map.of(), true)) {
+            // use bootstrap-controllers to send request to broker
+            assertThrows(
+                ExecutionException.class,
+                () -> admin.describeFeatures(new DescribeFeaturesOptions().nodeId(0).timeoutMs(1000)).featureMetadata().get());
+        }
+    }
+
+    private void assertFeatures(Admin admin, int nodeId, boolean unstable, MetadataVersion metadataVersion) {
+        FeatureMetadata featureMetadata = assertDoesNotThrow(
+            () -> admin.describeFeatures(new DescribeFeaturesOptions().nodeId(nodeId)).featureMetadata().get());
+
+        assertEquals(Map.of(
+            MetadataVersion.FEATURE_NAME, new FinalizedVersionRange(metadataVersion.featureLevel(), metadataVersion.featureLevel())
+        ), featureMetadata.finalizedFeatures());
+
+        if (unstable) {
+            assertEquals(Map.of(
+                GroupVersion.FEATURE_NAME, new SupportedVersionRange(Feature.GROUP_VERSION.minimumProduction(), Feature.GROUP_VERSION.latestTesting()),
+                KRaftVersion.FEATURE_NAME, new SupportedVersionRange(Feature.KRAFT_VERSION.minimumProduction(), Feature.KRAFT_VERSION.latestTesting()),
+                MetadataVersion.FEATURE_NAME, new SupportedVersionRange(MetadataVersion.MINIMUM_VERSION.featureLevel(), MetadataVersion.latestTesting().featureLevel()),
+                ShareVersion.FEATURE_NAME, new SupportedVersionRange(Feature.SHARE_VERSION.minimumProduction(), Feature.SHARE_VERSION.latestTesting()),
+                StreamsVersion.FEATURE_NAME, new SupportedVersionRange(Feature.STREAMS_VERSION.minimumProduction(), Feature.STREAMS_VERSION.latestTesting()),
+                TransactionVersion.FEATURE_NAME, new SupportedVersionRange(Feature.TRANSACTION_VERSION.minimumProduction(), Feature.TRANSACTION_VERSION.latestTesting()),
+                EligibleLeaderReplicasVersion.FEATURE_NAME, new SupportedVersionRange(Feature.ELIGIBLE_LEADER_REPLICAS_VERSION.minimumProduction(), Feature.ELIGIBLE_LEADER_REPLICAS_VERSION.latestTesting())
+            ), featureMetadata.supportedFeatures());
+        } else {
+            assertEquals(Map.of(
+                GroupVersion.FEATURE_NAME, new SupportedVersionRange(Feature.GROUP_VERSION.minimumProduction(), Feature.GROUP_VERSION.latestProduction()),
+                KRaftVersion.FEATURE_NAME, new SupportedVersionRange(Feature.KRAFT_VERSION.minimumProduction(), Feature.KRAFT_VERSION.latestProduction()),
+                MetadataVersion.FEATURE_NAME, new SupportedVersionRange(MetadataVersion.MINIMUM_VERSION.featureLevel(), MetadataVersion.latestProduction().featureLevel()),
+                ShareVersion.FEATURE_NAME, new SupportedVersionRange(Feature.SHARE_VERSION.minimumProduction(), Feature.SHARE_VERSION.latestProduction()),
+                StreamsVersion.FEATURE_NAME, new SupportedVersionRange(Feature.STREAMS_VERSION.minimumProduction(), Feature.STREAMS_VERSION.latestProduction()),
+                TransactionVersion.FEATURE_NAME, new SupportedVersionRange(Feature.TRANSACTION_VERSION.minimumProduction(), Feature.TRANSACTION_VERSION.latestProduction()),
+                EligibleLeaderReplicasVersion.FEATURE_NAME, new SupportedVersionRange(Feature.ELIGIBLE_LEADER_REPLICAS_VERSION.minimumProduction(), Feature.ELIGIBLE_LEADER_REPLICAS_VERSION.latestProduction())
+            ), featureMetadata.supportedFeatures());
+        }
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeFeaturesOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeFeaturesOptions.java
index 7b477c1bc0d..d364df61dc1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeFeaturesOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeFeaturesOptions.java
@@ -16,8 +16,27 @@
  */
 package org.apache.kafka.clients.admin;
 
+import java.util.OptionalInt;
+
 /**
  * Options for {@link AdminClient#describeFeatures(DescribeFeaturesOptions)}.
  */
 public class DescribeFeaturesOptions extends AbstractOptions<DescribeFeaturesOptions> {
+    private OptionalInt nodeId = OptionalInt.empty();
+
+    /**
+     * Set the node id to which the request should be sent.
+     */
+    public DescribeFeaturesOptions nodeId(int nodeId) {
+        this.nodeId = OptionalInt.of(nodeId);
+        return this;
+    }
+
+    /**
+     * The node id to which the request should be sent. If the node id is empty, the request will be sent to an
+     * arbitrary controller/broker.
+     */
+    public OptionalInt nodeId() {
+        return nodeId;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 8478ed19f80..fa9440dfb91 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -4519,8 +4519,10 @@ public class KafkaAdminClient extends AdminClient {
     public DescribeFeaturesResult describeFeatures(final DescribeFeaturesOptions options) {
         final KafkaFutureImpl<FeatureMetadata> future = new KafkaFutureImpl<>();
         final long now = time.milliseconds();
+        final NodeProvider nodeProvider = options.nodeId().isPresent() ?
+            new ConstantNodeIdProvider(options.nodeId().getAsInt(), true) : new LeastLoadedBrokerOrActiveKController();
         final Call call = new Call(
-            "describeFeatures", calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedBrokerOrActiveKController()) {
+            "describeFeatures", calcDeadlineMs(now, options.timeoutMs()), nodeProvider) {
 
             private FeatureMetadata createFeatureMetadata(final ApiVersionsResponse response) {
                 final Map<String, FinalizedVersionRange> finalizedFeatures = new HashMap<>();
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 55b4119f0ce..93f6daaf8ec 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -8896,6 +8896,33 @@ public class KafkaAdminClientTest {
         }
     }
 
+    @Test
+    public void testDescribeFeaturesWithNodeSuccess() throws Exception {
+        try (final AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().prepareResponseFrom(
+                body -> body instanceof ApiVersionsRequest,
+                prepareApiVersionsResponseForDescribeFeatures(Errors.NONE),
+                env.cluster().nodeById(0));
+            final KafkaFuture<FeatureMetadata> future = env.adminClient().describeFeatures(
+                new DescribeFeaturesOptions().timeoutMs(10000).nodeId(0)).featureMetadata();
+            final FeatureMetadata metadata = future.get();
+            assertEquals(defaultFeatureMetadata(), metadata);
+        }
+    }
+
+    @Test
+    public void testDescribeFeaturesWithNodeFailure() throws Exception {
+        try (final AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().prepareResponseFrom(
+                body -> body instanceof ApiVersionsRequest,
+                prepareApiVersionsResponseForDescribeFeatures(Errors.NONE),
+                env.cluster().nodeById(1));
+            final KafkaFuture<FeatureMetadata> future = env.adminClient().describeFeatures(
+                new DescribeFeaturesOptions().timeoutMs(1000).nodeId(0)).featureMetadata();
+            assertThrows(ExecutionException.class, future::get);
+        }
+    }
+
     @Test
     public void testDescribeMetadataQuorumSuccess() throws Exception {
         try (final AdminClientUnitTestEnv env = mockClientEnv()) {
diff --git a/docs/upgrade.html b/docs/upgrade.html
index fe1b8b80d49..4c807382f0f 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -228,6 +228,10 @@
         It is now possible to specify the start time for a Kafka Streams punctuation, instead of relying on the non-deterministic time when you register it.
         For further details, please refer to <a href="https://cwiki.apache.org/confluence/x/9QqWF">KIP-1146</a>.
     </li>
+    <li>
+        Added an optional <code>--node-id</code> flag to the <code>FeatureCommand</code> command. It specifies the node to describe.
+        If not provided, an arbitrary node is used.
+    </li>
 </ul>
 
 <h4><a id="upgrade_4_1_0" href="#upgrade_4_1_0">Upgrading to 4.1.0</a></h4>
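
To illustrate the flag described in the upgrade note above: running
`bin/kafka-features.sh --bootstrap-server <host:port> describe --node-id 1`
(host:port is a placeholder for a real cluster address) reports the features
as seen by node 1, while omitting `--node-id` keeps the previous behavior of
querying an arbitrary node.
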
diff --git a/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java b/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java
index 3f1f309abf5..3351b7d776e 100644
--- a/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.tools;
 
 import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.DescribeFeaturesOptions;
 import org.apache.kafka.clients.admin.FeatureMetadata;
 import org.apache.kafka.clients.admin.FeatureUpdate;
 import org.apache.kafka.clients.admin.SupportedVersionRange;
@@ -104,7 +105,7 @@ public class FeatureCommand {
         try (Admin adminClient = Admin.create(properties)) {
             switch (command) {
                 case "describe":
-                    handleDescribe(adminClient);
+                    handleDescribe(namespace, adminClient);
                     break;
                 case "upgrade":
                     handleUpgrade(namespace, adminClient);
@@ -128,8 +129,12 @@ public class FeatureCommand {
     }
 
     private static void addDescribeParser(Subparsers subparsers) {
-        subparsers.addParser("describe")
+        Subparser describeParser = subparsers.addParser("describe")
                 .help("Describes the current active feature flags.");
+        describeParser.addArgument("--node-id")
+            .type(Integer.class)
+            .help("The node id to which the requests should be sent. If not 
specified, the requests will be sent to an arbitrary controller/broker.")
+            .action(store());
     }
 
     private static void addUpgradeParser(Subparsers subparsers) {
@@ -223,8 +228,16 @@ public class FeatureCommand {
         return String.valueOf(level);
     }
 
-    static void handleDescribe(Admin adminClient) throws ExecutionException, InterruptedException {
-        FeatureMetadata featureMetadata = adminClient.describeFeatures().featureMetadata().get();
+    static void handleDescribe(Namespace namespace, Admin adminClient) throws ExecutionException, InterruptedException {
+        DescribeFeaturesOptions describeFeaturesOptions = new DescribeFeaturesOptions();
+        if (namespace.getInt("node_id") != null) {
+            int nodeId = namespace.getInt("node_id");
+            if (nodeId < 0) {
+                throw new IllegalArgumentException("Invalid node id " + nodeId + ": must be non-negative.");
+            }
+            describeFeaturesOptions = describeFeaturesOptions.nodeId(namespace.getInt("node_id"));
+        }
+        FeatureMetadata featureMetadata = adminClient.describeFeatures(describeFeaturesOptions).featureMetadata().get();
         featureMetadata.supportedFeatures().keySet().stream().sorted().forEach(feature -> {
             short finalizedLevel = (featureMetadata.finalizedFeatures().get(feature) == null) ? 0 : featureMetadata.finalizedFeatures().get(feature).maxVersionLevel();
             SupportedVersionRange range = featureMetadata.supportedFeatures().get(feature);
diff --git a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
index 53c18d299bc..c289eb1212a 100644
--- a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.tools;
 
 import org.apache.kafka.clients.admin.MockAdminClient;
 import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
 import org.apache.kafka.common.test.api.ClusterTest;
 import org.apache.kafka.common.test.api.Type;
 import org.apache.kafka.server.common.Feature;
@@ -95,6 +96,138 @@ public class FeatureCommandTest {
                 "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", 
outputWithoutEpoch(features.get(6)));
     }
 
+    @ClusterTest(
+        types = {Type.KRAFT},
+        metadataVersion = MetadataVersion.IBP_3_7_IV0,
+        controllers = 2,
+        serverProperties = {
+            @ClusterConfigProperty(
+                id = 3000,
+                key = "unstable.api.versions.enable",
+                value = "true"
+                ),
+            @ClusterConfigProperty(
+                id = 3001,
+                key = "unstable.api.versions.enable",
+                value = "false"
+                )
+        }
+    )
+    public void testDescribeWithUnstableApiVersions(ClusterInstance cluster) {
+        String commandOutput = ToolsTestUtils.captureStandardOut(() ->
+            assertEquals(
+                0,
+                FeatureCommand.mainNoExit("--bootstrap-controller", 
cluster.bootstrapControllers(), "describe", "--node-id", "3000")
+            )
+        );
+        List<String> featuresWithUnstable = 
Arrays.stream(commandOutput.split("\n")).sorted().toList();
+
+        // Change expected message to reflect latest MetadataVersion 
(SupportedMaxVersion increases when adding a new version)
+        assertEquals("Feature: 
eligible.leader.replicas.version\tSupportedMinVersion: 0\t" +
+            "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", 
outputWithoutEpoch(featuresWithUnstable.get(0)));
+        assertEquals("Feature: group.version\tSupportedMinVersion: 0\t" +
+            "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", 
outputWithoutEpoch(featuresWithUnstable.get(1)));
+        assertEquals("Feature: kraft.version\tSupportedMinVersion: 0\t" +
+            "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", 
outputWithoutEpoch(featuresWithUnstable.get(2)));
+        assertEquals("Feature: metadata.version\tSupportedMinVersion: 
3.3-IV3\t" +
+            "SupportedMaxVersion: 4.3-IV0\tFinalizedVersionLevel: 3.7-IV0\t", 
outputWithoutEpoch(featuresWithUnstable.get(3)));
+        assertEquals("Feature: share.version\tSupportedMinVersion: 0\t" +
+            "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", 
outputWithoutEpoch(featuresWithUnstable.get(4)));
+        assertEquals("Feature: streams.version\tSupportedMinVersion: 0\t" +
+            "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", 
outputWithoutEpoch(featuresWithUnstable.get(5)));
+        assertEquals("Feature: transaction.version\tSupportedMinVersion: 0\t" +
+            "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", 
outputWithoutEpoch(featuresWithUnstable.get(6)));
+
+        commandOutput = ToolsTestUtils.captureStandardOut(() ->
+            assertEquals(
+                0,
+                FeatureCommand.mainNoExit("--bootstrap-controller", 
cluster.bootstrapControllers(), "describe", "--node-id", "3001")
+            )
+        );
+        List<String> featuresWithoutUnstable = 
Arrays.stream(commandOutput.split("\n")).sorted().toList();
+
+        assertEquals("Feature: 
eligible.leader.replicas.version\tSupportedMinVersion: 0\t" +
+            "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", 
outputWithoutEpoch(featuresWithoutUnstable.get(0)));
+        assertEquals("Feature: group.version\tSupportedMinVersion: 0\t" +
+            "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", 
outputWithoutEpoch(featuresWithoutUnstable.get(1)));
+        assertEquals("Feature: kraft.version\tSupportedMinVersion: 0\t" +
+            "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", 
outputWithoutEpoch(featuresWithoutUnstable.get(2)));
+        assertEquals("Feature: metadata.version\tSupportedMinVersion: 
3.3-IV3\t" +
+            "SupportedMaxVersion: 4.2-IV1\tFinalizedVersionLevel: 3.7-IV0\t", 
outputWithoutEpoch(featuresWithoutUnstable.get(3)));
+        assertEquals("Feature: share.version\tSupportedMinVersion: 0\t" +
+            "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", 
outputWithoutEpoch(featuresWithoutUnstable.get(4)));
+        assertEquals("Feature: streams.version\tSupportedMinVersion: 0\t" +
+            "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", 
outputWithoutEpoch(featuresWithUnstable.get(5)));
+        assertEquals("Feature: transaction.version\tSupportedMinVersion: 0\t" +
+            "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", 
outputWithoutEpoch(featuresWithoutUnstable.get(6)));
+    }
+
+    @ClusterTest(
+        types = {Type.KRAFT},
+        metadataVersion = MetadataVersion.IBP_3_7_IV0,
+        brokers = 2,
+        serverProperties = {
+            @ClusterConfigProperty(
+                id = 0,
+                key = "unstable.feature.versions.enable",
+                value = "true"
+                ),
+            @ClusterConfigProperty(
+                id = 1,
+                key = "unstable.feature.versions.enable",
+                value = "false"
+                )
+        }
+    )
+    public void testDescribeWithUnstableFeatureVersions(ClusterInstance cluster) {
+        String commandOutput = ToolsTestUtils.captureStandardOut(() ->
+            assertEquals(
+                0,
+                FeatureCommand.mainNoExit("--bootstrap-server", 
cluster.bootstrapServers(), "describe", "--node-id", "0")
+            )
+        );
+        List<String> featuresWithUnstable = Arrays.stream(commandOutput.split("\n")).sorted().toList();
+
+        // Change expected message to reflect latest MetadataVersion (SupportedMaxVersion increases when adding a new version)
+        assertEquals("Feature: eligible.leader.replicas.version\tSupportedMinVersion: 0\t" +
+            "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(featuresWithUnstable.get(0)));
+        assertEquals("Feature: group.version\tSupportedMinVersion: 0\t" +
+            "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(featuresWithUnstable.get(1)));
+        assertEquals("Feature: kraft.version\tSupportedMinVersion: 0\t" +
+            "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(featuresWithUnstable.get(2)));
+        assertEquals("Feature: metadata.version\tSupportedMinVersion: 3.3-IV3\t" +
+            "SupportedMaxVersion: 4.3-IV0\tFinalizedVersionLevel: 3.7-IV0\t", outputWithoutEpoch(featuresWithUnstable.get(3)));
+        assertEquals("Feature: share.version\tSupportedMinVersion: 0\t" +
+            "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(featuresWithUnstable.get(4)));
+        assertEquals("Feature: streams.version\tSupportedMinVersion: 0\t" +
+            "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(featuresWithUnstable.get(5)));
+        assertEquals("Feature: transaction.version\tSupportedMinVersion: 0\t" +
+            "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(featuresWithUnstable.get(6)));
+
+        commandOutput = ToolsTestUtils.captureStandardOut(() ->
+            assertEquals(
+                0,
+                FeatureCommand.mainNoExit("--bootstrap-server", 
cluster.bootstrapServers(), "describe", "--node-id", "1")
+            )
+        );
+        List<String> featuresWithoutUnstable = Arrays.stream(commandOutput.split("\n")).sorted().toList();
+
+        assertEquals("Feature: 
eligible.leader.replicas.version\tSupportedMinVersion: 0\t" +
+            "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", 
outputWithoutEpoch(featuresWithoutUnstable.get(0)));
+        assertEquals("Feature: group.version\tSupportedMinVersion: 0\t" +
+            "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", 
outputWithoutEpoch(featuresWithoutUnstable.get(1)));
+        assertEquals("Feature: kraft.version\tSupportedMinVersion: 0\t" +
+            "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", 
outputWithoutEpoch(featuresWithoutUnstable.get(2)));
+        assertEquals("Feature: metadata.version\tSupportedMinVersion: 
3.3-IV3\t" +
+            "SupportedMaxVersion: 4.2-IV1\tFinalizedVersionLevel: 3.7-IV0\t", 
outputWithoutEpoch(featuresWithoutUnstable.get(3)));
+        assertEquals("Feature: share.version\tSupportedMinVersion: 0\t" +
+            "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", 
outputWithoutEpoch(featuresWithoutUnstable.get(4)));
+        assertEquals("Feature: streams.version\tSupportedMinVersion: 0\t" +
+            "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", 
outputWithoutEpoch(featuresWithUnstable.get(5)));
+        assertEquals("Feature: transaction.version\tSupportedMinVersion: 0\t" +
+            "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", 
outputWithoutEpoch(featuresWithoutUnstable.get(6)));
+    }
+
     @ClusterTest(types = {Type.KRAFT}, metadataVersion = MetadataVersion.IBP_3_3_IV3)
     public void testUpgradeMetadataVersionWithKraft(ClusterInstance cluster) {
         String commandOutput = ToolsTestUtils.captureStandardOut(() ->
@@ -252,7 +385,7 @@ public class FeatureCommandTest {
     public void testHandleDescribe() {
         String describeResult = ToolsTestUtils.captureStandardOut(() -> {
             try {
-                FeatureCommand.handleDescribe(buildAdminClient());
+                FeatureCommand.handleDescribe(new Namespace(Map.of()), buildAdminClient());
             } catch (Exception e) {
                 throw new RuntimeException(e);
             }
@@ -261,6 +394,13 @@ public class FeatureCommandTest {
             "Feature: metadata.version\tSupportedMinVersion: 
3.3-IV3\tSupportedMaxVersion: 3.5-IV0\tFinalizedVersionLevel: 3.4-IV0\tEpoch: 
123"), describeResult);
     }
 
+    @Test
+    public void testHandleDescribeWithNegativeNodeId() {
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> FeatureCommand.handleDescribe(new Namespace(Map.of("node_id", -1)), buildAdminClient()));
+    }
+
     @Test
     public void testHandleUpgradeToUnsupportedMetadataVersion() {
         Map<String, Object> namespace = new HashMap<>();
