This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.2 by this push:
new 9f8bd7394dd KAFKA-20111 Handle pre-4.1 brokers in kafka-configs.sh for
groups (#21385)
9f8bd7394dd is described below
commit 9f8bd7394dd82bbc54ff472a9ad49f22e9f0a517
Author: Andrew Schofield <[email protected]>
AuthorDate: Mon Feb 9 00:26:08 2026 +0000
KAFKA-20111 Handle pre-4.1 brokers in kafka-configs.sh for groups (#21385)
KIP-1142 introduced `Admin.listConfigResources()` for listing resources
that have configurations but may not actually exist as run-time
entities, such as consumer groups that have not yet had any members.
Internally, this works by using v1 of the `LIST_CONFIG_RESOURCES` RPC,
which is only supported in Apache Kafka (AK) 4.1 and later. As a result,
if you try to describe the config for groups against older brokers, the
request fails with `UnsupportedVersionException` because those brokers
do not support v1 of the RPC. This PR handles that exception (and
`ClusterAuthorizationException`) in the config tool: the failure is
harmless there, and ignoring it gives the same behaviour as we used to
get with earlier versions. Any other failure is still rethrown.
Reviewers: David Jacot <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../src/main/scala/kafka/admin/ConfigCommand.scala | 20 +++--
.../org/apache/kafka/tools/ConfigCommandTest.java | 90 ++++++++++++++++++++++
2 files changed, 104 insertions(+), 6 deletions(-)
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala
b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index f004b9956c8..b8b2ae4fb37 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -23,7 +23,7 @@ import kafka.utils.Implicits._
import kafka.utils.Logging
import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions,
AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions,
DescribeConfigsOptions, ListConfigResourcesOptions, ListTopicsOptions,
ScramCredentialInfo, UserScramCredentialDeletion, UserScramCredentialUpsertion,
ScramMechanism => PublicScramMechanism}
import org.apache.kafka.common.config.ConfigResource
-import org.apache.kafka.common.errors.{InvalidConfigurationException,
UnsupportedVersionException}
+import org.apache.kafka.common.errors.{ClusterAuthorizationException,
InvalidConfigurationException, UnsupportedVersionException}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.quota.{ClientQuotaAlteration,
ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
@@ -367,9 +367,7 @@ object ConfigCommand extends Logging {
return
}
case GroupType =>
- if (adminClient.listGroups().all.get.stream.noneMatch(_.groupId()
== name) &&
-
adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.GROUP),
new ListConfigResourcesOptions).all.get
- .stream.noneMatch(_.name == name)) {
+ if (adminClient.listGroups().all.get.stream.noneMatch(_.groupId ==
name) && listGroupConfigResources(adminClient).exists(resources =>
resources.stream.noneMatch(_.name == name))) {
System.out.println(s"The ${entityType.dropRight(1)} '$name'
doesn't exist and doesn't have dynamic config.")
return
}
@@ -388,8 +386,7 @@ object ConfigCommand extends Logging {
case ClientMetricsType =>
adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.CLIENT_METRICS),
new ListConfigResourcesOptions).all().get().asScala.map(_.name).toSeq
case GroupType =>
- adminClient.listGroups().all.get.asScala.map(_.groupId).toSet ++
-
adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.GROUP),
new ListConfigResourcesOptions).all().get().asScala.map(_.name).toSet
+ adminClient.listGroups().all.get.asScala.map(_.groupId).toSet ++
listGroupConfigResources(adminClient).map(resources =>
resources.asScala.map(_.name).toSet).getOrElse(Set.empty)
case entityType => throw new IllegalArgumentException(s"Invalid entity
type: $entityType")
})
@@ -537,6 +534,17 @@ object ConfigCommand extends Logging {
adminClient.describeClientQuotas(ClientQuotaFilter.containsOnly(components.asJava)).entities.get(30,
TimeUnit.SECONDS).asScala
}
+ private def listGroupConfigResources(adminClient: Admin):
Option[java.util.Collection[ConfigResource]] = {
+ try {
+
Some(adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.GROUP),
new ListConfigResourcesOptions).all.get)
+ } catch {
+ // (KIP-1142) 4.1+ admin client vs older broker: treat
UnsupportedVersionException and ClusterAuthorizationException as None
+ case e: ExecutionException if
e.getCause.isInstanceOf[UnsupportedVersionException] => None
+ case e: ExecutionException if
e.getCause.isInstanceOf[ClusterAuthorizationException] => None
+ case e: ExecutionException => throw e.getCause
+ }
+ }
+
class ConfigCommandOptions(args: Array[String]) extends
CommandDefaultOptions(args) {
val bootstrapServerOpt: OptionSpec[String] =
parser.accepts("bootstrap-server", "The Kafka servers to connect to.")
diff --git a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java
index 13cfb281111..65dfea2117c 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java
@@ -33,10 +33,14 @@ import
org.apache.kafka.clients.admin.DescribeConfigsOptions;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.DescribeUserScramCredentialsOptions;
import org.apache.kafka.clients.admin.DescribeUserScramCredentialsResult;
+import org.apache.kafka.clients.admin.ListConfigResourcesOptions;
+import org.apache.kafka.clients.admin.ListConfigResourcesResult;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
@@ -1412,6 +1416,87 @@ public class ConfigCommandTest {
assertEquals("An entity name must be specified with --alter of
groups", exception.getMessage());
}
+ @Test
+ public void testDescribeGroupConfigOldBroker() {
+ ConfigCommand.ConfigCommandOptions describeOpts = new
ConfigCommand.ConfigCommandOptions(toArray("--bootstrap-server",
"localhost:9092",
+ "--entity-type", "groups",
+ "--describe"));
+
+ KafkaFutureImpl<Collection<ConfigResource>> future = new
KafkaFutureImpl<>();
+ ListConfigResourcesResult listConfigResourcesResult =
mock(ListConfigResourcesResult.class);
+ when(listConfigResourcesResult.all()).thenReturn(future);
+
+ AtomicBoolean listedConfigResources = new AtomicBoolean(false);
+ Node node = new Node(1, "localhost", 9092);
+ MockAdminClient mockAdminClient = new MockAdminClient(List.of(node),
node) {
+ @Override
+ public ListConfigResourcesResult
listConfigResources(Set<ConfigResource.Type> configResourceTypes,
ListConfigResourcesOptions options) {
+ ConfigResource.Type type =
configResourceTypes.iterator().next();
+ assertEquals(ConfigResource.Type.GROUP, type);
+ future.completeExceptionally(new
UnsupportedVersionException("The v0 ListConfigResources only supports
CLIENT_METRICS"));
+ listedConfigResources.set(true);
+ return listConfigResourcesResult;
+ }
+ };
+
+ ConfigCommand.describeConfig(mockAdminClient, describeOpts);
+ assertTrue(listedConfigResources.get());
+ }
+
+ @Test
+ public void testDescribeGroupConfigOldBrokerNotAuthorized() {
+ ConfigCommand.ConfigCommandOptions describeOpts = new
ConfigCommand.ConfigCommandOptions(toArray("--bootstrap-server",
"localhost:9092",
+ "--entity-type", "groups",
+ "--describe"));
+
+ KafkaFutureImpl<Collection<ConfigResource>> future = new
KafkaFutureImpl<>();
+ ListConfigResourcesResult listConfigResourcesResult =
mock(ListConfigResourcesResult.class);
+ when(listConfigResourcesResult.all()).thenReturn(future);
+
+ AtomicBoolean listedConfigResources = new AtomicBoolean(false);
+ Node node = new Node(1, "localhost", 9092);
+ MockAdminClient mockAdminClient = new MockAdminClient(List.of(node),
node) {
+ @Override
+ public ListConfigResourcesResult
listConfigResources(Set<ConfigResource.Type> configResourceTypes,
ListConfigResourcesOptions options) {
+ ConfigResource.Type type =
configResourceTypes.iterator().next();
+ assertEquals(ConfigResource.Type.GROUP, type);
+ future.completeExceptionally(new
ClusterAuthorizationException("Not authorized to the cluster"));
+ listedConfigResources.set(true);
+ return listConfigResourcesResult;
+ }
+ };
+
+ ConfigCommand.describeConfig(mockAdminClient, describeOpts);
+ assertTrue(listedConfigResources.get());
+ }
+
+ @Test
+ public void testDescribeGroupConfigOldBrokerUnexpectedException() {
+ ConfigCommand.ConfigCommandOptions describeOpts = new
ConfigCommand.ConfigCommandOptions(toArray("--bootstrap-server",
"localhost:9092",
+ "--entity-type", "groups",
+ "--describe"));
+
+ KafkaFutureImpl<Collection<ConfigResource>> future = new
KafkaFutureImpl<>();
+ ListConfigResourcesResult listConfigResourcesResult =
mock(ListConfigResourcesResult.class);
+ when(listConfigResourcesResult.all()).thenReturn(future);
+
+ AtomicBoolean listedConfigResources = new AtomicBoolean(false);
+ Node node = new Node(1, "localhost", 9092);
+ MockAdminClient mockAdminClient = new MockAdminClient(List.of(node),
node) {
+ @Override
+ public ListConfigResourcesResult
listConfigResources(Set<ConfigResource.Type> configResourceTypes,
ListConfigResourcesOptions options) {
+ ConfigResource.Type type =
configResourceTypes.iterator().next();
+ assertEquals(ConfigResource.Type.GROUP, type);
+ future.completeExceptionally(new
InvalidConfigurationException("That was unexpected"));
+ listedConfigResources.set(true);
+ return listConfigResourcesResult;
+ }
+ };
+
+ assertThrows(InvalidConfigurationException.class, () ->
ConfigCommand.describeConfig(mockAdminClient, describeOpts));
+ assertTrue(listedConfigResources.get());
+ }
+
public static String[] toArray(String... first) {
return first;
}
@@ -1461,6 +1546,11 @@ public class ConfigCommandTest {
public AlterClientQuotasResult
alterClientQuotas(Collection<ClientQuotaAlteration> entries,
AlterClientQuotasOptions options) {
return mock(AlterClientQuotasResult.class);
}
+
+ @Override
+ public ListConfigResourcesResult
listConfigResources(Set<ConfigResource.Type> configResourceTypes,
ListConfigResourcesOptions options) {
+ return mock(ListConfigResourcesResult.class);
+ }
}
private <T> Seq<T> seq(Collection<T> seq) {