This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e7d986e48c2 KAFKA-17550: DescribeGroups v6 exploitation (#17706)
e7d986e48c2 is described below

commit e7d986e48c2e939e834ecb44e61f41153a52eead
Author: Andrew Schofield <[email protected]>
AuthorDate: Fri Dec 6 07:12:24 2024 +0000

    KAFKA-17550: DescribeGroups v6 exploitation (#17706)
    
    This PR introduces the DescribeGroups v6 API as part of KIP-1043. This adds 
an error message for the described groups so that it is possible to get some 
context on the error. It also changes the behaviour for when the group ID 
cannot be found but returning error code GROUP_ID_NOT_FOUND rather than NONE.
    
    Reviewers: David Jacot <[email protected]>
---
 .../internals/DescribeConsumerGroupsHandler.java   |  17 ++-
 .../internals/DescribeShareGroupsHandler.java      |  11 +-
 .../common/requests/DescribeGroupsResponse.java    |   8 ++
 .../common/message/DescribeGroupsRequest.json      |   6 +-
 .../common/message/DescribeGroupsResponse.json     |   6 +-
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 160 ++++++++++++++++++---
 .../kafka/connect/mirror/MirrorCheckpointTask.java |  11 +-
 .../kafka/coordinator/group/GroupCoordinator.scala |  13 +-
 .../group/GroupCoordinatorAdapter.scala            |   3 +-
 .../kafka/api/AuthorizerIntegrationTest.scala      |   3 +-
 .../api/GroupCoordinatorIntegrationTest.scala      |  17 ++-
 .../kafka/api/PlaintextAdminIntegrationTest.scala  |  61 +++-----
 .../group/GroupCoordinatorAdapterTest.scala        |   8 +-
 .../coordinator/group/GroupCoordinatorTest.scala   |  37 +++--
 .../kafka/server/DeleteGroupsRequestTest.scala     |   2 +
 .../kafka/server/DescribeGroupsRequestTest.scala   |   4 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  10 +-
 docs/upgrade.html                                  |  16 ++-
 .../coordinator/group/GroupCoordinatorShard.java   |   2 +-
 .../coordinator/group/GroupMetadataManager.java    |  27 +++-
 .../group/GroupMetadataManagerTest.java            |  15 +-
 .../group/GroupMetadataManagerTestContext.java     |  53 ++++++-
 .../integration/utils/IntegrationTestUtils.java    |   5 +-
 .../org/apache/kafka/tools/StreamsResetter.java    |  24 ++--
 .../tools/consumer/group/ConsumerGroupCommand.java |  63 +++++---
 .../group/ConsumerGroupCommandOptions.java         |   2 +-
 .../consumer/group/AuthorizerIntegrationTest.java  |  14 +-
 .../consumer/group/DeleteConsumerGroupsTest.java   |   4 +-
 .../consumer/group/DescribeConsumerGroupTest.java  |  42 +++---
 .../group/ResetConsumerGroupOffsetTest.java        |   4 -
 30 files changed, 475 insertions(+), 173 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java
index e66d45e2635..1d911e2f0c7 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java
@@ -64,6 +64,7 @@ public class DescribeConsumerGroupsHandler implements 
AdminApiHandler<Coordinato
     private final Logger log;
     private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy;
     private final Set<String> useClassicGroupApi;
+    private final Map<String, String> groupIdNotFoundErrorMessages;
 
     public DescribeConsumerGroupsHandler(
         boolean includeAuthorizedOperations,
@@ -73,6 +74,7 @@ public class DescribeConsumerGroupsHandler implements 
AdminApiHandler<Coordinato
         this.log = logContext.logger(DescribeConsumerGroupsHandler.class);
         this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP, 
logContext);
         this.useClassicGroupApi = new HashSet<>();
+        this.groupIdNotFoundErrorMessages = new HashMap<>();
     }
 
     private static Set<CoordinatorKey> buildKeySet(Collection<String> 
groupIds) {
@@ -255,7 +257,7 @@ public class DescribeConsumerGroupsHandler implements 
AdminApiHandler<Coordinato
                 handleError(
                     groupIdKey,
                     error,
-                    null,
+                    describedGroup.errorMessage(),
                     failed,
                     groupsToUnmap,
                     false
@@ -354,11 +356,18 @@ public class DescribeConsumerGroupsHandler implements 
AdminApiHandler<Coordinato
             case GROUP_ID_NOT_FOUND:
                 if (isConsumerGroupResponse) {
                     log.debug("`{}` request for group id {} failed because the 
group is not " +
-                        "a new consumer group. Will retry with 
`DescribeGroups` API.", apiName, groupId.idValue);
+                        "a new consumer group. Will retry with 
`DescribeGroups` API. {}",
+                        apiName, groupId.idValue, errorMsg != null ? errorMsg 
: "");
                     useClassicGroupApi.add(groupId.idValue);
+
+                    // The error message from the ConsumerGroupDescribe API is 
more informative to the user
+                    // than the error message from the classic group API. 
Capture it and use it if we get the
+                    // same error code for the classic group API also.
+                    groupIdNotFoundErrorMessages.put(groupId.idValue, 
errorMsg);
                 } else {
-                    log.error("`{}` request for group id {} failed because the 
group does not exist.", apiName, groupId.idValue);
-                    failed.put(groupId, error.exception(errorMsg));
+                    log.debug("`{}` request for group id {} failed because the 
group does not exist. {}",
+                        apiName, groupId.idValue, errorMsg != null ? errorMsg 
: "");
+                    failed.put(groupId, 
error.exception(groupIdNotFoundErrorMessages.getOrDefault(groupId.idValue, 
errorMsg)));
                 }
                 break;
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeShareGroupsHandler.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeShareGroupsHandler.java
index f1c9e5d45ca..a763a4255e6 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeShareGroupsHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeShareGroupsHandler.java
@@ -37,7 +37,6 @@ import org.slf4j.Logger;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -177,17 +176,9 @@ public class DescribeShareGroupsHandler extends 
AdminApiHandler.Batched<Coordina
                 break;
 
             case GROUP_ID_NOT_FOUND:
-                // In order to maintain compatibility with 
describeConsumerGroups, an unknown group ID is
-                // reported as a DEAD share group, and the admin client 
operation did not fail
                 log.debug("`DescribeShareGroups` request for group id {} 
failed because the group does not exist. {}",
                     groupId.idValue, errorMsg != null ? errorMsg : "");
-                final ShareGroupDescription shareGroupDescription =
-                    new ShareGroupDescription(groupId.idValue,
-                        Collections.emptySet(),
-                        GroupState.DEAD,
-                        coordinator,
-                        
validAclOperations(describedGroup.authorizedOperations()));
-                completed.put(groupId, shareGroupDescription);
+                failed.put(groupId, error.exception(errorMsg));
                 break;
 
             default:
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
index bd341e19f9a..b0248055339 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
@@ -110,6 +110,14 @@ public class DescribeGroupsResponse extends 
AbstractResponse {
             DescribeGroupsResponse.UNKNOWN_PROTOCOL, Collections.emptyList(), 
AUTHORIZED_OPERATIONS_OMITTED);
     }
 
+    public static DescribedGroup groupError(String groupId, Errors error, 
String errorMessage) {
+        return new DescribedGroup()
+            .setGroupId(groupId)
+            .setGroupState(DescribeGroupsResponse.UNKNOWN_STATE)
+            .setErrorCode(error.code())
+            .setErrorMessage(errorMessage);
+    }
+
     @Override
     public DescribeGroupsResponseData data() {
         return data;
diff --git 
a/clients/src/main/resources/common/message/DescribeGroupsRequest.json 
b/clients/src/main/resources/common/message/DescribeGroupsRequest.json
index 6b10b0637a2..8dabf71bd52 100644
--- a/clients/src/main/resources/common/message/DescribeGroupsRequest.json
+++ b/clients/src/main/resources/common/message/DescribeGroupsRequest.json
@@ -25,11 +25,13 @@
   // Starting in version 4, the response will include group.instance.id info 
for members.
   //
   // Version 5 is the first flexible version.
-  "validVersions": "0-5",
+  //
+  // Version 6 returns error code GROUP_ID_NOT_FOUND if the group ID is not 
found (KIP-1043).
+  "validVersions": "0-6",
   "flexibleVersions": "5+",
   "fields": [
     { "name": "Groups", "type": "[]string", "versions": "0+", "entityType": 
"groupId",
-      "about": "The names of the groups to describe" },
+      "about": "The names of the groups to describe." },
     { "name": "IncludeAuthorizedOperations", "type": "bool", "versions": "3+",
       "about": "Whether to include authorized operations." }
   ]
diff --git 
a/clients/src/main/resources/common/message/DescribeGroupsResponse.json 
b/clients/src/main/resources/common/message/DescribeGroupsResponse.json
index 99ef4203e06..aa51b994cb1 100644
--- a/clients/src/main/resources/common/message/DescribeGroupsResponse.json
+++ b/clients/src/main/resources/common/message/DescribeGroupsResponse.json
@@ -26,7 +26,9 @@
   // Starting in version 4, the response will optionally include 
group.instance.id info for members.
   //
   // Version 5 is the first flexible version.
-  "validVersions": "0-5",
+  //
+  // Version 6 returns error code GROUP_ID_NOT_FOUND if the group ID is not 
found (KIP-1043).
+  "validVersions": "0-6",
   "flexibleVersions": "5+",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", 
"ignorable": true,
@@ -35,6 +37,8 @@
       "about": "Each described group.", "fields": [
       { "name": "ErrorCode", "type": "int16", "versions": "0+",
         "about": "The describe error, or 0 if there was no error." },
+      { "name": "ErrorMessage", "type": "string", "versions": "6+", 
"nullableVersions": "6+", "default": "null",
+        "about": "The describe error message, or null if there was no error." 
},
       { "name": "GroupId", "type": "string", "versions": "0+", "entityType": 
"groupId",
         "about": "The group ID string." },
       { "name": "GroupState", "type": "string", "versions": "0+",
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index ca70af6d114..b0b48e33c67 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -3799,7 +3799,13 @@ public class KafkaAdminClientTest {
         try (AdminClientUnitTestEnv env = new 
AdminClientUnitTestEnv(mockCluster(1, 0))) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
-            
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, 
env.cluster().controller()));
+            env.kafkaClient().prepareResponse(new FindCoordinatorResponse(
+                new FindCoordinatorResponseData()
+                    .setCoordinators(asList(
+                        
FindCoordinatorResponse.prepareCoordinatorResponse(Errors.NONE, GROUP_ID, 
env.cluster().controller()),
+                        
FindCoordinatorResponse.prepareCoordinatorResponse(Errors.NONE, 
"group-connect-0", env.cluster().controller())
+                    ))
+            ));
 
             // The first request sent will be a ConsumerGroupDescribe request. 
Let's
             // fail it in order to fail back to using the classic version.
@@ -3819,8 +3825,8 @@ public class KafkaAdminClientTest {
             byte[] memberAssignmentBytes = new 
byte[memberAssignment.remaining()];
             memberAssignment.get(memberAssignmentBytes);
 
-            DescribeGroupsResponseData group0Data = new 
DescribeGroupsResponseData();
-            group0Data.groups().add(DescribeGroupsResponse.groupMetadata(
+            DescribeGroupsResponseData groupData = new 
DescribeGroupsResponseData();
+            groupData.groups().add(DescribeGroupsResponse.groupMetadata(
                     GROUP_ID,
                     Errors.NONE,
                     "",
@@ -3831,9 +3837,7 @@ public class KafkaAdminClientTest {
                             DescribeGroupsResponse.groupMember("1", null, 
"clientId1", "clientHost", memberAssignmentBytes, null)
                     ),
                     Collections.emptySet()));
-
-            DescribeGroupsResponseData groupConnectData = new 
DescribeGroupsResponseData();
-            group0Data.groups().add(DescribeGroupsResponse.groupMetadata(
+            groupData.groups().add(DescribeGroupsResponse.groupMetadata(
                     "group-connect-0",
                     Errors.NONE,
                     "",
@@ -3845,8 +3849,7 @@ public class KafkaAdminClientTest {
                     ),
                     Collections.emptySet()));
 
-            env.kafkaClient().prepareResponse(new 
DescribeGroupsResponse(group0Data));
-            env.kafkaClient().prepareResponse(new 
DescribeGroupsResponse(groupConnectData));
+            env.kafkaClient().prepareResponse(new 
DescribeGroupsResponse(groupData));
 
             Collection<String> groups = new HashSet<>();
             groups.add(GROUP_ID);
@@ -3854,6 +3857,72 @@ public class KafkaAdminClientTest {
             final DescribeConsumerGroupsResult result = 
env.adminClient().describeConsumerGroups(groups);
             assertEquals(2, result.describedGroups().size());
             assertEquals(groups, result.describedGroups().keySet());
+            KafkaFuture<Map<String, ConsumerGroupDescription>> allFuture = 
result.all();
+            // This throws because the second group is a classic connect 
group, not a consumer group.
+            assertThrows(ExecutionException.class, allFuture::get);
+            assertTrue(allFuture.isCompletedExceptionally());
+        }
+    }
+
+    @Test
+    public void testDescribeConsumerGroupsGroupIdNotFound() {
+        try (AdminClientUnitTestEnv env = new 
AdminClientUnitTestEnv(mockCluster(1, 0))) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            env.kafkaClient().prepareResponse(new FindCoordinatorResponse(
+                new FindCoordinatorResponseData()
+                    .setCoordinators(asList(
+                        
FindCoordinatorResponse.prepareCoordinatorResponse(Errors.NONE, GROUP_ID, 
env.cluster().controller()),
+                        
FindCoordinatorResponse.prepareCoordinatorResponse(Errors.NONE, 
"group-connect-0", env.cluster().controller())
+                    ))
+            ));
+
+            // The first request sent will be a ConsumerGroupDescribe request. 
Let's
+            // fail it in order to fail back to using the classic version.
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ConsumerGroupDescribeRequest);
+
+            TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 
0);
+            TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 
1);
+            TopicPartition myTopicPartition2 = new TopicPartition("my_topic", 
2);
+
+            final List<TopicPartition> topicPartitions = new ArrayList<>();
+            topicPartitions.add(0, myTopicPartition0);
+            topicPartitions.add(1, myTopicPartition1);
+            topicPartitions.add(2, myTopicPartition2);
+
+            final ByteBuffer memberAssignment = 
ConsumerProtocol.serializeAssignment(new 
ConsumerPartitionAssignor.Assignment(topicPartitions));
+            byte[] memberAssignmentBytes = new 
byte[memberAssignment.remaining()];
+            memberAssignment.get(memberAssignmentBytes);
+
+            DescribeGroupsResponseData groupData = new 
DescribeGroupsResponseData();
+            groupData.groups().add(DescribeGroupsResponse.groupMetadata(
+                GROUP_ID,
+                Errors.NONE,
+                "",
+                ConsumerProtocol.PROTOCOL_TYPE,
+                "",
+                asList(
+                    DescribeGroupsResponse.groupMember("0", null, "clientId0", 
"clientHost", memberAssignmentBytes, null),
+                    DescribeGroupsResponse.groupMember("1", null, "clientId1", 
"clientHost", memberAssignmentBytes, null)
+                ),
+                Collections.emptySet()));
+            groupData.groups().add(DescribeGroupsResponse.groupError(
+                "group-connect-0",
+                Errors.GROUP_ID_NOT_FOUND,
+                "Group group-connect-0 is not a classic group."));
+
+            env.kafkaClient().prepareResponse(new 
DescribeGroupsResponse(groupData));
+
+            Collection<String> groups = new HashSet<>();
+            groups.add(GROUP_ID);
+            groups.add("group-connect-0");
+            final DescribeConsumerGroupsResult result = 
env.adminClient().describeConsumerGroups(groups);
+            assertEquals(2, result.describedGroups().size());
+            assertEquals(groups, result.describedGroups().keySet());
+            KafkaFuture<Map<String, ConsumerGroupDescription>> allFuture = 
result.all();
+            assertThrows(ExecutionException.class, allFuture::get);
+            assertTrue(result.all().isCompletedExceptionally());
         }
     }
 
@@ -4996,6 +5065,59 @@ public class KafkaAdminClientTest {
         }
     }
 
+    @Test
+    public void testDescribeShareGroupsGroupIdNotFound() throws Exception {
+        try (AdminClientUnitTestEnv env = new 
AdminClientUnitTestEnv(mockCluster(1, 0))) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            env.kafkaClient().prepareResponse(new FindCoordinatorResponse(
+                new FindCoordinatorResponseData()
+                    .setCoordinators(asList(
+                        
FindCoordinatorResponse.prepareCoordinatorResponse(Errors.NONE, GROUP_ID, 
env.cluster().controller()),
+                        
FindCoordinatorResponse.prepareCoordinatorResponse(Errors.NONE, "group-1", 
env.cluster().controller())
+                    ))
+            ));
+
+            ShareGroupDescribeResponseData.TopicPartitions topicPartitions = 
new ShareGroupDescribeResponseData.TopicPartitions()
+                .setTopicName("my_topic")
+                .setPartitions(asList(0, 1, 2));
+            final ShareGroupDescribeResponseData.Assignment memberAssignment = 
new ShareGroupDescribeResponseData.Assignment()
+                .setTopicPartitions(asList(topicPartitions));
+            ShareGroupDescribeResponseData groupData = new 
ShareGroupDescribeResponseData();
+            groupData.groups().add(new 
ShareGroupDescribeResponseData.DescribedGroup()
+                .setGroupId(GROUP_ID)
+                .setGroupState(GroupState.STABLE.toString())
+                .setMembers(asList(
+                    new ShareGroupDescribeResponseData.Member()
+                        .setMemberId("0")
+                        .setClientId("clientId0")
+                        .setClientHost("clientHost")
+                        .setAssignment(memberAssignment),
+                    new ShareGroupDescribeResponseData.Member()
+                        .setMemberId("1")
+                        .setClientId("clientId1")
+                        .setClientHost("clientHost")
+                        .setAssignment(memberAssignment))));
+            groupData.groups().add(new 
ShareGroupDescribeResponseData.DescribedGroup()
+                .setGroupId("group-1")
+                .setGroupState(GroupState.DEAD.toString())
+                .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
+                .setErrorMessage("Group group-1 not found."));
+
+            env.kafkaClient().prepareResponse(new 
ShareGroupDescribeResponse(groupData));
+
+            Collection<String> groups = new HashSet<>();
+            groups.add(GROUP_ID);
+            groups.add("group-1");
+            final DescribeShareGroupsResult result = 
env.adminClient().describeShareGroups(groups);
+            assertEquals(2, result.describedGroups().size());
+            assertEquals(groups, result.describedGroups().keySet());
+            KafkaFuture<Map<String, ShareGroupDescription>> allFuture = 
result.all();
+            assertThrows(ExecutionException.class, allFuture::get);
+            assertTrue(result.all().isCompletedExceptionally());
+        }
+    }
+
     @Test
     public void testDescribeShareGroupsWithAuthorizedOperationsOmitted() 
throws Exception {
         try (AdminClientUnitTestEnv env = new 
AdminClientUnitTestEnv(mockCluster(1, 0))) {
@@ -5024,15 +5146,21 @@ public class KafkaAdminClientTest {
         try (AdminClientUnitTestEnv env = new 
AdminClientUnitTestEnv(mockCluster(1, 0))) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
-            
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, 
env.cluster().controller()));
+            env.kafkaClient().prepareResponse(new FindCoordinatorResponse(
+                new FindCoordinatorResponseData()
+                    .setCoordinators(asList(
+                        
FindCoordinatorResponse.prepareCoordinatorResponse(Errors.NONE, GROUP_ID, 
env.cluster().controller()),
+                        
FindCoordinatorResponse.prepareCoordinatorResponse(Errors.NONE, "group-1", 
env.cluster().controller())
+                    ))
+            ));
 
             ShareGroupDescribeResponseData.TopicPartitions topicPartitions = 
new ShareGroupDescribeResponseData.TopicPartitions()
                 .setTopicName("my_topic")
                 .setPartitions(asList(0, 1, 2));
             final ShareGroupDescribeResponseData.Assignment memberAssignment = 
new ShareGroupDescribeResponseData.Assignment()
                 .setTopicPartitions(asList(topicPartitions));
-            ShareGroupDescribeResponseData group0Data = new 
ShareGroupDescribeResponseData();
-            group0Data.groups().add(new 
ShareGroupDescribeResponseData.DescribedGroup()
+            ShareGroupDescribeResponseData groupData = new 
ShareGroupDescribeResponseData();
+            groupData.groups().add(new 
ShareGroupDescribeResponseData.DescribedGroup()
                 .setGroupId(GROUP_ID)
                 .setGroupState(GroupState.STABLE.toString())
                 .setMembers(asList(
@@ -5046,9 +5174,7 @@ public class KafkaAdminClientTest {
                         .setClientId("clientId1")
                         .setClientHost("clientHost")
                         .setAssignment(memberAssignment))));
-
-            ShareGroupDescribeResponseData group1Data = new 
ShareGroupDescribeResponseData();
-            group1Data.groups().add(new 
ShareGroupDescribeResponseData.DescribedGroup()
+            groupData.groups().add(new 
ShareGroupDescribeResponseData.DescribedGroup()
                 .setGroupId("group-1")
                 .setGroupState(GroupState.STABLE.toString())
                 .setMembers(asList(
@@ -5063,8 +5189,7 @@ public class KafkaAdminClientTest {
                         .setClientHost("clientHost")
                         .setAssignment(memberAssignment))));
 
-            env.kafkaClient().prepareResponse(new 
ShareGroupDescribeResponse(group0Data));
-            env.kafkaClient().prepareResponse(new 
ShareGroupDescribeResponse(group1Data));
+            env.kafkaClient().prepareResponse(new 
ShareGroupDescribeResponse(groupData));
 
             Collection<String> groups = new HashSet<>();
             groups.add(GROUP_ID);
@@ -5072,6 +5197,9 @@ public class KafkaAdminClientTest {
             final DescribeShareGroupsResult result = 
env.adminClient().describeShareGroups(groups);
             assertEquals(2, result.describedGroups().size());
             assertEquals(groups, result.describedGroups().keySet());
+            KafkaFuture<Map<String, ShareGroupDescription>> allFuture = 
result.all();
+            assertDoesNotThrow(() -> allFuture.get());
+            assertFalse(allFuture.isCompletedExceptionally());
         }
     }
 
diff --git 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
index bc49c3a09b7..6ce91baf123 100644
--- 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
+++ 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.GroupState;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.UnknownMemberIdException;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.data.Schema;
@@ -301,6 +302,7 @@ public class MirrorCheckpointTask extends SourceTask {
                 // sync offset to the target cluster only if the state of 
current consumer group is:
                 // (1) idle: because the consumer at target is not actively 
consuming the mirrored topic
                 // (2) dead: the new consumer that is recently created at 
source and never existed at target
+                //           This case will be reported as a 
GroupIdNotFoundException
                 if (consumerGroupState == GroupState.EMPTY) {
                     idleConsumerGroupsOffset.put(
                             group,
@@ -311,8 +313,13 @@ public class MirrorCheckpointTask extends SourceTask {
                     );
                 }
                 // new consumer upstream has state "DEAD" and will be 
identified during the offset sync-up
-            } catch (InterruptedException | ExecutionException e) {
-                log.error("Error querying for consumer group {} on cluster 
{}.", group, targetClusterAlias, e);
+            } catch (InterruptedException ie) {
+                log.error("Error querying for consumer group {} on cluster 
{}.", group, targetClusterAlias, ie);
+            } catch (ExecutionException ee) {
+                // check for non-existent new consumer upstream which will be 
identified during the offset sync-up
+                if (!(ee.getCause() instanceof GroupIdNotFoundException)) {
+                    log.error("Error querying for consumer group {} on cluster 
{}.", group, targetClusterAlias, ee);
+                }
             }
         }
     }
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 8ca956daedf..47684bc753b 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -1138,15 +1138,20 @@ private[group] class GroupCoordinator(
     }
   }
 
-  def handleDescribeGroup(groupId: String): (Errors, GroupSummary) = {
+  def handleDescribeGroup(groupId: String, apiVersion: Short): (Errors, 
Option[String], GroupSummary) = {
     validateGroupStatus(groupId, ApiKeys.DESCRIBE_GROUPS) match {
-      case Some(error) => (error, GroupCoordinator.EmptyGroup)
+      case Some(error) => (error, None, GroupCoordinator.EmptyGroup)
       case None =>
         groupManager.getGroup(groupId) match {
-          case None => (Errors.NONE, GroupCoordinator.DeadGroup)
+          case None =>
+            if (apiVersion >= 6) {
+              (Errors.GROUP_ID_NOT_FOUND, Some(s"Group $groupId not found."), 
GroupCoordinator.DeadGroup)
+            } else {
+              (Errors.NONE, None, GroupCoordinator.DeadGroup)
+            }
           case Some(group) =>
             group.inLock {
-              (Errors.NONE, group.summary)
+              (Errors.NONE, None, group.summary)
             }
         }
     }
diff --git 
a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
index 970d283953e..d8d47c46116 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
@@ -249,10 +249,11 @@ private[group] class GroupCoordinatorAdapter(
   ): CompletableFuture[util.List[DescribeGroupsResponseData.DescribedGroup]] = 
{
 
     def describeGroup(groupId: String): 
DescribeGroupsResponseData.DescribedGroup = {
-      val (error, summary) = coordinator.handleDescribeGroup(groupId)
+      val (error, errorMessage, summary) = 
coordinator.handleDescribeGroup(groupId, context.apiVersion())
 
       new DescribeGroupsResponseData.DescribedGroup()
         .setErrorCode(error.code)
+        .setErrorMessage(errorMessage.orNull)
         .setGroupId(groupId)
         .setGroupState(summary.state)
         .setProtocolType(summary.protocolType)
diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 016cfcad122..5ae424f2137 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -1610,7 +1610,8 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
     createTopicWithBrokerPrincipal(topic)
     addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WILDCARD_HOST, DESCRIBE, ALLOW)), groupResource)
     addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WILDCARD_HOST, DESCRIBE, ALLOW)), topicResource)
-    
createAdminClient().describeConsumerGroups(Seq(group).asJava).describedGroups().get(group).get()
+    val result = createAdminClient().describeConsumerGroups(Seq(group).asJava)
+    JTestUtils.assertFutureThrows(result.describedGroups().get(group), 
classOf[GroupIdNotFoundException])
   }
 
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
diff --git 
a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
 
b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
index 04146e12685..84214a79ed9 100644
--- 
a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
@@ -19,6 +19,7 @@ import org.apache.kafka.common.test.api.ClusterTestExtensions
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.admin.{Admin, ConsumerGroupDescription}
 import org.apache.kafka.clients.consumer.{Consumer, GroupProtocol, 
OffsetAndMetadata}
+import org.apache.kafka.common.errors.GroupIdNotFoundException
 import org.apache.kafka.common.{ConsumerGroupState, GroupType, KafkaFuture, 
TopicPartition}
 import org.junit.jupiter.api.Assertions._
 
@@ -221,7 +222,7 @@ class GroupCoordinatorIntegrationTest(cluster: 
ClusterInstance) {
         .asScala
         .toMap
 
-      assertDescribedGroup(groups, "grp3", GroupType.CLASSIC, 
ConsumerGroupState.DEAD)
+      assertDescribedDeadGroup(groups, "grp3")
     }
   }
 
@@ -328,4 +329,18 @@ class GroupCoordinatorIntegrationTest(cluster: 
ClusterInstance) {
     assertEquals(state, group.state)
     assertEquals(Collections.emptyList, group.members)
   }
+
+  private def assertDescribedDeadGroup(
+    groups: Map[String, KafkaFuture[ConsumerGroupDescription]],
+    groupId: String
+  ): Unit = {
+    try {
+      groups(groupId).get(10, TimeUnit.SECONDS)
+      fail(s"Group $groupId should not be found")
+    } catch {
+      case e: java.util.concurrent.ExecutionException =>
+        assertTrue(e.getCause.isInstanceOf[GroupIdNotFoundException])
+        assertEquals(s"Group $groupId not found.", e.getCause.getMessage)
+    }
+  }
 }
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index b4d06d9f993..c53dfef850e 100644
--- 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -1936,18 +1936,14 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
           val expectedOperations = 
AclEntry.supportedOperations(ResourceType.GROUP)
           assertEquals(expectedOperations, 
testGroupDescription.authorizedOperations())
 
-          // Test that the fake group is listed as dead.
+          // Test that the fake group throws GroupIdNotFoundException
           
assertTrue(describeWithFakeGroupResult.describedGroups().containsKey(fakeGroupId))
-          val fakeGroupDescription = 
describeWithFakeGroupResult.describedGroups().get(fakeGroupId).get()
+          
assertFutureThrows(describeWithFakeGroupResult.describedGroups().get(fakeGroupId),
 classOf[GroupIdNotFoundException],
+            s"Group $fakeGroupId not found.")
 
-          assertEquals(fakeGroupId, fakeGroupDescription.groupId())
-          assertEquals(0, fakeGroupDescription.members().size())
-          assertEquals("", fakeGroupDescription.partitionAssignor())
-          assertEquals(GroupState.DEAD, fakeGroupDescription.groupState())
-          assertEquals(expectedOperations, 
fakeGroupDescription.authorizedOperations())
-
-          // Test that all() returns 2 results
-          assertEquals(2, describeWithFakeGroupResult.all().get().size())
+          // Test that all() also throws GroupIdNotFoundException
+          assertFutureThrows(describeWithFakeGroupResult.all(), 
classOf[GroupIdNotFoundException],
+            s"Group $fakeGroupId not found.")
 
           val testTopicPart0 = new TopicPartition(testTopicName, 0)
 
@@ -2209,18 +2205,14 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
           val expectedOperations = 
AclEntry.supportedOperations(ResourceType.GROUP)
           assertEquals(expectedOperations, 
testGroupDescription.authorizedOperations())
 
-          // Test that the fake group is listed as dead.
+          // Test that the fake group throws GroupIdNotFoundException
           
assertTrue(describeWithFakeGroupResult.describedGroups().containsKey(fakeGroupId))
-          val fakeGroupDescription = 
describeWithFakeGroupResult.describedGroups().get(fakeGroupId).get()
-
-          assertEquals(fakeGroupId, fakeGroupDescription.groupId())
-          assertEquals(0, fakeGroupDescription.members().size())
-          assertEquals("", fakeGroupDescription.partitionAssignor())
-          assertEquals(ConsumerGroupState.DEAD, fakeGroupDescription.state())
-          assertEquals(expectedOperations, 
fakeGroupDescription.authorizedOperations())
+          
assertFutureThrows(describeWithFakeGroupResult.describedGroups().get(fakeGroupId),
+            classOf[GroupIdNotFoundException], s"Group $fakeGroupId not 
found.")
 
-          // Test that all() returns 2 results
-          assertEquals(2, describeWithFakeGroupResult.all().get().size())
+          // Test that all() also throws GroupIdNotFoundException
+          assertFutureThrows(describeWithFakeGroupResult.all(),
+            classOf[GroupIdNotFoundException], s"Group $fakeGroupId not 
found.")
 
           val testTopicPart0 = new TopicPartition(testTopicName, 0)
 
@@ -2642,17 +2634,14 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
         val expectedOperations = 
AclEntry.supportedOperations(ResourceType.GROUP)
         assertEquals(expectedOperations, 
testGroupDescription.authorizedOperations())
 
-        // Test that the fake group is listed as dead.
+        // Test that the fake group throws GroupIdNotFoundException
         
assertTrue(describeWithFakeGroupResult.describedGroups().containsKey(fakeGroupId))
-        val fakeGroupDescription = 
describeWithFakeGroupResult.describedGroups().get(fakeGroupId).get()
-
-        assertEquals(fakeGroupId, fakeGroupDescription.groupId())
-        assertEquals(0, fakeGroupDescription.members().size())
-        assertEquals(GroupState.DEAD, fakeGroupDescription.groupState())
-        assertNull(fakeGroupDescription.authorizedOperations())
+        
assertFutureThrows(describeWithFakeGroupResult.describedGroups().get(fakeGroupId),
+          classOf[GroupIdNotFoundException], s"Group $fakeGroupId not found.")
 
-        // Test that all() returns 2 results
-        assertEquals(2, describeWithFakeGroupResult.all().get().size())
+        // Test that all() also throws GroupIdNotFoundException
+        assertFutureThrows(describeWithFakeGroupResult.all(),
+          classOf[GroupIdNotFoundException], s"Group $fakeGroupId not found.")
 
         val describeTestGroupResult = 
client.describeShareGroups(Collections.singleton(testGroupId),
           new DescribeShareGroupsOptions().includeAuthorizedOperations(true))
@@ -2664,18 +2653,12 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
         assertEquals(testGroupId, testGroupDescription.groupId)
         assertEquals(consumerSet.size, testGroupDescription.members().size())
 
-        // Describing a share group using describeConsumerGroups reports it as 
a DEAD consumer group
-        // in the same way as a non-existent group
+        // Describing a share group using describeConsumerGroups reports it as 
a non-existent group
+        // but the error message is different
         val describeConsumerGroupResult = 
client.describeConsumerGroups(Collections.singleton(testGroupId),
           new 
DescribeConsumerGroupsOptions().includeAuthorizedOperations(true))
-        assertEquals(1, describeConsumerGroupResult.all().get().size())
-
-        val deadConsumerGroupDescription = 
describeConsumerGroupResult.describedGroups().get(testGroupId).get()
-        assertEquals(testGroupId, deadConsumerGroupDescription.groupId())
-        assertEquals(0, deadConsumerGroupDescription.members().size())
-        assertEquals("", deadConsumerGroupDescription.partitionAssignor())
-        assertEquals(ConsumerGroupState.DEAD, 
deadConsumerGroupDescription.state())
-        assertEquals(expectedOperations, 
deadConsumerGroupDescription.authorizedOperations())
+        assertFutureThrows(describeConsumerGroupResult.all(),
+          classOf[GroupIdNotFoundException], s"Group $testGroupId is not a 
consumer group.")
       } finally {
         consumerThreads.foreach {
           case consumerThread =>
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
index 7a9de453740..8d631fe2b80 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
@@ -414,12 +414,12 @@ class GroupCoordinatorAdapterTest {
       ))
     )
 
-    when(groupCoordinator.handleDescribeGroup(groupId1)).thenReturn {
-      (Errors.NONE, groupSummary1)
+    when(groupCoordinator.handleDescribeGroup(groupId1, 
ApiKeys.DESCRIBE_GROUPS.latestVersion)).thenReturn {
+      (Errors.NONE, None, groupSummary1)
     }
 
-    when(groupCoordinator.handleDescribeGroup(groupId2)).thenReturn {
-      (Errors.NOT_COORDINATOR, GroupCoordinator.EmptyGroup)
+    when(groupCoordinator.handleDescribeGroup(groupId2, 
ApiKeys.DESCRIBE_GROUPS.latestVersion)).thenReturn {
+      (Errors.NOT_COORDINATOR, None, GroupCoordinator.EmptyGroup)
     }
 
     val ctx = makeContext(ApiKeys.DESCRIBE_GROUPS, 
ApiKeys.DESCRIBE_GROUPS.latestVersion)
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index df0e3483f5a..433047275e2 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -175,7 +175,7 @@ class GroupCoordinatorTest {
     assertEquals(Some(Errors.NONE), heartbeatError)
 
     // DescribeGroups
-    val (describeGroupError, _) = 
groupCoordinator.handleDescribeGroup(otherGroupId)
+    val (describeGroupError, _, _) = 
groupCoordinator.handleDescribeGroup(otherGroupId, 
ApiKeys.DESCRIBE_GROUPS.latestVersion)
     assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, describeGroupError)
 
     // ListGroups
@@ -187,15 +187,16 @@ class GroupCoordinatorTest {
     assertEquals(Some(Errors.COORDINATOR_LOAD_IN_PROGRESS), 
deleteGroupsErrors.get(otherGroupId))
 
     // Check that non-loading groups are still accessible
-    assertEquals(Errors.NONE, groupCoordinator.handleDescribeGroup(groupId)._1)
+    assertEquals(Errors.GROUP_ID_NOT_FOUND, 
groupCoordinator.handleDescribeGroup(groupId, 
ApiKeys.DESCRIBE_GROUPS.latestVersion)._1)
 
     // After loading, we should be able to access the group
     val otherGroupMetadataTopicPartition = new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, otherGroupPartitionId)
     
when(replicaManager.getLog(otherGroupMetadataTopicPartition)).thenReturn(None)
+
     // Call removeGroupsAndOffsets so that partition removed from 
loadingPartitions
     
groupCoordinator.groupManager.removeGroupsAndOffsets(otherGroupMetadataTopicPartition,
 OptionalInt.of(1), group => {})
     
groupCoordinator.groupManager.loadGroupsAndOffsets(otherGroupMetadataTopicPartition,
 1, group => {}, 0L)
-    assertEquals(Errors.NONE, 
groupCoordinator.handleDescribeGroup(otherGroupId)._1)
+    assertEquals(Errors.GROUP_ID_NOT_FOUND, 
groupCoordinator.handleDescribeGroup(otherGroupId, 
ApiKeys.DESCRIBE_GROUPS.latestVersion)._1)
   }
 
   @Test
@@ -2609,8 +2610,9 @@ class GroupCoordinatorTest {
     assertEquals(Errors.NONE, fetchError)
     assertEquals(Some(0), partitionData.get(tip.topicPartition).map(_.offset))
 
-    val (describeError, summary) = 
groupCoordinator.handleDescribeGroup(groupId)
+    var (describeError, describeErrorMessage, summary) = 
groupCoordinator.handleDescribeGroup(groupId, 
ApiKeys.DESCRIBE_GROUPS.latestVersion)
     assertEquals(Errors.NONE, describeError)
+    assertTrue(describeErrorMessage.isEmpty)
     assertEquals(Empty.toString, summary.state)
 
     val groupTopicPartition = new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
@@ -3405,15 +3407,21 @@ class GroupCoordinatorTest {
 
   @Test
   def testDescribeGroupWrongCoordinator(): Unit = {
-    val (error, _) = groupCoordinator.handleDescribeGroup(otherGroupId)
+    val (error, _, _) = groupCoordinator.handleDescribeGroup(otherGroupId, 
ApiKeys.DESCRIBE_GROUPS.latestVersion)
     assertEquals(Errors.NOT_COORDINATOR, error)
   }
 
   @Test
   def testDescribeGroupInactiveGroup(): Unit = {
-    val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
+    val (error, errorMessage, summary) = 
groupCoordinator.handleDescribeGroup(groupId, 5)
     assertEquals(Errors.NONE, error)
+    assertTrue(errorMessage.isEmpty)
     assertEquals(GroupCoordinator.DeadGroup, summary)
+
+    val (errorV6, errorMessageV6, summaryV6) = 
groupCoordinator.handleDescribeGroup(groupId, 6)
+    assertEquals(Errors.GROUP_ID_NOT_FOUND, errorV6)
+    assertEquals(s"Group $groupId not found.", errorMessageV6.get)
+    assertEquals(GroupCoordinator.DeadGroup, summaryV6)
   }
 
   @Test
@@ -3427,8 +3435,9 @@ class GroupCoordinatorTest {
     val syncGroupResult = syncGroupLeader(groupId, generationId, 
assignedMemberId, Map(assignedMemberId -> Array[Byte]()))
     assertEquals(Errors.NONE, syncGroupResult.error)
 
-    val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
+    val (error, errorMessage, summary) = 
groupCoordinator.handleDescribeGroup(groupId, 
ApiKeys.DESCRIBE_GROUPS.latestVersion)
     assertEquals(Errors.NONE, error)
+    assertTrue(errorMessage.isEmpty)
     assertEquals(protocolType, summary.protocolType)
     assertEquals("range", summary.protocol)
     assertEquals(List(assignedMemberId), summary.members.map(_.memberId))
@@ -3445,8 +3454,9 @@ class GroupCoordinatorTest {
     val syncGroupResult = syncGroupLeader(groupId, generationId, 
assignedMemberId, Map(assignedMemberId -> Array[Byte]()))
     assertEquals(Errors.NONE, syncGroupResult.error)
 
-    val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
+    val (error, errorMessage, summary) = 
groupCoordinator.handleDescribeGroup(groupId, 
ApiKeys.DESCRIBE_GROUPS.latestVersion)
     assertEquals(Errors.NONE, error)
+    assertTrue(errorMessage.isEmpty)
     assertEquals(protocolType, summary.protocolType)
     assertEquals("range", summary.protocol)
     assertEquals(List(assignedMemberId), summary.members.map(_.memberId))
@@ -3460,8 +3470,9 @@ class GroupCoordinatorTest {
     val joinGroupError = joinGroupResult.error
     assertEquals(Errors.NONE, joinGroupError)
 
-    val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
+    val (error, errorMessage, summary) = 
groupCoordinator.handleDescribeGroup(groupId, 
ApiKeys.DESCRIBE_GROUPS.latestVersion)
     assertEquals(Errors.NONE, error)
+    assertTrue(errorMessage.isEmpty)
     assertEquals(protocolType, summary.protocolType)
     assertEquals(GroupCoordinator.NoProtocol, summary.protocol)
     assertEquals(CompletingRebalance.toString, summary.state)
@@ -3528,9 +3539,9 @@ class GroupCoordinatorTest {
     val commitOffsetResult = commitOffsets(groupId, assignedMemberId, 
joinGroupResult.generationId, Map(tip -> offset))
     assertEquals(Map(tip -> Errors.NONE), commitOffsetResult)
 
-    val describeGroupResult = groupCoordinator.handleDescribeGroup(groupId)
-    assertEquals(Stable.toString, describeGroupResult._2.state)
-    assertEquals(assignedMemberId, 
describeGroupResult._2.members.head.memberId)
+    val describeGroupResult = groupCoordinator.handleDescribeGroup(groupId, 
ApiKeys.DESCRIBE_GROUPS.latestVersion)
+    assertEquals(Stable.toString, describeGroupResult._3.state)
+    assertEquals(assignedMemberId, 
describeGroupResult._3.members.head.memberId)
 
     val leaveGroupResults = singleLeaveGroup(groupId, assignedMemberId)
     verifyLeaveGroupResult(leaveGroupResults)
@@ -3545,7 +3556,7 @@ class GroupCoordinatorTest {
     val result = groupCoordinator.handleDeleteGroups(Set(groupId))
     assert(result.size == 1 && result.contains(groupId) && 
result.get(groupId).contains(Errors.NONE))
 
-    assertEquals(Dead.toString, 
groupCoordinator.handleDescribeGroup(groupId)._2.state)
+    assertEquals(Dead.toString, groupCoordinator.handleDescribeGroup(groupId, 
ApiKeys.DESCRIBE_GROUPS.latestVersion)._3.state)
   }
 
   @Test
diff --git 
a/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala
index 2485f09409d..0fab872363b 100644
--- a/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala
@@ -123,6 +123,8 @@ class DeleteGroupsRequestTest(cluster: ClusterInstance) 
extends GroupCoordinator
           List(new DescribedGroup()
             .setGroupId("grp")
             .setGroupState(ClassicGroupState.DEAD.toString)
+            .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code)
+            .setErrorMessage("Group grp not found.")
           ),
           describeGroups(List("grp"))
         )
diff --git 
a/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala
index 4a822048e1c..de8044ce2c1 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala
@@ -20,7 +20,7 @@ import org.apache.kafka.common.test.api.ClusterInstance
 import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
 import org.apache.kafka.common.test.api.ClusterTestExtensions
 import 
org.apache.kafka.common.message.DescribeGroupsResponseData.{DescribedGroup, 
DescribedGroupMember}
-import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
 import org.apache.kafka.coordinator.group.classic.ClassicGroupState
 import org.junit.jupiter.api.Assertions.assertEquals
@@ -106,6 +106,8 @@ class DescribeGroupsRequestTest(cluster: ClusterInstance) 
extends GroupCoordinat
           new DescribedGroup()
             .setGroupId("grp-unknown")
             .setGroupState(ClassicGroupState.DEAD.toString) // Return DEAD 
group when the group does not exist.
+            .setErrorCode(if (version >= 6) Errors.GROUP_ID_NOT_FOUND.code() 
else Errors.NONE.code())
+            .setErrorMessage(if (version >= 6) "Group grp-unknown not found." 
else null)
         ),
         describeGroups(
           groupIds = List("grp-1", "grp-2", "grp-unknown"),
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 5771a17d5ea..f856caf9cfe 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -3719,7 +3719,8 @@ class KafkaApisTest extends Logging {
     val describeGroupsRequest = new DescribeGroupsRequestData().setGroups(List(
       "group-1",
       "group-2",
-      "group-3"
+      "group-3",
+      "group-4"
     ).asJava)
 
     val requestChannelRequest = buildRequest(new 
DescribeGroupsRequest.Builder(describeGroupsRequest).build())
@@ -3746,7 +3747,12 @@ class KafkaApisTest extends Logging {
         .setErrorCode(Errors.NOT_COORDINATOR.code),
       new DescribeGroupsResponseData.DescribedGroup()
         .setGroupId("group-3")
-        .setErrorCode(Errors.REQUEST_TIMED_OUT.code)
+        .setErrorCode(Errors.REQUEST_TIMED_OUT.code),
+      new DescribeGroupsResponseData.DescribedGroup()
+        .setGroupId("group-4")
+        .setGroupState("Dead")
+        .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code)
+        .setErrorMessage("Group group-4 is not a classic group.")
     ).asJava
 
     future.complete(groupResults)
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 8e2aa2915f4..84b2b36ebc1 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -161,12 +161,19 @@
                         </li>
                     </ul>
                 </li>
-                <li><b>Admin</b>
+                <li><b>Admin client</b>
                     <ul>
                         <li>
-                            The <code>alterConfigs</code> method was removed 
from the <code>org.apache.kafka.clients.admin.Admin</code>
+                            The <code>alterConfigs</code> method was removed 
from the <code>org.apache.kafka.clients.admin.Admin</code>.
                             Please use <code>incrementalAlterConfigs</code> 
instead.
                         </li>
+                        <li>The 
<code>org.apache.kafka.common.ConsumerGroupState</code> enumeration and related 
methods have been deprecated. Please use <code>GroupState</code> instead
+                            which applies to all types of group.
+                        </li>
+                        <li>The <code>Admin.describeConsumerGroups</code> 
method used to return a <code>ConsumerGroupDescription</code> in state
+                            <code>DEAD</code> if the group ID was not found. 
In Apache Kafka 4.0, the <code>GroupIdNotFoundException</code>
+                            is thrown instead as part of the support for new 
types of group.
+                        </li>
                     </ul>
                 </li>
             </ul>
@@ -185,10 +192,11 @@
                     See <a 
href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308223";>KIP-750</a>
 for more details
                 </li>
                 <li>
-                    KafkaLog4jAppender has been remove, users should migrate 
to the log4j2 appender
+                    KafkaLog4jAppender has been removed, users should migrate 
to the log4j2 appender
                     See <a 
href="https://logging.apache.org/log4j/2.x/manual/appenders.html#KafkaAppender";>KafkaAppender</a>
 for more details
                 </li>
-                <li>The <code>--delete-config</code> option in the 
<code>kafka-topics</code> command line tool has been deprecated.
+                <li>
+                    The <code>--delete-config</code> option in the 
<code>kafka-topics</code> command line tool has been deprecated.
                 </li>
                 <li>
                     For implementors of RemoteLogMetadataManager (RLMM), a new 
API `nextSegmentWithTxnIndex` is
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index 72a4ae58553..a2ead8effa0 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -603,7 +603,7 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
         List<String> groupIds,
         long committedOffset
     ) {
-        return groupMetadataManager.describeGroups(groupIds, committedOffset);
+        return groupMetadataManager.describeGroups(context, groupIds, 
committedOffset);
     }
 
     /**
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 69e7955d0bf..dda3a41d0bf 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -521,6 +521,7 @@ public class GroupMetadataManager {
                 describedGroups.add(new 
ConsumerGroupDescribeResponseData.DescribedGroup()
                     .setGroupId(groupId)
                     .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
+                    .setErrorMessage(exception.getMessage())
                 );
             }
         });
@@ -552,6 +553,7 @@ public class GroupMetadataManager {
                 describedGroups.add(new 
ShareGroupDescribeResponseData.DescribedGroup()
                     .setGroupId(groupId)
                     .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
+                    .setErrorMessage(exception.getMessage())
                 );
             }
         });
@@ -562,12 +564,14 @@ public class GroupMetadataManager {
     /**
      * Handles a DescribeGroup request.
      *
+     * @param context           The request context.
      * @param groupIds          The IDs of the groups to describe.
      * @param committedOffset   A specified committed offset corresponding to 
this shard.
      *
      * @return A list containing the DescribeGroupsResponseData.DescribedGroup.
      */
     public List<DescribeGroupsResponseData.DescribedGroup> describeGroups(
+        RequestContext context,
         List<String> groupIds,
         long committedOffset
     ) {
@@ -603,10 +607,19 @@ public class GroupMetadataManager {
                     );
                 }
             } catch (GroupIdNotFoundException exception) {
-                describedGroups.add(new 
DescribeGroupsResponseData.DescribedGroup()
-                    .setGroupId(groupId)
-                    .setGroupState(DEAD.toString())
-                );
+                if (context.header.apiVersion() >= 6) {
+                    describedGroups.add(new 
DescribeGroupsResponseData.DescribedGroup()
+                        .setGroupId(groupId)
+                        .setGroupState(DEAD.toString())
+                        .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
+                        .setErrorMessage(exception.getMessage())
+                    );
+                } else {
+                    describedGroups.add(new 
DescribeGroupsResponseData.DescribedGroup()
+                        .setGroupId(groupId)
+                        .setGroupState(DEAD.toString())
+                    );
+                }
             }
         });
         return describedGroups;
@@ -647,7 +660,7 @@ public class GroupMetadataManager {
             } else if (createIfNotExists && group.type() == CLASSIC && 
validateOnlineUpgrade((ClassicGroup) group)) {
                 return convertToConsumerGroup((ClassicGroup) group, records);
             } else {
-                throw new GroupIdNotFoundException(String.format("Group %s is 
not a consumer group", groupId));
+                throw new GroupIdNotFoundException(String.format("Group %s is 
not a consumer group.", groupId));
             }
         }
     }
@@ -670,7 +683,7 @@ public class GroupMetadataManager {
         if (group.type() == CONSUMER) {
             return (ConsumerGroup) group;
         } else {
-            throw new GroupIdNotFoundException(String.format("Group %s is not 
a consumer group", groupId));
+            throw new GroupIdNotFoundException(String.format("Group %s is not 
a consumer group.", groupId));
         }
     }
 
@@ -704,7 +717,7 @@ public class GroupMetadataManager {
         Group group = groups.get(groupId);
 
         if (group == null && !createIfNotExists) {
-            throw new GroupIdNotFoundException(String.format("Consumer group 
%s not found", groupId));
+            throw new GroupIdNotFoundException(String.format("Consumer group 
%s not found.", groupId));
         }
 
         if (group == null) {
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index f6391b5bd61..97ba6a5873a 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -8699,7 +8699,8 @@ public class GroupMetadataManagerTest {
         List<ConsumerGroupDescribeResponseData.DescribedGroup> actual = 
context.sendConsumerGroupDescribe(List.of(groupId));
         ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = new 
ConsumerGroupDescribeResponseData.DescribedGroup()
             .setGroupId(groupId)
-            .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code());
+            .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
+            .setErrorMessage("Group " + groupId + " not found.");
         List<ConsumerGroupDescribeResponseData.DescribedGroup> expected = 
List.of(
             describedGroup
         );
@@ -8741,7 +8742,8 @@ public class GroupMetadataManagerTest {
         List<ConsumerGroupDescribeResponseData.DescribedGroup> actual = 
context.groupMetadataManager.consumerGroupDescribe(List.of(consumerGroupId), 
context.lastCommittedOffset);
         ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = new 
ConsumerGroupDescribeResponseData.DescribedGroup()
             .setGroupId(consumerGroupId)
-            .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code());
+            .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
+            .setErrorMessage("Group " + consumerGroupId + " not found.");
         List<ConsumerGroupDescribeResponseData.DescribedGroup> expected = 
List.of(
             describedGroup
         );
@@ -8873,6 +8875,13 @@ public class GroupMetadataManagerTest {
         context.verifyDescribeGroupsReturnsDeadGroup("group-id");
     }
 
+    @Test
+    public void testDescribeGroupsBeforeV6GroupIdNotFoundException() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        context.verifyDescribeGroupsBeforeV6ReturnsDeadGroup("group-id");
+    }
+
     @Test
     public void testGroupStuckInRebalanceTimeoutDueToNonjoinedStaticMember() 
throws Exception {
         GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
@@ -14976,6 +14985,7 @@ public class GroupMetadataManagerTest {
             new ConsumerGroupDescribeResponseData.DescribedGroup()
                 .setGroupId(groupId)
                 .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
+                .setErrorMessage("Group " + groupId + " is not a consumer 
group.")
         );
 
         List<ConsumerGroupDescribeResponseData.DescribedGroup> actual = 
context.sendConsumerGroupDescribe(List.of(groupId));
@@ -15048,6 +15058,7 @@ public class GroupMetadataManagerTest {
             new ShareGroupDescribeResponseData.DescribedGroup()
                 .setGroupId(groupId)
                 .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
+                .setErrorMessage("Group " + groupId + " is not a share group.")
         );
 
         List<ShareGroupDescribeResponseData.DescribedGroup> actual = 
context.sendShareGroupDescribe(List.of(groupId));
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
index 1caa54325e4..0c930f29862 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
@@ -1270,7 +1270,43 @@ public class GroupMetadataManagerTestContext {
     }
 
     public List<DescribeGroupsResponseData.DescribedGroup> 
describeGroups(List<String> groupIds) {
-        return groupMetadataManager.describeGroups(groupIds, 
lastCommittedOffset);
+        RequestContext context = new RequestContext(
+            new RequestHeader(
+                ApiKeys.DESCRIBE_GROUPS,
+                ApiKeys.DESCRIBE_GROUPS.latestVersion(),
+                DEFAULT_CLIENT_ID,
+                0
+            ),
+            "1",
+            DEFAULT_CLIENT_ADDRESS,
+            KafkaPrincipal.ANONYMOUS,
+            ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
+            SecurityProtocol.PLAINTEXT,
+            ClientInformation.EMPTY,
+            false
+        );
+
+        return groupMetadataManager.describeGroups(context, groupIds, 
lastCommittedOffset);
+    }
+
+    public List<DescribeGroupsResponseData.DescribedGroup> 
describeGroups(List<String> groupIds, short apiVersion) {
+        RequestContext context = new RequestContext(
+            new RequestHeader(
+                ApiKeys.DESCRIBE_GROUPS,
+                apiVersion,
+                DEFAULT_CLIENT_ID,
+                0
+            ),
+            "1",
+            DEFAULT_CLIENT_ADDRESS,
+            KafkaPrincipal.ANONYMOUS,
+            ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
+            SecurityProtocol.PLAINTEXT,
+            ClientInformation.EMPTY,
+            false
+        );
+
+        return groupMetadataManager.describeGroups(context, groupIds, 
lastCommittedOffset);
     }
 
     public List<ShareGroupDescribeResponseData.DescribedGroup> 
sendShareGroupDescribe(List<String> groupIds) {
@@ -1390,6 +1426,21 @@ public class GroupMetadataManagerTestContext {
             List.of(new DescribeGroupsResponseData.DescribedGroup()
                 .setGroupId(groupId)
                 .setGroupState(DEAD.toString())
+                .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
+                .setErrorMessage("Group " + groupId + " not found.")
+            ),
+            describedGroups
+        );
+    }
+
+    public void verifyDescribeGroupsBeforeV6ReturnsDeadGroup(String groupId) {
+        List<DescribeGroupsResponseData.DescribedGroup> describedGroups =
+            describeGroups(Collections.singletonList(groupId), (short) 5);
+
+        assertEquals(
+            Collections.singletonList(new 
DescribeGroupsResponseData.DescribedGroup()
+                .setGroupId(groupId)
+                .setGroupState(DEAD.toString())
             ),
             describedGroups
         );
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 2d66e0fd86b..6fbdfeaf79b 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -29,6 +29,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.utils.Time;
@@ -1002,7 +1003,9 @@ public class IntegrationTestUtils {
                             .get(applicationId)
                             .get();
             return groupDescription.members().isEmpty();
-        } catch (final ExecutionException | InterruptedException e) {
+        } catch (final ExecutionException e) {
+            return e.getCause() instanceof GroupIdNotFoundException;
+        } catch (final InterruptedException e) {
             return false;
         }
     }
diff --git a/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java 
b/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java
index 526d9661928..a100110517e 100644
--- a/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java
+++ b/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java
@@ -29,6 +29,7 @@ import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.requests.ListOffsetsResponse;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.utils.Exit;
@@ -170,17 +171,24 @@ public class StreamsResetter {
         final DescribeConsumerGroupsResult describeResult = 
adminClient.describeConsumerGroups(
             Collections.singleton(groupId),
             new DescribeConsumerGroupsOptions().timeoutMs(10 * 1000));
-        final List<MemberDescription> members =
-            new 
ArrayList<>(describeResult.describedGroups().get(groupId).get().members());
-        if (!members.isEmpty()) {
-            if (options.hasForce()) {
-                System.out.println("Force deleting all active members in the 
group: " + groupId);
-                adminClient.removeMembersFromConsumerGroup(groupId, new 
RemoveMembersFromConsumerGroupOptions()).all().get();
-            } else {
-                throw new IllegalStateException("Consumer group '" + groupId + 
"' is still active "
+        try {
+            final List<MemberDescription> members =
+                new 
ArrayList<>(describeResult.describedGroups().get(groupId).get().members());
+            if (!members.isEmpty()) {
+                if (options.hasForce()) {
+                    System.out.println("Force deleting all active members in 
the group: " + groupId);
+                    adminClient.removeMembersFromConsumerGroup(groupId, new 
RemoveMembersFromConsumerGroupOptions()).all().get();
+                } else {
+                    throw new IllegalStateException("Consumer group '" + 
groupId + "' is still active "
                         + "and has following members: " + members + ". "
                         + "Make sure to stop all running application instances 
before running the reset tool."
                         + " You can use option '--force' to remove active 
members from the group.");
+                }
+            }
+        } catch (ExecutionException ee) {
+            // If the group ID is not found, this is not an error case
+            if (!(ee.getCause() instanceof GroupIdNotFoundException)) {
+                throw ee;
             }
         }
     }
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
index 33e483cbb94..26a93e0a58c 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
@@ -44,6 +44,7 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ListOffsetsResponse;
 import org.apache.kafka.common.utils.Utils;
@@ -569,34 +570,54 @@ public class ConsumerGroupCommand {
                     switch (state) {
                         case "Empty":
                         case "Dead":
-                            Collection<TopicPartition> partitionsToReset = 
getPartitionsToReset(groupId);
-                            Map<TopicPartition, OffsetAndMetadata> 
preparedOffsets = prepareOffsetsToReset(groupId, partitionsToReset);
-
-                            // Dry-run is the default behavior if --execute is 
not specified
-                            boolean dryRun = opts.options.has(opts.dryRunOpt) 
|| !opts.options.has(opts.executeOpt);
-                            if (!dryRun) {
-                                adminClient.alterConsumerGroupOffsets(
-                                    groupId,
-                                    preparedOffsets,
-                                    withTimeoutMs(new 
AlterConsumerGroupOffsetsOptions())
-                                ).all().get();
-                            }
-
-                            result.put(groupId, preparedOffsets);
-
+                            result.put(groupId, 
resetOffsetsForInactiveGroup(groupId));
                             break;
                         default:
                             printError("Assignments can only be reset if the 
group '" + groupId + "' is inactive, but the current state is " + state + ".", 
Optional.empty());
                             result.put(groupId, Collections.emptyMap());
                     }
-                } catch (InterruptedException | ExecutionException e) {
-                    throw new RuntimeException(e);
+                } catch (InterruptedException ie) {
+                    throw new RuntimeException(ie);
+                } catch (ExecutionException ee) {
+                    if (ee.getCause() instanceof GroupIdNotFoundException) {
+                        result.put(groupId, 
resetOffsetsForInactiveGroup(groupId));
+                    } else {
+                        throw new RuntimeException(ee);
+                    }
                 }
             });
 
             return result;
         }
 
+        private Map<TopicPartition, OffsetAndMetadata> 
resetOffsetsForInactiveGroup(String groupId) {
+            try {
+                Collection<TopicPartition> partitionsToReset = 
getPartitionsToReset(groupId);
+                Map<TopicPartition, OffsetAndMetadata> preparedOffsets = 
prepareOffsetsToReset(groupId, partitionsToReset);
+
+                // Dry-run is the default behavior if --execute is not 
specified
+                boolean dryRun = opts.options.has(opts.dryRunOpt) || 
!opts.options.has(opts.executeOpt);
+                if (!dryRun) {
+                    adminClient.alterConsumerGroupOffsets(
+                        groupId,
+                        preparedOffsets,
+                        withTimeoutMs(new AlterConsumerGroupOffsetsOptions())
+                    ).all().get();
+                }
+
+                return preparedOffsets;
+            } catch (InterruptedException ie) {
+                throw new RuntimeException(ie);
+            } catch (ExecutionException ee) {
+                Throwable cause = ee.getCause();
+                if (cause instanceof KafkaException) {
+                    throw (KafkaException) cause;
+                } else {
+                    throw new RuntimeException(cause);
+                }
+            }
+        }
+
         Entry<Errors, Map<TopicPartition, Throwable>> deleteOffsets(String 
groupId, List<String> topics) {
             Map<TopicPartition, Throwable> partitionLevelResult = new 
HashMap<>();
             Set<String> topicWithPartitions = new HashSet<>();
@@ -702,7 +723,7 @@ public class ConsumerGroupCommand {
                     System.out.printf(format,
                         tp.topic(),
                         tp.partition() >= 0 ? tp.partition() : "Not Provided",
-                        error != null ? "Error: :" + error.getMessage() : 
"Successful"
+                        error != null ? "Error: " + error.getMessage() : 
"Successful"
                     );
                 });
             System.out.println();
@@ -1231,8 +1252,10 @@ public class ConsumerGroupCommand {
                 try {
                     f.get();
                     success.put(g, null);
-                } catch (ExecutionException | InterruptedException e) {
-                    failed.put(g, e);
+                } catch (InterruptedException ie) {
+                    failed.put(g, ie);
+                } catch (ExecutionException e) {
+                    failed.put(g, e.getCause());
                 }
             });
 
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java
 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java
index 6289b3a3099..48d51622e9c 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java
@@ -257,7 +257,7 @@ public class ConsumerGroupCommandOptions extends 
CommandDefaultOptions {
                 CommandLineUtils.printUsageAndExit(parser, "Option " + 
resetOffsetsOpt + " only accepts one of " + executeOpt + " and " + dryRunOpt);
 
             if (!options.has(dryRunOpt) && !options.has(executeOpt)) {
-                System.err.println("WARN: No action will be performed as the 
--execute option is missing." +
+                System.err.println("WARN: No action will be performed as the 
--execute option is missing. " +
                     "In a future major release, the default behavior of this 
command will be to prompt the user before " +
                     "executing the reset rather than doing a dry run. You 
should add the --dry-run option explicitly " +
                     "if you are scripting this command and want to keep the 
current default behavior without prompting.");
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/AuthorizerIntegrationTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/AuthorizerIntegrationTest.java
index e815002899b..36073f26dcc 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/AuthorizerIntegrationTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/AuthorizerIntegrationTest.java
@@ -19,16 +19,20 @@ package org.apache.kafka.tools.consumer.group;
 import kafka.api.AbstractAuthorizerIntegrationTest;
 
 import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
 
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.Collections;
+import java.util.concurrent.ExecutionException;
 
 import scala.jdk.javaapi.CollectionConverters;
 
 import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
 import static org.apache.kafka.common.acl.AclPermissionType.ALLOW;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.fail;
 
 public class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
     @ParameterizedTest
@@ -38,8 +42,12 @@ public class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest
 
         String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--describe", "--group", group()};
         ConsumerGroupCommandOptions opts = 
ConsumerGroupCommandOptions.fromArgs(cgcArgs);
-        ConsumerGroupCommand.ConsumerGroupService consumerGroupService = new 
ConsumerGroupCommand.ConsumerGroupService(opts, Collections.emptyMap());
-        consumerGroupService.describeGroups();
-        consumerGroupService.close();
+        try (ConsumerGroupCommand.ConsumerGroupService consumerGroupService = 
new ConsumerGroupCommand.ConsumerGroupService(opts, Collections.emptyMap())) {
+            consumerGroupService.describeGroups();
+            fail("Non-existent group should throw an exception");
+        } catch (ExecutionException e) {
+            assertInstanceOf(GroupIdNotFoundException.class, e.getCause(),
+                "Non-existent group should throw GroupIdNotFoundException");
+        }
     }
 }
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java
index 3af38c44806..e353aa0b016 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java
@@ -100,7 +100,7 @@ public class DeleteConsumerGroupsTest {
             assertEquals(1, result.size());
             assertNotNull(result.get(missingGroupId));
             assertInstanceOf(GroupIdNotFoundException.class,
-                    result.get(missingGroupId).getCause(),
+                    result.get(missingGroupId),
                     "The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") 
was not detected while deleting consumer group");
         }
     }
@@ -132,7 +132,7 @@ public class DeleteConsumerGroupsTest {
                 assertEquals(1, result.size());
                 assertNotNull(result.get(groupId));
                 assertInstanceOf(GroupNotEmptyException.class,
-                        result.get(groupId).getCause(),
+                        result.get(groupId),
                         "The expected error (" + Errors.NON_EMPTY_GROUP + ") 
was not detected while deleting consumer group. Result was:(" + result + ")");
             }
         }
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java
index 57ec0efbcb3..dd20003eb4e 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.clients.consumer.RangeAssignor;
 import org.apache.kafka.clients.consumer.RoundRobinAssignor;
 import org.apache.kafka.common.GroupState;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.test.api.ClusterConfig;
@@ -68,6 +69,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 
 @ExtendWith(value = ClusterTestExtensions.class)
 public class DescribeConsumerGroupTest {
@@ -92,9 +94,13 @@ public class DescribeConsumerGroupTest {
             List<String> cgcArgs = new 
ArrayList<>(Arrays.asList("--bootstrap-server", 
clusterInstance.bootstrapServers(), "--describe", "--group", missingGroup));
             cgcArgs.addAll(describeType);
             try (ConsumerGroupCommand.ConsumerGroupService service = 
consumerGroupService(cgcArgs.toArray(new String[0]))) {
-                String output = 
ToolsTestUtils.grabConsoleOutput(describeGroups(service));
-                assertTrue(output.contains("Consumer group '" + missingGroup + 
"' does not exist."),
-                        "Expected error was not detected for describe option 
'" + String.join(" ", describeType) + "'");
+                service.describeGroups();
+                fail("Expected error was not detected for describe option '" + 
String.join(" ", describeType) + "'");
+            } catch (ExecutionException ee) {
+                assertInstanceOf(GroupIdNotFoundException.class, 
ee.getCause());
+                assertEquals("Group " + missingGroup + " not found.", 
ee.getCause().getMessage());
+            } catch (Exception e) {
+                fail("Expected error was not detected for describe option '" + 
String.join(" ", describeType) + "'");
             }
         }
     }
@@ -113,9 +119,11 @@ public class DescribeConsumerGroupTest {
                  // note the group to be queried is a different (non-existing) 
group
                  ConsumerGroupCommand.ConsumerGroupService service = 
consumerGroupService(new String[]{"--bootstrap-server", 
clusterInstance.bootstrapServers(), "--describe", "--group", missingGroup})
             ) {
-                Entry<Optional<GroupState>, 
Optional<Collection<PartitionAssignmentState>>> res = 
service.collectGroupOffsets(missingGroup);
-                assertTrue(res.getKey().map(s -> 
s.equals(GroupState.DEAD)).orElse(false) && 
res.getValue().map(Collection::isEmpty).orElse(false),
-                        "Expected the state to be 'Dead', with no members in 
the group '" + missingGroup + "'.");
+                service.collectGroupOffsets(missingGroup);
+                fail("Expected the group '" + missingGroup + "' to throw 
GroupIdNotFoundException");
+            } catch (ExecutionException ee) {
+                assertInstanceOf(GroupIdNotFoundException.class, ee.getCause(),
+                    "Expected the group '" + missingGroup + "' to throw 
GroupIdNotFoundException");
             }
         }
     }
@@ -132,13 +140,11 @@ public class DescribeConsumerGroupTest {
                  // note the group to be queried is a different (non-existing) 
group
                  ConsumerGroupCommand.ConsumerGroupService service = 
consumerGroupService(new String[]{"--bootstrap-server", 
clusterInstance.bootstrapServers(), "--describe", "--group", missingGroup})
             ) {
-                Entry<Optional<GroupState>, 
Optional<Collection<MemberAssignmentState>>> res = 
service.collectGroupMembers(missingGroup, false);
-                assertTrue(res.getKey().map(s -> 
s.equals(GroupState.DEAD)).orElse(false) && 
res.getValue().map(Collection::isEmpty).orElse(false),
-                        "Expected the state to be 'Dead', with no members in 
the group '" + missingGroup + "'.");
-
-                Entry<Optional<GroupState>, 
Optional<Collection<MemberAssignmentState>>> res2 = 
service.collectGroupMembers(missingGroup, true);
-                assertTrue(res2.getKey().map(s -> 
s.equals(GroupState.DEAD)).orElse(false) && 
res2.getValue().map(Collection::isEmpty).orElse(false),
-                        "Expected the state to be 'Dead', with no members in 
the group '" + missingGroup + "' (verbose option).");
+                service.collectGroupMembers(missingGroup, false);
+                fail("Expected the group '" + missingGroup + "' to throw 
GroupIdNotFoundException");
+            } catch (ExecutionException ee) {
+                assertInstanceOf(GroupIdNotFoundException.class, ee.getCause(),
+                    "Expected the group '" + missingGroup + "' to throw 
GroupIdNotFoundException");
             }
         }
     }
@@ -155,11 +161,11 @@ public class DescribeConsumerGroupTest {
                  // note the group to be queried is a different (non-existing) 
group
                  ConsumerGroupCommand.ConsumerGroupService service = 
consumerGroupService(new String[]{"--bootstrap-server", 
clusterInstance.bootstrapServers(), "--describe", "--group", missingGroup})
             ) {
-                GroupInformation state = 
service.collectGroupState(missingGroup);
-                assertTrue(Objects.equals(state.groupState, GroupState.DEAD) 
&& state.numMembers == 0 &&
-                                state.coordinator != null && 
clusterInstance.brokerIds().contains(state.coordinator.id()),
-                        "Expected the state to be 'Dead', with no members in 
the group '" + missingGroup + "'."
-                );
+                service.collectGroupState(missingGroup);
+                fail("Expected the group '" + missingGroup + "' to throw 
GroupIdNotFoundException");
+            } catch (ExecutionException ee) {
+                assertInstanceOf(GroupIdNotFoundException.class, ee.getCause(),
+                        "Expected the group '" + missingGroup + "' to throw 
GroupIdNotFoundException");
             }
         }
     }
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java
index 5aa73dae2de..a49597d638f 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java
@@ -136,10 +136,6 @@ public class ResetConsumerGroupOffsetTest {
         String[] args = buildArgsForGroup(cluster, group, "--all-topics", 
"--to-current", "--execute");
 
         try (ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(args)) {
-            // Make sure we got a coordinator
-            TestUtils.waitForCondition(
-                    () -> 
"localhost".equals(service.collectGroupState(group).coordinator.host()),
-                    "Can't find a coordinator");
             Map<TopicPartition, OffsetAndMetadata> resetOffsets = 
service.resetOffsets().get(group);
             assertTrue(resetOffsets.isEmpty());
             assertTrue(committedOffsets(cluster, topic, group).isEmpty());

Reply via email to