This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.1 by this push:
     new 9d24eec731d KAFKA-20111 Handle pre-4.1 brokers in kafka-configs.sh for 
groups (#21385)
9d24eec731d is described below

commit 9d24eec731d9ffb62fddc3a7869c87d51196a7e0
Author: Andrew Schofield <[email protected]>
AuthorDate: Mon Feb 9 00:26:08 2026 +0000

    KAFKA-20111 Handle pre-4.1 brokers in kafka-configs.sh for groups (#21385)
    
    KIP-1142 introduced `Admin.listConfigResources()` for listing resources
    which have configurations but which may not actually exist as run-time
    entities, such as consumer groups which have not yet had any members.
    Internally, this works by using v1 of the `LIST_CONFIG_RESOURCES` RPC,
    which is only supported in AK 4.1 and later. As a result, if you try to
    describe the config for groups against older brokers, which do not
    support v1 of the RPC, the request fails with
    `UnsupportedVersionException`. This PR handles the exception in the
    config tool, since it is harmless there: ignoring it gives the same
    behaviour as we used to get with earlier versions. The tool treats a
    `ClusterAuthorizationException` from the same call in the same way.
    
    Reviewers: David Jacot <[email protected]>, Chia-Ping Tsai
     <[email protected]>
---
 .../src/main/scala/kafka/admin/ConfigCommand.scala | 20 +++--
 .../org/apache/kafka/tools/ConfigCommandTest.java  | 90 ++++++++++++++++++++++
 2 files changed, 104 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala 
b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index f004b9956c8..b8b2ae4fb37 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -23,7 +23,7 @@ import kafka.utils.Implicits._
 import kafka.utils.Logging
 import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, 
AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, 
DescribeConfigsOptions, ListConfigResourcesOptions, ListTopicsOptions, 
ScramCredentialInfo, UserScramCredentialDeletion, UserScramCredentialUpsertion, 
ScramMechanism => PublicScramMechanism}
 import org.apache.kafka.common.config.ConfigResource
-import org.apache.kafka.common.errors.{InvalidConfigurationException, 
UnsupportedVersionException}
+import org.apache.kafka.common.errors.{ClusterAuthorizationException, 
InvalidConfigurationException, UnsupportedVersionException}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.quota.{ClientQuotaAlteration, 
ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
@@ -367,9 +367,7 @@ object ConfigCommand extends Logging {
               return
             }
           case GroupType =>
-            if (adminClient.listGroups().all.get.stream.noneMatch(_.groupId() 
== name) &&
-              
adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.GROUP), 
new ListConfigResourcesOptions).all.get
-                .stream.noneMatch(_.name == name)) {
+            if (adminClient.listGroups().all.get.stream.noneMatch(_.groupId == 
name) && listGroupConfigResources(adminClient).exists(resources => 
resources.stream.noneMatch(_.name == name))) {
               System.out.println(s"The ${entityType.dropRight(1)} '$name' 
doesn't exist and doesn't have dynamic config.")
               return
             }
@@ -388,8 +386,7 @@ object ConfigCommand extends Logging {
         case ClientMetricsType =>
           
adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.CLIENT_METRICS),
 new ListConfigResourcesOptions).all().get().asScala.map(_.name).toSeq
         case GroupType =>
-          adminClient.listGroups().all.get.asScala.map(_.groupId).toSet ++
-            
adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.GROUP), 
new ListConfigResourcesOptions).all().get().asScala.map(_.name).toSet
+          adminClient.listGroups().all.get.asScala.map(_.groupId).toSet ++ 
listGroupConfigResources(adminClient).map(resources => 
resources.asScala.map(_.name).toSet).getOrElse(Set.empty)
         case entityType => throw new IllegalArgumentException(s"Invalid entity 
type: $entityType")
       })
 
@@ -537,6 +534,17 @@ object ConfigCommand extends Logging {
     
adminClient.describeClientQuotas(ClientQuotaFilter.containsOnly(components.asJava)).entities.get(30,
 TimeUnit.SECONDS).asScala
   }
 
+  private def listGroupConfigResources(adminClient: Admin): 
Option[java.util.Collection[ConfigResource]] = {
+    try {
+      
Some(adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.GROUP),
 new ListConfigResourcesOptions).all.get)
+    } catch {
+      // (KIP-1142) 4.1+ admin client vs older broker: treat 
UnsupportedVersionException and ClusterAuthorizationException as None
+      case e: ExecutionException if 
e.getCause.isInstanceOf[UnsupportedVersionException] => None
+      case e: ExecutionException if 
e.getCause.isInstanceOf[ClusterAuthorizationException] => None
+      case e: ExecutionException => throw e.getCause
+    }
+  }
+
 
   class ConfigCommandOptions(args: Array[String]) extends 
CommandDefaultOptions(args) {
     val bootstrapServerOpt: OptionSpec[String] = 
parser.accepts("bootstrap-server", "The Kafka servers to connect to.")
diff --git a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java 
b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java
index 91e61bf9542..8c801fc37d7 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java
@@ -33,10 +33,14 @@ import 
org.apache.kafka.clients.admin.DescribeConfigsOptions;
 import org.apache.kafka.clients.admin.DescribeConfigsResult;
 import org.apache.kafka.clients.admin.DescribeUserScramCredentialsOptions;
 import org.apache.kafka.clients.admin.DescribeUserScramCredentialsResult;
+import org.apache.kafka.clients.admin.ListConfigResourcesOptions;
+import org.apache.kafka.clients.admin.ListConfigResourcesResult;
 import org.apache.kafka.clients.admin.MockAdminClient;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.quota.ClientQuotaAlteration;
 import org.apache.kafka.common.quota.ClientQuotaEntity;
@@ -1421,6 +1425,87 @@ public class ConfigCommandTest {
         assertEquals("An entity name must be specified with --alter of 
groups", exception.getMessage());
     }
 
+    @Test
+    public void testDescribeGroupConfigOldBroker() {
+        ConfigCommand.ConfigCommandOptions describeOpts = new 
ConfigCommand.ConfigCommandOptions(toArray("--bootstrap-server", 
"localhost:9092",
+            "--entity-type", "groups",
+            "--describe"));
+
+        KafkaFutureImpl<Collection<ConfigResource>> future = new 
KafkaFutureImpl<>();
+        ListConfigResourcesResult listConfigResourcesResult = 
mock(ListConfigResourcesResult.class);
+        when(listConfigResourcesResult.all()).thenReturn(future);
+
+        AtomicBoolean listedConfigResources = new AtomicBoolean(false);
+        Node node = new Node(1, "localhost", 9092);
+        MockAdminClient mockAdminClient = new MockAdminClient(List.of(node), 
node) {
+            @Override
+            public ListConfigResourcesResult 
listConfigResources(Set<ConfigResource.Type> configResourceTypes, 
ListConfigResourcesOptions options) {
+                ConfigResource.Type type = 
configResourceTypes.iterator().next();
+                assertEquals(ConfigResource.Type.GROUP, type);
+                future.completeExceptionally(new 
UnsupportedVersionException("The v0 ListConfigResources only supports 
CLIENT_METRICS"));
+                listedConfigResources.set(true);
+                return listConfigResourcesResult;
+            }
+        };
+
+        ConfigCommand.describeConfig(mockAdminClient, describeOpts);
+        assertTrue(listedConfigResources.get());
+    }
+
+    @Test
+    public void testDescribeGroupConfigOldBrokerNotAuthorized() {
+        ConfigCommand.ConfigCommandOptions describeOpts = new 
ConfigCommand.ConfigCommandOptions(toArray("--bootstrap-server", 
"localhost:9092",
+            "--entity-type", "groups",
+            "--describe"));
+
+        KafkaFutureImpl<Collection<ConfigResource>> future = new 
KafkaFutureImpl<>();
+        ListConfigResourcesResult listConfigResourcesResult = 
mock(ListConfigResourcesResult.class);
+        when(listConfigResourcesResult.all()).thenReturn(future);
+
+        AtomicBoolean listedConfigResources = new AtomicBoolean(false);
+        Node node = new Node(1, "localhost", 9092);
+        MockAdminClient mockAdminClient = new MockAdminClient(List.of(node), 
node) {
+            @Override
+            public ListConfigResourcesResult 
listConfigResources(Set<ConfigResource.Type> configResourceTypes, 
ListConfigResourcesOptions options) {
+                ConfigResource.Type type = 
configResourceTypes.iterator().next();
+                assertEquals(ConfigResource.Type.GROUP, type);
+                future.completeExceptionally(new 
ClusterAuthorizationException("Not authorized to the cluster"));
+                listedConfigResources.set(true);
+                return listConfigResourcesResult;
+            }
+        };
+
+        ConfigCommand.describeConfig(mockAdminClient, describeOpts);
+        assertTrue(listedConfigResources.get());
+    }
+
+    @Test
+    public void testDescribeGroupConfigOldBrokerUnexpectedException() {
+        ConfigCommand.ConfigCommandOptions describeOpts = new 
ConfigCommand.ConfigCommandOptions(toArray("--bootstrap-server", 
"localhost:9092",
+            "--entity-type", "groups",
+            "--describe"));
+
+        KafkaFutureImpl<Collection<ConfigResource>> future = new 
KafkaFutureImpl<>();
+        ListConfigResourcesResult listConfigResourcesResult = 
mock(ListConfigResourcesResult.class);
+        when(listConfigResourcesResult.all()).thenReturn(future);
+
+        AtomicBoolean listedConfigResources = new AtomicBoolean(false);
+        Node node = new Node(1, "localhost", 9092);
+        MockAdminClient mockAdminClient = new MockAdminClient(List.of(node), 
node) {
+            @Override
+            public ListConfigResourcesResult 
listConfigResources(Set<ConfigResource.Type> configResourceTypes, 
ListConfigResourcesOptions options) {
+                ConfigResource.Type type = 
configResourceTypes.iterator().next();
+                assertEquals(ConfigResource.Type.GROUP, type);
+                future.completeExceptionally(new 
InvalidConfigurationException("That was unexpected"));
+                listedConfigResources.set(true);
+                return listConfigResourcesResult;
+            }
+        };
+
+        assertThrows(InvalidConfigurationException.class, () -> 
ConfigCommand.describeConfig(mockAdminClient, describeOpts));
+        assertTrue(listedConfigResources.get());
+    }
+
     public static String[] toArray(String... first) {
         return first;
     }
@@ -1470,6 +1555,11 @@ public class ConfigCommandTest {
         public AlterClientQuotasResult 
alterClientQuotas(Collection<ClientQuotaAlteration> entries, 
AlterClientQuotasOptions options) {
             return mock(AlterClientQuotasResult.class);
         }
+
+        @Override
+        public ListConfigResourcesResult 
listConfigResources(Set<ConfigResource.Type> configResourceTypes, 
ListConfigResourcesOptions options) {
+            return mock(ListConfigResourcesResult.class);
+        }
     }
 
     private <T> Seq<T> seq(Collection<T> seq) {

Reply via email to