This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b3d77f9891f MINOR: add Admin#updateFeatures overload method (#21380)
b3d77f9891f is described below
commit b3d77f9891f5d656197663f067dd2e61f6edcb58
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Sun Feb 1 03:09:16 2026 +0800
MINOR: add Admin#updateFeatures overload method (#21380)
Add missing updateFeatures overload without the options parameter.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../kafka/clients/MetadataVersionIntegrationTest.java | 7 ++-----
.../src/main/java/org/apache/kafka/clients/admin/Admin.java | 13 +++++++++++++
.../apache/kafka/clients/admin/KafkaAdminClientTest.java | 6 ++----
.../kafka/api/PlaintextAdminIntegrationTest.scala | 3 +--
.../unit/kafka/integration/UncleanLeaderElectionTest.scala | 5 ++---
.../kafka/server/BootstrapControllersIntegrationTest.java | 4 +---
.../kafka/server/EligibleLeaderReplicasIntegrationTest.java | 13 ++++---------
.../test/java/org/apache/kafka/server/KRaftClusterTest.java | 4 +---
.../apache/kafka/tools/streams/DeleteStreamsGroupTest.java | 3 +--
9 files changed, 27 insertions(+), 31 deletions(-)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/MetadataVersionIntegrationTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/MetadataVersionIntegrationTest.java
index 3d560892fe8..864f8ed7422 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/MetadataVersionIntegrationTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/MetadataVersionIntegrationTest.java
@@ -17,7 +17,6 @@
package org.apache.kafka.clients;
import org.apache.kafka.clients.admin.FeatureUpdate;
-import org.apache.kafka.clients.admin.UpdateFeaturesOptions;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTests;
@@ -47,8 +46,7 @@ public class MetadataVersionIntegrationTest {
// Update to new version
short updateVersion = MetadataVersion.IBP_3_7_IV1.featureLevel();
var updateResult = admin.updateFeatures(
- Map.of("metadata.version", new
FeatureUpdate(updateVersion, FeatureUpdate.UpgradeType.UPGRADE)),
- new UpdateFeaturesOptions());
+ Map.of("metadata.version", new
FeatureUpdate(updateVersion, FeatureUpdate.UpgradeType.UPGRADE)));
updateResult.all().get();
// Verify that new version is visible on broker
@@ -69,8 +67,7 @@ public class MetadataVersionIntegrationTest {
try (var admin = clusterInstance.admin()) {
short updateVersion = MetadataVersion.IBP_3_9_IV0.featureLevel();
var updateResult = admin.updateFeatures(
- Map.of("metadata.version", new
FeatureUpdate(updateVersion, FeatureUpdate.UpgradeType.UPGRADE)),
- new UpdateFeaturesOptions());
+ Map.of("metadata.version", new
FeatureUpdate(updateVersion, FeatureUpdate.UpgradeType.UPGRADE)));
updateResult.all().get();
}
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index c5ed11d8237..313c29ca49e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -1543,6 +1543,19 @@ public interface Admin extends AutoCloseable {
*/
DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);
+ /**
+ * Applies specified updates to finalized features.
+ * <p>
+ * This is a convenience method for {@link #updateFeatures(Map, UpdateFeaturesOptions)} with default options.
+ * See the overload for more details.
+ *
+ * @param featureUpdates the map of finalized feature name to {@link FeatureUpdate}
+ * @return the {@link UpdateFeaturesResult} containing the result
+ */
+ default UpdateFeaturesResult updateFeatures(Map<String, FeatureUpdate> featureUpdates) {
+ return updateFeatures(featureUpdates, new UpdateFeaturesOptions());
+ }
+
/**
* Applies specified updates to finalized features. This operation is not
transactional so some
* updates may succeed while the rest may fail.
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 93f6daaf8ec..1b274c95add 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -8845,8 +8845,7 @@ public class KafkaAdminClientTest {
try (final AdminClientUnitTestEnv env = mockClientEnv()) {
assertThrows(
IllegalArgumentException.class,
- () -> env.adminClient().updateFeatures(
- new HashMap<>(), new UpdateFeaturesOptions()));
+ () -> env.adminClient().updateFeatures(new HashMap<>()));
}
}
@@ -8857,8 +8856,7 @@ public class KafkaAdminClientTest {
IllegalArgumentException.class,
() -> env.adminClient().updateFeatures(
Utils.mkMap(Utils.mkEntry("feature", new
FeatureUpdate((short) 2, FeatureUpdate.UpgradeType.UPGRADE)),
- Utils.mkEntry("", new FeatureUpdate((short) 2,
FeatureUpdate.UpgradeType.UPGRADE))),
- new UpdateFeaturesOptions()));
+ Utils.mkEntry("", new FeatureUpdate((short) 2,
FeatureUpdate.UpgradeType.UPGRADE)))));
}
}
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 28202cb04d6..43b8a4f702e 100644
---
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -4114,8 +4114,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
private def disableEligibleLeaderReplicas(admin: Admin): Unit = {
if (metadataVersion.isAtLeast(MetadataVersion.IBP_4_1_IV0)) {
admin.updateFeatures(
- util.Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, new
FeatureUpdate(0, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE)),
- new UpdateFeaturesOptions()).all().get()
+ util.Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, new
FeatureUpdate(0, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE))).all().get()
}
}
diff --git
a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index 11271d17c93..e9c500822e1 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -32,7 +32,7 @@ import
org.apache.kafka.common.errors.{InvalidConfigurationException, TimeoutExc
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.clients.admin.{Admin, AdminClientConfig,
AlterConfigOp, AlterConfigsResult, ConfigEntry, FeatureUpdate,
UpdateFeaturesOptions}
+import org.apache.kafka.clients.admin.{Admin, AdminClientConfig,
AlterConfigOp, AlterConfigsResult, ConfigEntry, FeatureUpdate}
import org.apache.kafka.metadata.MetadataCache
import org.apache.kafka.server.config.ReplicationConfigs
import org.apache.kafka.server.metrics.KafkaYammerMetrics
@@ -127,8 +127,7 @@ class UncleanLeaderElectionTest extends QuorumTestHarness {
private def disableEligibleLeaderReplicas(): Unit = {
if (metadataVersion.isAtLeast(MetadataVersion.IBP_4_1_IV0)) {
admin.updateFeatures(
- util.Map.of("eligible.leader.replicas.version", new FeatureUpdate(0,
FeatureUpdate.UpgradeType.SAFE_DOWNGRADE)),
- new UpdateFeaturesOptions()).all().get()
+ util.Map.of("eligible.leader.replicas.version", new FeatureUpdate(0,
FeatureUpdate.UpgradeType.SAFE_DOWNGRADE))).all().get()
}
}
diff --git
a/server/src/test/java/org/apache/kafka/server/BootstrapControllersIntegrationTest.java
b/server/src/test/java/org/apache/kafka/server/BootstrapControllersIntegrationTest.java
index a97a4cb3bed..c889ba74d9f 100644
---
a/server/src/test/java/org/apache/kafka/server/BootstrapControllersIntegrationTest.java
+++
b/server/src/test/java/org/apache/kafka/server/BootstrapControllersIntegrationTest.java
@@ -32,7 +32,6 @@ import
org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
-import org.apache.kafka.clients.admin.UpdateFeaturesOptions;
import org.apache.kafka.clients.admin.UpdateFeaturesResult;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
@@ -173,8 +172,7 @@ public class BootstrapControllersIntegrationTest {
private void testUpdateFeatures(ClusterInstance clusterInstance, boolean
usingBootstrapControllers) {
try (Admin admin = Admin.create(adminConfig(clusterInstance,
usingBootstrapControllers))) {
UpdateFeaturesResult result =
admin.updateFeatures(Map.of("foo.bar.feature",
- new FeatureUpdate((short) 1,
FeatureUpdate.UpgradeType.UPGRADE)),
- new UpdateFeaturesOptions());
+ new FeatureUpdate((short) 1,
FeatureUpdate.UpgradeType.UPGRADE)));
ExecutionException exception =
assertThrows(ExecutionException.class,
() -> result.all().get(1, TimeUnit.MINUTES));
diff --git
a/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java
b/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java
index 61863688f2e..093323ed5d0 100644
---
a/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java
+++
b/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
-import org.apache.kafka.clients.admin.UpdateFeaturesOptions;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -97,8 +96,7 @@ public class EligibleLeaderReplicasIntegrationTest {
String testTopicName = String.format("%s-%s",
"testHighWatermarkShouldNotAdvanceIfUnderMinIsr", "ELR-test");
admin.updateFeatures(
Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME,
- new
FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(),
FeatureUpdate.UpgradeType.UPGRADE)),
- new UpdateFeaturesOptions()).all().get();
+ new
FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(),
FeatureUpdate.UpgradeType.UPGRADE))).all().get();
admin.createTopics(List.of(new NewTopic(testTopicName, 1, (short)
4))).all().get();
clusterInstance.waitTopicCreation(testTopicName, 1);
@@ -164,8 +162,7 @@ public class EligibleLeaderReplicasIntegrationTest {
admin.updateFeatures(
Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME,
- new
FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(),
FeatureUpdate.UpgradeType.UPGRADE)),
- new UpdateFeaturesOptions()).all().get();
+ new
FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(),
FeatureUpdate.UpgradeType.UPGRADE))).all().get();
admin.createTopics(List.of(new NewTopic(testTopicName, 1, (short)
4))).all().get();
clusterInstance.waitTopicCreation(testTopicName, 1);
@@ -234,8 +231,7 @@ public class EligibleLeaderReplicasIntegrationTest {
admin.updateFeatures(
Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME,
- new
FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(),
FeatureUpdate.UpgradeType.UPGRADE)),
- new UpdateFeaturesOptions()).all().get();
+ new
FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(),
FeatureUpdate.UpgradeType.UPGRADE))).all().get();
admin.createTopics(List.of(new NewTopic(testTopicName, 1, (short)
4))).all().get();
clusterInstance.waitTopicCreation(testTopicName, 1);
@@ -293,8 +289,7 @@ public class EligibleLeaderReplicasIntegrationTest {
admin.updateFeatures(
Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME,
- new
FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(),
FeatureUpdate.UpgradeType.UPGRADE)),
- new UpdateFeaturesOptions()).all().get();
+ new
FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(),
FeatureUpdate.UpgradeType.UPGRADE))).all().get();
admin.createTopics(List.of(new NewTopic(testTopicName, 1, (short)
4))).all().get();
clusterInstance.waitTopicCreation(testTopicName, 1);
diff --git a/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java
b/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java
index f89e5089156..4513c87da68 100644
--- a/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java
+++ b/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java
@@ -34,7 +34,6 @@ import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.QuorumInfo;
import org.apache.kafka.clients.admin.SupportedVersionRange;
import org.apache.kafka.clients.admin.TopicDescription;
-import org.apache.kafka.clients.admin.UpdateFeaturesOptions;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.KafkaFuture;
@@ -1142,8 +1141,7 @@ public class KRaftClusterTest {
try (Admin admin = cluster.admin()) {
admin.updateFeatures(
Map.of(MetadataVersion.FEATURE_NAME,
- new
FeatureUpdate(MetadataVersion.latestTesting().featureLevel(),
FeatureUpdate.UpgradeType.UPGRADE)),
- new UpdateFeaturesOptions()
+ new
FeatureUpdate(MetadataVersion.latestTesting().featureLevel(),
FeatureUpdate.UpgradeType.UPGRADE))
);
assertEquals(new SupportedVersionRange((short) 0, (short) 1),
admin.describeFeatures().featureMetadata().get()
.supportedFeatures().get(KRaftVersion.FEATURE_NAME));
diff --git
a/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupTest.java
b/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupTest.java
index 8730d23066e..a620d63a697 100644
---
a/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupTest.java
@@ -21,7 +21,6 @@ import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.kafka.clients.admin.GroupListing;
import org.apache.kafka.clients.admin.ListGroupsOptions;
-import org.apache.kafka.clients.admin.UpdateFeaturesOptions;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.errors.GroupNotEmptyException;
@@ -446,7 +445,7 @@ public class DeleteStreamsGroupTest {
try (Admin admin = cluster.createAdminClient()) {
Map<String, FeatureUpdate> updates = Utils.mkMap(
Utils.mkEntry("streams.version", new FeatureUpdate(version,
version == 0 ? FeatureUpdate.UpgradeType.SAFE_DOWNGRADE :
FeatureUpdate.UpgradeType.UPGRADE)));
- admin.updateFeatures(updates, new
UpdateFeaturesOptions()).all().get();
+ admin.updateFeatures(updates).all().get();
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}