This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.2 by this push:
new dc6ac807af5 KAFKA-18786 add nodeId to DescribeFeaturesOptions (#20674)
dc6ac807af5 is described below
commit dc6ac807af545a4ff5d4e41f104097392ef148b8
Author: PoAn Yang <[email protected]>
AuthorDate: Tue Dec 2 16:58:48 2025 +0800
KAFKA-18786 add nodeId to DescribeFeaturesOptions (#20674)
Support `kafka-features.sh` to describe features from a specific node.
* Add `nodeId` to `DescribeFeaturesOptions`.
* Add `--node-id` to `kafka-features.sh --describe`.
KIP: https://cwiki.apache.org/confluence/x/5gnXF
Reviewers: TaiJuWu <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../kafka/clients/admin/DescribeFeaturesTest.java | 136 ++++++++++++++++++++
.../clients/admin/DescribeFeaturesOptions.java | 19 +++
.../kafka/clients/admin/KafkaAdminClient.java | 4 +-
.../kafka/clients/admin/KafkaAdminClientTest.java | 27 ++++
docs/upgrade.html | 4 +
.../org/apache/kafka/tools/FeatureCommand.java | 21 ++-
.../org/apache/kafka/tools/FeatureCommandTest.java | 142 ++++++++++++++++++++-
7 files changed, 347 insertions(+), 6 deletions(-)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/DescribeFeaturesTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/DescribeFeaturesTest.java
new file mode 100644
index 00000000000..6d1fe1903cc
--- /dev/null
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/DescribeFeaturesTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
+import org.apache.kafka.server.common.Feature;
+import org.apache.kafka.server.common.GroupVersion;
+import org.apache.kafka.server.common.KRaftVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.common.ShareVersion;
+import org.apache.kafka.server.common.StreamsVersion;
+import org.apache.kafka.server.common.TransactionVersion;
+
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class DescribeFeaturesTest {
+
+ @ClusterTest(
+ types = {Type.KRAFT},
+ metadataVersion = MetadataVersion.IBP_3_7_IV0,
+ controllers = 2,
+ brokers = 2,
+ serverProperties = {
+ @ClusterConfigProperty(
+ id = 3000,
+ key = "unstable.api.versions.enable",
+ value = "true"
+ ),
+ @ClusterConfigProperty(
+ id = 3001,
+ key = "unstable.api.versions.enable",
+ value = "false"
+ ),
+ @ClusterConfigProperty(
+ id = 0,
+ key = "unstable.feature.versions.enable",
+ value = "true"
+ ),
+ @ClusterConfigProperty(
+ id = 1,
+ key = "unstable.feature.versions.enable",
+ value = "false"
+ )
+ }
+ )
+ public void testUnstableApiVersions(ClusterInstance clusterInstance) {
+ // Test unstable.api.versions.enable on controller nodes
+ try (Admin admin = clusterInstance.admin(Map.of(), true)) {
+ // unstable.api.versions.enable is true on node 3000
+ assertFeatures(admin, 3000, true,
clusterInstance.config().metadataVersion());
+
+ // unstable.api.versions.enable is false on node 3001
+ assertFeatures(admin, 3001, false,
clusterInstance.config().metadataVersion());
+ }
+
+ // Test unstable.feature.versions.enable on broker nodes
+ try (Admin admin = clusterInstance.admin()) {
+ // unstable.feature.versions.enable is true on node 0
+ assertFeatures(admin, 0, true,
clusterInstance.config().metadataVersion());
+
+ // unstable.feature.versions.enable is false on node 1
+ assertFeatures(admin, 1, false,
clusterInstance.config().metadataVersion());
+ }
+ }
+
+ @ClusterTest(types = {Type.KRAFT})
+ public void testSendRequestToWrongNodeType(ClusterInstance
clusterInstance) {
+ try (Admin admin = clusterInstance.admin()) {
+ // use bootstrap-servers to send request to controller
+ assertThrows(
+ ExecutionException.class,
+ () -> admin.describeFeatures(new
DescribeFeaturesOptions().nodeId(3000).timeoutMs(1000)).featureMetadata().get());
+ }
+
+ try (Admin admin = clusterInstance.admin(Map.of(), true)) {
+ // use bootstrap-controllers to send request to broker
+ assertThrows(
+ ExecutionException.class,
+ () -> admin.describeFeatures(new
DescribeFeaturesOptions().nodeId(0).timeoutMs(1000)).featureMetadata().get());
+ }
+ }
+
+ private void assertFeatures(Admin admin, int nodeId, boolean unstable,
MetadataVersion metadataVersion) {
+ FeatureMetadata featureMetadata = assertDoesNotThrow(
+ () -> admin.describeFeatures(new
DescribeFeaturesOptions().nodeId(nodeId)).featureMetadata().get());
+
+ assertEquals(Map.of(
+ MetadataVersion.FEATURE_NAME, new
FinalizedVersionRange(metadataVersion.featureLevel(),
metadataVersion.featureLevel())
+ ), featureMetadata.finalizedFeatures());
+
+ if (unstable) {
+ assertEquals(Map.of(
+ GroupVersion.FEATURE_NAME, new
SupportedVersionRange(Feature.GROUP_VERSION.minimumProduction(),
Feature.GROUP_VERSION.latestTesting()),
+ KRaftVersion.FEATURE_NAME, new
SupportedVersionRange(Feature.KRAFT_VERSION.minimumProduction(),
Feature.KRAFT_VERSION.latestTesting()),
+ MetadataVersion.FEATURE_NAME, new
SupportedVersionRange(MetadataVersion.MINIMUM_VERSION.featureLevel(),
MetadataVersion.latestTesting().featureLevel()),
+ ShareVersion.FEATURE_NAME, new
SupportedVersionRange(Feature.SHARE_VERSION.minimumProduction(),
Feature.SHARE_VERSION.latestTesting()),
+ StreamsVersion.FEATURE_NAME, new
SupportedVersionRange(Feature.STREAMS_VERSION.minimumProduction(),
Feature.STREAMS_VERSION.latestTesting()),
+ TransactionVersion.FEATURE_NAME, new
SupportedVersionRange(Feature.TRANSACTION_VERSION.minimumProduction(),
Feature.TRANSACTION_VERSION.latestTesting()),
+ EligibleLeaderReplicasVersion.FEATURE_NAME, new
SupportedVersionRange(Feature.ELIGIBLE_LEADER_REPLICAS_VERSION.minimumProduction(),
Feature.ELIGIBLE_LEADER_REPLICAS_VERSION.latestTesting())
+ ), featureMetadata.supportedFeatures());
+ } else {
+ assertEquals(Map.of(
+ GroupVersion.FEATURE_NAME, new
SupportedVersionRange(Feature.GROUP_VERSION.minimumProduction(),
Feature.GROUP_VERSION.latestProduction()),
+ KRaftVersion.FEATURE_NAME, new
SupportedVersionRange(Feature.KRAFT_VERSION.minimumProduction(),
Feature.KRAFT_VERSION.latestProduction()),
+ MetadataVersion.FEATURE_NAME, new
SupportedVersionRange(MetadataVersion.MINIMUM_VERSION.featureLevel(),
MetadataVersion.latestProduction().featureLevel()),
+ ShareVersion.FEATURE_NAME, new
SupportedVersionRange(Feature.SHARE_VERSION.minimumProduction(),
Feature.SHARE_VERSION.latestProduction()),
+ StreamsVersion.FEATURE_NAME, new
SupportedVersionRange(Feature.STREAMS_VERSION.minimumProduction(),
Feature.STREAMS_VERSION.latestProduction()),
+ TransactionVersion.FEATURE_NAME, new
SupportedVersionRange(Feature.TRANSACTION_VERSION.minimumProduction(),
Feature.TRANSACTION_VERSION.latestProduction()),
+ EligibleLeaderReplicasVersion.FEATURE_NAME, new
SupportedVersionRange(Feature.ELIGIBLE_LEADER_REPLICAS_VERSION.minimumProduction(),
Feature.ELIGIBLE_LEADER_REPLICAS_VERSION.latestProduction())
+ ), featureMetadata.supportedFeatures());
+ }
+ }
+}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeFeaturesOptions.java
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeFeaturesOptions.java
index 7b477c1bc0d..d364df61dc1 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeFeaturesOptions.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeFeaturesOptions.java
@@ -16,8 +16,27 @@
*/
package org.apache.kafka.clients.admin;
+import java.util.OptionalInt;
+
/**
* Options for {@link AdminClient#describeFeatures(DescribeFeaturesOptions)}.
*/
public class DescribeFeaturesOptions extends
AbstractOptions<DescribeFeaturesOptions> {
+ private OptionalInt nodeId = OptionalInt.empty();
+
+ /**
+ * Set the node id to which the request should be sent.
+ */
+ public DescribeFeaturesOptions nodeId(int nodeId) {
+ this.nodeId = OptionalInt.of(nodeId);
+ return this;
+ }
+
+ /**
+ * The node id to which the request should be sent. If the node id is
empty, the request will be sent to the
+ * arbitrary controller/broker.
+ */
+ public OptionalInt nodeId() {
+ return nodeId;
+ }
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 8478ed19f80..fa9440dfb91 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -4519,8 +4519,10 @@ public class KafkaAdminClient extends AdminClient {
public DescribeFeaturesResult describeFeatures(final
DescribeFeaturesOptions options) {
final KafkaFutureImpl<FeatureMetadata> future = new
KafkaFutureImpl<>();
final long now = time.milliseconds();
+ final NodeProvider nodeProvider = options.nodeId().isPresent() ?
+ new ConstantNodeIdProvider(options.nodeId().getAsInt(), true) :
new LeastLoadedBrokerOrActiveKController();
final Call call = new Call(
- "describeFeatures", calcDeadlineMs(now, options.timeoutMs()), new
LeastLoadedBrokerOrActiveKController()) {
+ "describeFeatures", calcDeadlineMs(now, options.timeoutMs()),
nodeProvider) {
private FeatureMetadata createFeatureMetadata(final
ApiVersionsResponse response) {
final Map<String, FinalizedVersionRange> finalizedFeatures =
new HashMap<>();
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 55b4119f0ce..93f6daaf8ec 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -8896,6 +8896,33 @@ public class KafkaAdminClientTest {
}
}
+ @Test
+ public void testDescribeFeaturesWithNodeSuccess() throws Exception {
+ try (final AdminClientUnitTestEnv env = mockClientEnv()) {
+ env.kafkaClient().prepareResponseFrom(
+ body -> body instanceof ApiVersionsRequest,
+ prepareApiVersionsResponseForDescribeFeatures(Errors.NONE),
+ env.cluster().nodeById(0));
+ final KafkaFuture<FeatureMetadata> future =
env.adminClient().describeFeatures(
+ new
DescribeFeaturesOptions().timeoutMs(10000).nodeId(0)).featureMetadata();
+ final FeatureMetadata metadata = future.get();
+ assertEquals(defaultFeatureMetadata(), metadata);
+ }
+ }
+
+ @Test
+ public void testDescribeFeaturesWithNodeFailure() throws Exception {
+ try (final AdminClientUnitTestEnv env = mockClientEnv()) {
+ env.kafkaClient().prepareResponseFrom(
+ body -> body instanceof ApiVersionsRequest,
+ prepareApiVersionsResponseForDescribeFeatures(Errors.NONE),
+ env.cluster().nodeById(1));
+ final KafkaFuture<FeatureMetadata> future =
env.adminClient().describeFeatures(
+ new
DescribeFeaturesOptions().timeoutMs(1000).nodeId(0)).featureMetadata();
+ assertThrows(ExecutionException.class, future::get);
+ }
+ }
+
@Test
public void testDescribeMetadataQuorumSuccess() throws Exception {
try (final AdminClientUnitTestEnv env = mockClientEnv()) {
diff --git a/docs/upgrade.html b/docs/upgrade.html
index fe1b8b80d49..4c807382f0f 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -228,6 +228,10 @@
It is now possible to specify the start time for a Kafka Streams
punctuation, instead of relying on the non-deterministic time when you register
it.
For further details, please refer to <a
href="https://cwiki.apache.org/confluence/x/9QqWF">KIP-1146</a>.
</li>
+ <li>
+ Added an optional <code>--node-id</code> flag to the
<code>FeatureCommand</code> command. It specifies the node to describe.
+ If not provided, an arbitrary node is used.
+ </li>
</ul>
<h4><a id="upgrade_4_1_0" href="#upgrade_4_1_0">Upgrading to 4.1.0</a></h4>
diff --git a/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java
b/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java
index 3f1f309abf5..3351b7d776e 100644
--- a/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java
@@ -17,6 +17,7 @@
package org.apache.kafka.tools;
import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.DescribeFeaturesOptions;
import org.apache.kafka.clients.admin.FeatureMetadata;
import org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.kafka.clients.admin.SupportedVersionRange;
@@ -104,7 +105,7 @@ public class FeatureCommand {
try (Admin adminClient = Admin.create(properties)) {
switch (command) {
case "describe":
- handleDescribe(adminClient);
+ handleDescribe(namespace, adminClient);
break;
case "upgrade":
handleUpgrade(namespace, adminClient);
@@ -128,8 +129,12 @@ public class FeatureCommand {
}
private static void addDescribeParser(Subparsers subparsers) {
- subparsers.addParser("describe")
+ Subparser describeParser = subparsers.addParser("describe")
.help("Describes the current active feature flags.");
+ describeParser.addArgument("--node-id")
+ .type(Integer.class)
+ .help("The node id to which the requests should be sent. If not
specified, the requests will be sent to an arbitrary controller/broker.")
+ .action(store());
}
private static void addUpgradeParser(Subparsers subparsers) {
@@ -223,8 +228,16 @@ public class FeatureCommand {
return String.valueOf(level);
}
- static void handleDescribe(Admin adminClient) throws ExecutionException,
InterruptedException {
- FeatureMetadata featureMetadata =
adminClient.describeFeatures().featureMetadata().get();
+ static void handleDescribe(Namespace namespace, Admin adminClient) throws
ExecutionException, InterruptedException {
+ DescribeFeaturesOptions describeFeaturesOptions = new
DescribeFeaturesOptions();
+ if (namespace.getInt("node_id") != null) {
+ int nodeId = namespace.getInt("node_id");
+ if (nodeId < 0) {
+ throw new IllegalArgumentException("Invalid node id " + nodeId
+ ": must be non-negative.");
+ }
+ describeFeaturesOptions =
describeFeaturesOptions.nodeId(namespace.getInt("node_id"));
+ }
+ FeatureMetadata featureMetadata =
adminClient.describeFeatures(describeFeaturesOptions).featureMetadata().get();
featureMetadata.supportedFeatures().keySet().stream().sorted().forEach(feature
-> {
short finalizedLevel =
(featureMetadata.finalizedFeatures().get(feature) == null) ? 0 :
featureMetadata.finalizedFeatures().get(feature).maxVersionLevel();
SupportedVersionRange range =
featureMetadata.supportedFeatures().get(feature);
diff --git a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
index 53c18d299bc..c289eb1212a 100644
--- a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.tools;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.server.common.Feature;
@@ -95,6 +96,138 @@ public class FeatureCommandTest {
"SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t",
outputWithoutEpoch(features.get(6)));
}
+ @ClusterTest(
+ types = {Type.KRAFT},
+ metadataVersion = MetadataVersion.IBP_3_7_IV0,
+ controllers = 2,
+ serverProperties = {
+ @ClusterConfigProperty(
+ id = 3000,
+ key = "unstable.api.versions.enable",
+ value = "true"
+ ),
+ @ClusterConfigProperty(
+ id = 3001,
+ key = "unstable.api.versions.enable",
+ value = "false"
+ )
+ }
+ )
+ public void testDescribeWithUnstableApiVersions(ClusterInstance cluster) {
+ String commandOutput = ToolsTestUtils.captureStandardOut(() ->
+ assertEquals(
+ 0,
+ FeatureCommand.mainNoExit("--bootstrap-controller",
cluster.bootstrapControllers(), "describe", "--node-id", "3000")
+ )
+ );
+ List<String> featuresWithUnstable =
Arrays.stream(commandOutput.split("\n")).sorted().toList();
+
+ // Change expected message to reflect latest MetadataVersion
(SupportedMaxVersion increases when adding a new version)
+ assertEquals("Feature:
eligible.leader.replicas.version\tSupportedMinVersion: 0\t" +
+ "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t",
outputWithoutEpoch(featuresWithUnstable.get(0)));
+ assertEquals("Feature: group.version\tSupportedMinVersion: 0\t" +
+ "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t",
outputWithoutEpoch(featuresWithUnstable.get(1)));
+ assertEquals("Feature: kraft.version\tSupportedMinVersion: 0\t" +
+ "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t",
outputWithoutEpoch(featuresWithUnstable.get(2)));
+ assertEquals("Feature: metadata.version\tSupportedMinVersion:
3.3-IV3\t" +
+ "SupportedMaxVersion: 4.3-IV0\tFinalizedVersionLevel: 3.7-IV0\t",
outputWithoutEpoch(featuresWithUnstable.get(3)));
+ assertEquals("Feature: share.version\tSupportedMinVersion: 0\t" +
+ "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t",
outputWithoutEpoch(featuresWithUnstable.get(4)));
+ assertEquals("Feature: streams.version\tSupportedMinVersion: 0\t" +
+ "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t",
outputWithoutEpoch(featuresWithUnstable.get(5)));
+ assertEquals("Feature: transaction.version\tSupportedMinVersion: 0\t" +
+ "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t",
outputWithoutEpoch(featuresWithUnstable.get(6)));
+
+ commandOutput = ToolsTestUtils.captureStandardOut(() ->
+ assertEquals(
+ 0,
+ FeatureCommand.mainNoExit("--bootstrap-controller",
cluster.bootstrapControllers(), "describe", "--node-id", "3001")
+ )
+ );
+ List<String> featuresWithoutUnstable =
Arrays.stream(commandOutput.split("\n")).sorted().toList();
+
+ assertEquals("Feature:
eligible.leader.replicas.version\tSupportedMinVersion: 0\t" +
+ "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t",
outputWithoutEpoch(featuresWithoutUnstable.get(0)));
+ assertEquals("Feature: group.version\tSupportedMinVersion: 0\t" +
+ "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t",
outputWithoutEpoch(featuresWithoutUnstable.get(1)));
+ assertEquals("Feature: kraft.version\tSupportedMinVersion: 0\t" +
+ "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t",
outputWithoutEpoch(featuresWithoutUnstable.get(2)));
+ assertEquals("Feature: metadata.version\tSupportedMinVersion:
3.3-IV3\t" +
+ "SupportedMaxVersion: 4.2-IV1\tFinalizedVersionLevel: 3.7-IV0\t",
outputWithoutEpoch(featuresWithoutUnstable.get(3)));
+ assertEquals("Feature: share.version\tSupportedMinVersion: 0\t" +
+ "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t",
outputWithoutEpoch(featuresWithoutUnstable.get(4)));
+ assertEquals("Feature: streams.version\tSupportedMinVersion: 0\t" +
+ "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t",
outputWithoutEpoch(featuresWithUnstable.get(5)));
+ assertEquals("Feature: transaction.version\tSupportedMinVersion: 0\t" +
+ "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t",
outputWithoutEpoch(featuresWithoutUnstable.get(6)));
+ }
+
+ @ClusterTest(
+ types = {Type.KRAFT},
+ metadataVersion = MetadataVersion.IBP_3_7_IV0,
+ brokers = 2,
+ serverProperties = {
+ @ClusterConfigProperty(
+ id = 0,
+ key = "unstable.feature.versions.enable",
+ value = "true"
+ ),
+ @ClusterConfigProperty(
+ id = 1,
+ key = "unstable.feature.versions.enable",
+ value = "false"
+ )
+ }
+ )
+ public void testDescribeWithUnstableFeatureVersions(ClusterInstance
cluster) {
+ String commandOutput = ToolsTestUtils.captureStandardOut(() ->
+ assertEquals(
+ 0,
+ FeatureCommand.mainNoExit("--bootstrap-server",
cluster.bootstrapServers(), "describe", "--node-id", "0")
+ )
+ );
+ List<String> featuresWithUnstable =
Arrays.stream(commandOutput.split("\n")).sorted().toList();
+
+ // Change expected message to reflect latest MetadataVersion
(SupportedMaxVersion increases when adding a new version)
+ assertEquals("Feature:
eligible.leader.replicas.version\tSupportedMinVersion: 0\t" +
+ "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t",
outputWithoutEpoch(featuresWithUnstable.get(0)));
+ assertEquals("Feature: group.version\tSupportedMinVersion: 0\t" +
+ "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t",
outputWithoutEpoch(featuresWithUnstable.get(1)));
+ assertEquals("Feature: kraft.version\tSupportedMinVersion: 0\t" +
+ "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t",
outputWithoutEpoch(featuresWithUnstable.get(2)));
+ assertEquals("Feature: metadata.version\tSupportedMinVersion:
3.3-IV3\t" +
+ "SupportedMaxVersion: 4.3-IV0\tFinalizedVersionLevel: 3.7-IV0\t",
outputWithoutEpoch(featuresWithUnstable.get(3)));
+ assertEquals("Feature: share.version\tSupportedMinVersion: 0\t" +
+ "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t",
outputWithoutEpoch(featuresWithUnstable.get(4)));
+ assertEquals("Feature: streams.version\tSupportedMinVersion: 0\t" +
+ "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t",
outputWithoutEpoch(featuresWithUnstable.get(5)));
+ assertEquals("Feature: transaction.version\tSupportedMinVersion: 0\t" +
+ "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t",
outputWithoutEpoch(featuresWithUnstable.get(6)));
+
+ commandOutput = ToolsTestUtils.captureStandardOut(() ->
+ assertEquals(
+ 0,
+ FeatureCommand.mainNoExit("--bootstrap-server",
cluster.bootstrapServers(), "describe", "--node-id", "1")
+ )
+ );
+ List<String> featuresWithoutUnstable =
Arrays.stream(commandOutput.split("\n")).sorted().toList();
+
+ assertEquals("Feature:
eligible.leader.replicas.version\tSupportedMinVersion: 0\t" +
+ "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t",
outputWithoutEpoch(featuresWithoutUnstable.get(0)));
+ assertEquals("Feature: group.version\tSupportedMinVersion: 0\t" +
+ "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t",
outputWithoutEpoch(featuresWithoutUnstable.get(1)));
+ assertEquals("Feature: kraft.version\tSupportedMinVersion: 0\t" +
+ "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t",
outputWithoutEpoch(featuresWithoutUnstable.get(2)));
+ assertEquals("Feature: metadata.version\tSupportedMinVersion:
3.3-IV3\t" +
+ "SupportedMaxVersion: 4.2-IV1\tFinalizedVersionLevel: 3.7-IV0\t",
outputWithoutEpoch(featuresWithoutUnstable.get(3)));
+ assertEquals("Feature: share.version\tSupportedMinVersion: 0\t" +
+ "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t",
outputWithoutEpoch(featuresWithoutUnstable.get(4)));
+ assertEquals("Feature: streams.version\tSupportedMinVersion: 0\t" +
+ "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t",
outputWithoutEpoch(featuresWithUnstable.get(5)));
+ assertEquals("Feature: transaction.version\tSupportedMinVersion: 0\t" +
+ "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t",
outputWithoutEpoch(featuresWithoutUnstable.get(6)));
+ }
+
@ClusterTest(types = {Type.KRAFT}, metadataVersion =
MetadataVersion.IBP_3_3_IV3)
public void testUpgradeMetadataVersionWithKraft(ClusterInstance cluster) {
String commandOutput = ToolsTestUtils.captureStandardOut(() ->
@@ -252,7 +385,7 @@ public class FeatureCommandTest {
public void testHandleDescribe() {
String describeResult = ToolsTestUtils.captureStandardOut(() -> {
try {
- FeatureCommand.handleDescribe(buildAdminClient());
+ FeatureCommand.handleDescribe(new Namespace(Map.of()),
buildAdminClient());
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -261,6 +394,13 @@ public class FeatureCommandTest {
"Feature: metadata.version\tSupportedMinVersion:
3.3-IV3\tSupportedMaxVersion: 3.5-IV0\tFinalizedVersionLevel: 3.4-IV0\tEpoch:
123"), describeResult);
}
+ @Test
+ public void testHandleDescribeWithNegativeNodeId() {
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> FeatureCommand.handleDescribe(new
Namespace(Map.of("node_id", -1)), buildAdminClient()));
+ }
+
@Test
public void testHandleUpgradeToUnsupportedMetadataVersion() {
Map<String, Object> namespace = new HashMap<>();