This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 729f9ccf065 KAFKA-19440: Handle top-level errors in 
AlterShareGroupOffsets RPC (#20049)
729f9ccf065 is described below

commit 729f9ccf065b35ce2e4e5b5fe1e9ce129d2ace08
Author: Andrew Schofield <aschofi...@confluent.io>
AuthorDate: Thu Jul 3 11:00:56 2025 +0100

    KAFKA-19440: Handle top-level errors in AlterShareGroupOffsets RPC (#20049)
    
    While testing the code in https://github.com/apache/kafka/pull/19820, it
    became clear that the error handling problems were due to the underlying
    Admin API. This PR fixes the error handling for top-level errors in the
    AlterShareGroupOffsets RPC.
    
    Reviewers: Apoorv Mittal <apoorvmitta...@gmail.com>, Lan Ding
     <isdin...@163.com>, TaiJuWu <tjwu1...@gmail.com>
---
 .../admin/AlterShareGroupOffsetsResult.java        | 23 +++---
 .../kafka/clients/admin/KafkaAdminClient.java      | 10 ++-
 .../internals/AlterShareGroupOffsetsHandler.java   | 93 ++++++++++++----------
 .../requests/AlterShareGroupOffsetsRequest.java    | 31 +++++---
 .../requests/DeleteShareGroupOffsetsRequest.java   | 10 ++-
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 30 ++++++-
 core/src/main/scala/kafka/server/KafkaApis.scala   | 31 +++-----
 .../kafka/api/AuthorizerIntegrationTest.scala      | 14 ++++
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 12 ++-
 .../coordinator/group/GroupCoordinatorService.java | 53 ++++++------
 .../group/GroupCoordinatorServiceTest.java         | 61 ++++++++++++--
 11 files changed, 238 insertions(+), 130 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/AlterShareGroupOffsetsResult.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/AlterShareGroupOffsetsResult.java
index 7c41852231d..293daaadbb9 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/AlterShareGroupOffsetsResult.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/AlterShareGroupOffsetsResult.java
@@ -20,6 +20,7 @@ package org.apache.kafka.clients.admin;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.protocol.Errors;
 
@@ -35,9 +36,9 @@ import java.util.stream.Collectors;
 @InterfaceStability.Evolving
 public class AlterShareGroupOffsetsResult {
 
-    private final KafkaFuture<Map<TopicPartition, Errors>> future;
+    private final KafkaFuture<Map<TopicPartition, ApiException>> future;
 
-    AlterShareGroupOffsetsResult(KafkaFuture<Map<TopicPartition, Errors>> 
future) {
+    AlterShareGroupOffsetsResult(KafkaFuture<Map<TopicPartition, 
ApiException>> future) {
         this.future = future;
     }
 
@@ -54,11 +55,11 @@ public class AlterShareGroupOffsetsResult {
                 result.completeExceptionally(new IllegalArgumentException(
                     "Alter offset for partition \"" + partition + "\" was not 
attempted"));
             } else {
-                final Errors error = topicPartitions.get(partition);
-                if (error == Errors.NONE) {
+                final ApiException exception = topicPartitions.get(partition);
+                if (exception == null) {
                     result.complete(null);
                 } else {
-                    result.completeExceptionally(error.exception());
+                    result.completeExceptionally(exception);
                 }
             }
         });
@@ -68,22 +69,22 @@ public class AlterShareGroupOffsetsResult {
 
     /**
      * Return a future which succeeds if all the alter offsets succeed.
+     * If not, the first topic error shall be returned.
      */
     public KafkaFuture<Void> all() {
         return this.future.thenApply(topicPartitionErrorsMap ->  {
             List<TopicPartition> partitionsFailed = 
topicPartitionErrorsMap.entrySet()
                 .stream()
-                .filter(e -> e.getValue() != Errors.NONE)
+                .filter(e -> e.getValue() != null)
                 .map(Map.Entry::getKey)
                 .collect(Collectors.toList());
-            for (Errors error : topicPartitionErrorsMap.values()) {
-                if (error != Errors.NONE) {
-                    throw error.exception(
-                        "Failed altering share group offsets for the following 
partitions: " + partitionsFailed);
+            for (ApiException exception : topicPartitionErrorsMap.values()) {
+                if (exception != null) {
+                    throw Errors.forException(exception).exception(
+                        "Failed altering group offsets for the following 
partitions: " + partitionsFailed);
                 }
             }
             return null;
         });
     }
-
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index b283d65cbee..e1be4304950 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -3804,8 +3804,10 @@ public class KafkaAdminClient extends AdminClient {
     }
 
     @Override
-    public AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, 
Map<TopicPartition, Long> offsets, AlterShareGroupOffsetsOptions options) {
-        SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, Errors>> 
future = AlterShareGroupOffsetsHandler.newFuture(groupId);
+    public AlterShareGroupOffsetsResult alterShareGroupOffsets(final String 
groupId,
+                                                               final 
Map<TopicPartition, Long> offsets,
+                                                               final 
AlterShareGroupOffsetsOptions options) {
+        SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, 
ApiException>> future = AlterShareGroupOffsetsHandler.newFuture(groupId);
         AlterShareGroupOffsetsHandler handler = new 
AlterShareGroupOffsetsHandler(groupId, offsets, logContext);
         invokeDriver(handler, future, options.timeoutMs);
         return new 
AlterShareGroupOffsetsResult(future.get(CoordinatorKey.byGroupId(groupId)));
@@ -3821,7 +3823,9 @@ public class KafkaAdminClient extends AdminClient {
     }
 
     @Override
-    public DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String 
groupId, Set<String> topics, DeleteShareGroupOffsetsOptions options) {
+    public DeleteShareGroupOffsetsResult deleteShareGroupOffsets(final String 
groupId,
+                                                                 final 
Set<String> topics,
+                                                                 final 
DeleteShareGroupOffsetsOptions options) {
         SimpleAdminApiFuture<CoordinatorKey, Map<String, ApiException>> future 
= DeleteShareGroupOffsetsHandler.newFuture(groupId);
         DeleteShareGroupOffsetsHandler handler = new 
DeleteShareGroupOffsetsHandler(groupId, topics, logContext);
         invokeDriver(handler, future, options.timeoutMs);
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterShareGroupOffsetsHandler.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterShareGroupOffsetsHandler.java
index f66f5972836..ef21be6b6d2 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterShareGroupOffsetsHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterShareGroupOffsetsHandler.java
@@ -21,8 +21,8 @@ import 
org.apache.kafka.clients.admin.AlterShareGroupOffsetsOptions;
 import org.apache.kafka.clients.admin.KafkaAdminClient;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.message.AlterShareGroupOffsetsRequestData;
-import org.apache.kafka.common.message.AlterShareGroupOffsetsResponseData;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.requests.AlterShareGroupOffsetsRequest;
@@ -33,7 +33,6 @@ import org.apache.kafka.common.utils.LogContext;
 import org.slf4j.Logger;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -42,7 +41,7 @@ import java.util.Set;
 /**
  * This class is the handler for {@link 
KafkaAdminClient#alterShareGroupOffsets(String, Map, 
AlterShareGroupOffsetsOptions)} call
  */
-public class AlterShareGroupOffsetsHandler extends 
AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, Errors>> {
+public class AlterShareGroupOffsetsHandler extends 
AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, ApiException>> {
 
     private final CoordinatorKey groupId;
 
@@ -52,7 +51,6 @@ public class AlterShareGroupOffsetsHandler extends 
AdminApiHandler.Batched<Coord
 
     private final CoordinatorStrategy lookupStrategy;
 
-
     public AlterShareGroupOffsetsHandler(String groupId, Map<TopicPartition, 
Long> offsets, LogContext logContext) {
         this.groupId = CoordinatorKey.byGroupId(groupId);
         this.offsets = offsets;
@@ -60,8 +58,15 @@ public class AlterShareGroupOffsetsHandler extends 
AdminApiHandler.Batched<Coord
         this.lookupStrategy = new 
CoordinatorStrategy(FindCoordinatorRequest.CoordinatorType.GROUP, logContext);
     }
 
-    public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, 
Map<TopicPartition, Errors>> newFuture(String groupId) {
-        return 
AdminApiFuture.forKeys(Collections.singleton(CoordinatorKey.byGroupId(groupId)));
+    public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, 
Map<TopicPartition, ApiException>> newFuture(String groupId) {
+        return 
AdminApiFuture.forKeys(Set.of(CoordinatorKey.byGroupId(groupId)));
+    }
+
+    private void validateKeys(Set<CoordinatorKey> groupIds) {
+        if (!groupIds.equals(Set.of(groupId))) {
+            throw new IllegalArgumentException("Received unexpected group ids 
" + groupIds +
+                " (expected only " + Set.of(groupId) + ")");
+        }
     }
 
     @Override
@@ -87,30 +92,38 @@ public class AlterShareGroupOffsetsHandler extends 
AdminApiHandler.Batched<Coord
     }
 
     @Override
-    public ApiResult<CoordinatorKey, Map<TopicPartition, Errors>> 
handleResponse(Node broker, Set<CoordinatorKey> keys, AbstractResponse 
abstractResponse) {
+    public ApiResult<CoordinatorKey, Map<TopicPartition, ApiException>> 
handleResponse(Node broker, Set<CoordinatorKey> keys, AbstractResponse 
abstractResponse) {
+        validateKeys(keys);
+
         AlterShareGroupOffsetsResponse response = 
(AlterShareGroupOffsetsResponse) abstractResponse;
-        final Map<TopicPartition, Errors> partitionResults = new HashMap<>();
         final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
         final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
-
-        for 
(AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic topic : 
response.data().responses()) {
-            for 
(AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition 
partition : topic.partitions()) {
-                TopicPartition topicPartition = new 
TopicPartition(topic.topicName(), partition.partitionIndex());
-                Errors error = Errors.forCode(partition.errorCode());
-
-                if (error != Errors.NONE) {
-                    handleError(
-                        groupId,
-                        topicPartition,
-                        error,
-                        partitionResults,
-                        groupsToUnmap,
-                        groupsToRetry
-                    );
-                } else {
-                    partitionResults.put(topicPartition, error);
+        final Map<TopicPartition, ApiException> partitionResults = new 
HashMap<>();
+
+        if (response.data().errorCode() != Errors.NONE.code()) {
+            final Errors topLevelError = 
Errors.forCode(response.data().errorCode());
+            final String topLevelErrorMessage = response.data().errorMessage();
+
+            offsets.forEach((topicPartition, offset) ->
+                handleError(
+                    groupId,
+                    topicPartition,
+                    topLevelError,
+                    topLevelErrorMessage,
+                    partitionResults,
+                    groupsToUnmap,
+                    groupsToRetry
+                ));
+        } else {
+            response.data().responses().forEach(topic -> 
topic.partitions().forEach(partition -> {
+                if (partition.errorCode() != Errors.NONE.code()) {
+                    final Errors partitionError = 
Errors.forCode(partition.errorCode());
+                    final String partitionErrorMessage = 
partition.errorMessage();
+                    log.debug("AlterShareGroupOffsets request for group id {} 
and topic-partition {}-{} failed and returned error {}." + 
partitionErrorMessage,
+                        groupId.idValue, topic.topicName(), 
partition.partitionIndex(), partitionError);
                 }
-            }
+                partitionResults.put(new TopicPartition(topic.topicName(), 
partition.partitionIndex()), 
Errors.forCode(partition.errorCode()).exception(partition.errorMessage()));
+            }));
         }
 
         if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
@@ -121,23 +134,23 @@ public class AlterShareGroupOffsetsHandler extends 
AdminApiHandler.Batched<Coord
     }
 
     private void handleError(
-            CoordinatorKey groupId,
-            TopicPartition topicPartition,
-            Errors error,
-            Map<TopicPartition, Errors> partitionResults,
-            Set<CoordinatorKey> groupsToUnmap,
-            Set<CoordinatorKey> groupsToRetry
+        CoordinatorKey groupId,
+        TopicPartition topicPartition,
+        Errors error,
+        String errorMessage,
+        Map<TopicPartition, ApiException> partitionResults,
+        Set<CoordinatorKey> groupsToUnmap,
+        Set<CoordinatorKey> groupsToRetry
     ) {
         switch (error) {
             case COORDINATOR_LOAD_IN_PROGRESS:
             case REBALANCE_IN_PROGRESS:
-                log.debug("AlterShareGroupOffsets request for group id {} 
returned error {}. Will retry.",
-                        groupId.idValue, error);
+                log.debug("AlterShareGroupOffsets request for group id {} 
returned error {}. Will retry." + errorMessage, groupId.idValue, error);
                 groupsToRetry.add(groupId);
                 break;
             case COORDINATOR_NOT_AVAILABLE:
             case NOT_COORDINATOR:
-                log.debug("AlterShareGroupOffsets request for group id {} 
returned error {}. Will rediscover the coordinator and retry.",
+                log.debug("AlterShareGroupOffsets request for group id {} 
returned error {}. Will rediscover the coordinator and retry." + errorMessage,
                         groupId.idValue, error);
                 groupsToUnmap.add(groupId);
                 break;
@@ -147,14 +160,12 @@ public class AlterShareGroupOffsetsHandler extends 
AdminApiHandler.Batched<Coord
             case UNKNOWN_SERVER_ERROR:
             case KAFKA_STORAGE_ERROR:
             case GROUP_AUTHORIZATION_FAILED:
-                log.debug("AlterShareGroupOffsets request for group id {} and 
partition {} failed due" +
-                        " to error {}.", groupId.idValue, topicPartition, 
error);
-                partitionResults.put(topicPartition, error);
+                log.debug("AlterShareGroupOffsets request for group id {} 
failed due to error {}." + errorMessage, groupId.idValue, error);
+                partitionResults.put(topicPartition, 
error.exception(errorMessage));
                 break;
             default:
-                log.error("AlterShareGroupOffsets request for group id {} and 
partition {} failed due" +
-                        " to unexpected error {}.", groupId.idValue, 
topicPartition, error);
-                partitionResults.put(topicPartition, error);
+                log.error("AlterShareGroupOffsets request for group id {} 
failed due to unexpected error {}." + errorMessage, groupId.idValue, error);
+                partitionResults.put(topicPartition, 
error.exception(errorMessage));
         }
     }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/AlterShareGroupOffsetsRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/AlterShareGroupOffsetsRequest.java
index 2eb9e37bc50..be04568e1a3 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/AlterShareGroupOffsetsRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/AlterShareGroupOffsetsRequest.java
@@ -53,26 +53,31 @@ public class AlterShareGroupOffsetsRequest extends 
AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
-        Errors error = Errors.forException(e);
-        return new 
AlterShareGroupOffsetsResponse(getErrorResponse(throttleTimeMs, error));
+    public AlterShareGroupOffsetsResponse getErrorResponse(int throttleTimeMs, 
Throwable e) {
+        return getErrorResponse(throttleTimeMs, Errors.forException(e));
     }
 
-    public static AlterShareGroupOffsetsResponseData getErrorResponse(int 
throttleTimeMs, Errors error) {
-        return new AlterShareGroupOffsetsResponseData()
-            .setThrottleTimeMs(throttleTimeMs)
-            .setErrorCode(error.code())
-            .setErrorMessage(error.message());
+    public AlterShareGroupOffsetsResponse getErrorResponse(int throttleTimeMs, 
Errors error) {
+        return getErrorResponse(throttleTimeMs, error.code(), error.message());
+    }
+
+    public AlterShareGroupOffsetsResponse getErrorResponse(int throttleTimeMs, 
short errorCode, String message) {
+        return new AlterShareGroupOffsetsResponse(
+            new AlterShareGroupOffsetsResponseData()
+                .setThrottleTimeMs(throttleTimeMs)
+                .setErrorCode(errorCode)
+                .setErrorMessage(message)
+        );
     }
 
-    public static AlterShareGroupOffsetsResponseData getErrorResponse(Errors 
error) {
-        return getErrorResponse(error.code(), error.message());
+    public static AlterShareGroupOffsetsResponseData 
getErrorResponseData(Errors error) {
+        return getErrorResponseData(error, null);
     }
 
-    public static AlterShareGroupOffsetsResponseData getErrorResponse(short 
errorCode, String errorMessage) {
+    public static AlterShareGroupOffsetsResponseData 
getErrorResponseData(Errors error, String errorMessage) {
         return new AlterShareGroupOffsetsResponseData()
-                .setErrorCode(errorCode)
-                .setErrorMessage(errorMessage);
+            .setErrorCode(error.code())
+            .setErrorMessage(errorMessage == null ? error.message() : 
errorMessage);
     }
 
     public static AlterShareGroupOffsetsRequest parse(Readable readable, short 
version) {
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupOffsetsRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupOffsetsRequest.java
index bec0077b9b3..1e28115bada 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupOffsetsRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupOffsetsRequest.java
@@ -80,12 +80,18 @@ public class DeleteShareGroupOffsetsRequest extends 
AbstractRequest {
     }
 
     public static DeleteShareGroupOffsetsResponseData 
getErrorDeleteResponseData(Errors error) {
-        return getErrorDeleteResponseData(error.code(), error.message());
+        return getErrorDeleteResponseData(error, null);
     }
 
     public static DeleteShareGroupOffsetsResponseData 
getErrorDeleteResponseData(short errorCode, String errorMessage) {
         return new DeleteShareGroupOffsetsResponseData()
             .setErrorCode(errorCode)
-            .setErrorMessage(errorMessage);
+            .setErrorMessage(errorMessage == null ? 
Errors.forCode(errorCode).message() : errorMessage);
+    }
+
+    public static DeleteShareGroupOffsetsResponseData 
getErrorDeleteResponseData(Errors error, String errorMessage) {
+        return new DeleteShareGroupOffsetsResponseData()
+            .setErrorCode(error.code())
+            .setErrorMessage(errorMessage == null ? error.message() : 
errorMessage);
     }
 }
\ No newline at end of file
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 1d516cf6648..1098078b582 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -56,7 +56,6 @@ import 
org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.errors.DuplicateVoterException;
 import org.apache.kafka.common.errors.FencedInstanceIdException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
-import org.apache.kafka.common.errors.GroupNotEmptyException;
 import org.apache.kafka.common.errors.GroupSubscribedToTopicException;
 import org.apache.kafka.common.errors.InvalidConfigurationException;
 import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
@@ -11351,6 +11350,28 @@ public class KafkaAdminClientTest {
         }
     }
 
+    @Test
+    public void testAlterShareGroupOffsetsWithTopLevelError() throws Exception 
{
+        try (AdminClientUnitTestEnv env = new 
AdminClientUnitTestEnv(mockCluster(1, 0))) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, 
env.cluster().controller()));
+
+            AlterShareGroupOffsetsResponseData data = new 
AlterShareGroupOffsetsResponseData().setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code()).setErrorMessage("Group
 authorization failed.");
+
+            TopicPartition fooTopicPartition0 = new TopicPartition("foo", 0);
+            TopicPartition fooTopicPartition1 = new TopicPartition("foo", 1);
+            TopicPartition barPartition0 = new TopicPartition("bar", 0);
+            TopicPartition zooTopicPartition0 = new TopicPartition("zoo", 0);
+
+            env.kafkaClient().prepareResponse(new 
AlterShareGroupOffsetsResponse(data));
+            final AlterShareGroupOffsetsResult result = 
env.adminClient().alterShareGroupOffsets(GROUP_ID, Map.of(fooTopicPartition0, 
1L, fooTopicPartition1, 2L, barPartition0, 1L));
+
+            TestUtils.assertFutureThrows(GroupAuthorizationException.class, 
result.all());
+            TestUtils.assertFutureThrows(GroupAuthorizationException.class, 
result.partitionResult(fooTopicPartition1));
+            TestUtils.assertFutureThrows(IllegalArgumentException.class, 
result.partitionResult(zooTopicPartition0));
+        }
+    }
+
     @Test
     public void testAlterShareGroupOffsetsWithErrorInOnePartition() throws 
Exception {
         try (AdminClientUnitTestEnv env = new 
AdminClientUnitTestEnv(mockCluster(1, 0))) {
@@ -11359,7 +11380,8 @@ public class KafkaAdminClientTest {
 
             AlterShareGroupOffsetsResponseData data = new 
AlterShareGroupOffsetsResponseData().setResponses(
                 new 
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopicCollection(List.of(
-                    new 
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic().setTopicName("foo").setPartitions(List.of(new
 
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition().setPartitionIndex(0),
 new 
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition().setPartitionIndex(1).setErrorCode(Errors.NON_EMPTY_GROUP.code()).setErrorMessage("The
 group is not empty"))),
+                    new 
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic().setTopicName("foo").setPartitions(List.of(new
 
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition().setPartitionIndex(0),
+                        new 
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition().setPartitionIndex(1).setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code()).setErrorMessage("Topic
 authorization failed."))),
                     new 
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic().setTopicName("bar").setPartitions(List.of(new
 
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition().setPartitionIndex(0)))
                 ).iterator())
             );
@@ -11371,9 +11393,9 @@ public class KafkaAdminClientTest {
             env.kafkaClient().prepareResponse(new 
AlterShareGroupOffsetsResponse(data));
             final AlterShareGroupOffsetsResult result = 
env.adminClient().alterShareGroupOffsets(GROUP_ID, Map.of(fooTopicPartition0, 
1L, fooTopicPartition1, 2L, barPartition0, 1L));
 
-            TestUtils.assertFutureThrows(GroupNotEmptyException.class, 
result.all());
+            TestUtils.assertFutureThrows(TopicAuthorizationException.class, 
result.all());
             assertNull(result.partitionResult(fooTopicPartition0).get());
-            TestUtils.assertFutureThrows(GroupNotEmptyException.class, 
result.partitionResult(fooTopicPartition1));
+            TestUtils.assertFutureThrows(TopicAuthorizationException.class, 
result.partitionResult(fooTopicPartition1));
             assertNull(result.partitionResult(barPartition0).get());
         }
     }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 5eb249c54d6..21edc36c13d 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -3756,7 +3756,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val groupId = alterShareGroupOffsetsRequest.data.groupId
 
     if (!isShareGroupProtocolEnabled) {
-      requestHelper.sendMaybeThrottle(request, 
alterShareGroupOffsetsRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME,
 Errors.UNSUPPORTED_VERSION.exception))
+      requestHelper.sendMaybeThrottle(request, 
alterShareGroupOffsetsRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
       return CompletableFuture.completedFuture[Unit](())
     } else if (!authHelper.authorize(request.context, READ, GROUP, groupId)) {
       requestHelper.sendMaybeThrottle(request, 
alterShareGroupOffsetsRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
@@ -3766,9 +3766,9 @@ class KafkaApis(val requestChannel: RequestChannel,
 
       alterShareGroupOffsetsRequest.data.topics.forEach(topic => {
         val topicError = {
-          if (!authHelper.authorize(request.context, READ, TOPIC, 
topic.topicName())) {
+          if (!authHelper.authorize(request.context, READ, TOPIC, 
topic.topicName)) {
             Some(new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED))
-          } else if (!metadataCache.contains(topic.topicName())) {
+          } else if (!metadataCache.contains(topic.topicName)) {
             Some(new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION))
           } else {
             None
@@ -3776,9 +3776,9 @@ class KafkaApis(val requestChannel: RequestChannel,
         }
         topicError match {
           case Some(error) =>
-            topic.partitions().forEach(partition => 
responseBuilder.addPartition(topic.topicName(), partition.partitionIndex(), 
metadataCache.topicNamesToIds(), error.error))
+            topic.partitions.forEach(partition => 
responseBuilder.addPartition(topic.topicName, partition.partitionIndex, 
metadataCache.topicNamesToIds, error.error))
           case None =>
-            authorizedTopicPartitions.add(topic)
+            authorizedTopicPartitions.add(topic.duplicate)
         }
       })
 
@@ -3792,8 +3792,10 @@ class KafkaApis(val requestChannel: RequestChannel,
       ).handle[Unit] { (response, exception) =>
         if (exception != null) {
           requestHelper.sendMaybeThrottle(request, 
alterShareGroupOffsetsRequest.getErrorResponse(exception))
+        } else if (response.errorCode != Errors.NONE.code) {
+          requestHelper.sendMaybeThrottle(request, 
alterShareGroupOffsetsRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME,
 response.errorCode, response.errorMessage))
         } else {
-          requestHelper.sendMaybeThrottle(request, 
responseBuilder.merge(response, metadataCache.topicNamesToIds()).build())
+          requestHelper.sendMaybeThrottle(request, 
responseBuilder.merge(response, metadataCache.topicNamesToIds).build())
         }
       }
     }
@@ -3824,22 +3826,13 @@ class KafkaApis(val requestChannel: RequestChannel,
           new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
             .setTopicName(topic.topicName)
             .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
-            .setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message())
+            .setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message)
         )
       } else {
         authorizedTopics.add(topic)
       }
     }
 
-    if (authorizedTopics.isEmpty) {
-      requestHelper.sendMaybeThrottle(
-        request,
-        new DeleteShareGroupOffsetsResponse(
-          new DeleteShareGroupOffsetsResponseData()
-            .setResponses(deleteShareGroupOffsetsResponseTopics)))
-      return CompletableFuture.completedFuture[Unit](())
-    }
-
     groupCoordinator.deleteShareGroupOffsets(
       request.context,
       new 
DeleteShareGroupOffsetsRequestData().setGroupId(groupId).setTopics(authorizedTopics)
@@ -3847,12 +3840,12 @@ class KafkaApis(val requestChannel: RequestChannel,
       if (exception != null) {
         requestHelper.sendMaybeThrottle(request, 
deleteShareGroupOffsetsRequest.getErrorResponse(
           AbstractResponse.DEFAULT_THROTTLE_TIME,
-          Errors.forException(exception).code(),
-          exception.getMessage()))
+          Errors.forException(exception).code,
+          exception.getMessage))
       } else if (responseData.errorCode() != Errors.NONE.code) {
         requestHelper.sendMaybeThrottle(
           request,
-          
deleteShareGroupOffsetsRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME,
 responseData.errorCode(), responseData.errorMessage())
+          
deleteShareGroupOffsetsRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME,
 responseData.errorCode, responseData.errorMessage)
         )
       } else {
         responseData.responses.forEach { topic => {
diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 424772275ea..f950362354c 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -3254,6 +3254,18 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
     removeAllClientAcls()
   }
 
+  private def createEmptyShareGroup(): Unit = {
+    createTopicWithBrokerPrincipal(topic)
+    addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WILDCARD_HOST, READ, ALLOW)), shareGroupResource)
+    addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WILDCARD_HOST, READ, ALLOW)), topicResource)
+    shareConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, shareGroup)
+    val consumer = createShareConsumer()
+    consumer.subscribe(util.Set.of(topic))
+    consumer.poll(Duration.ofMillis(500L))
+    consumer.close()
+    removeAllClientAcls()
+  }
+
   @Test
   def testShareGroupDescribeWithGroupDescribeAndTopicDescribeAcl(): Unit = {
     createShareGroupToDescribe()
@@ -3614,6 +3626,7 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
 
   @Test
   def testDeleteShareGroupOffsetsWithoutTopicReadAcl(): Unit = {
+    createEmptyShareGroup()
     addAndVerifyAcls(shareGroupDeleteAcl(shareGroupResource), 
shareGroupResource)
 
     val request = deleteShareGroupOffsetsRequest
@@ -3663,6 +3676,7 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
 
   @Test
   def testAlterShareGroupOffsetsWithoutTopicReadAcl(): Unit = {
+    createEmptyShareGroup()
     addAndVerifyAcls(shareGroupReadAcl(shareGroupResource), shareGroupResource)
 
     val request = alterShareGroupOffsetsRequest
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index a6c26589635..e3a396ba9d5 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -12863,10 +12863,18 @@ class KafkaApisTest extends Logging {
   def testDeleteShareGroupOffsetsRequestEmptyTopicsSuccess(): Unit = {
     metadataCache = initializeMetadataCacheWithShareGroupsEnabled()
 
-    val deleteShareGroupOffsetsRequest = new 
DeleteShareGroupOffsetsRequestData()
+    val deleteShareGroupOffsetsRequestData = new 
DeleteShareGroupOffsetsRequestData()
       .setGroupId("group")
 
-    val requestChannelRequest = buildRequest(new 
DeleteShareGroupOffsetsRequest.Builder(deleteShareGroupOffsetsRequest).build)
+    val requestChannelRequest = buildRequest(new 
DeleteShareGroupOffsetsRequest.Builder(deleteShareGroupOffsetsRequestData).build)
+
+    val groupCoordinatorResponse: DeleteShareGroupOffsetsResponseData = new 
DeleteShareGroupOffsetsResponseData()
+      .setErrorCode(Errors.NONE.code())
+
+    when(groupCoordinator.deleteShareGroupOffsets(
+      requestChannelRequest.context,
+      deleteShareGroupOffsetsRequestData
+    )).thenReturn(CompletableFuture.completedFuture(groupCoordinatorResponse))
 
     val resultFuture = new 
CompletableFuture[DeleteShareGroupOffsetsResponseData]
     kafkaApis = createKafkaApis()
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 099201ecabb..812853a263f 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -698,6 +698,10 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
         InitializeShareGroupStateParameters request,
         AlterShareGroupOffsetsResponseData response
     ) {
+        if (request.groupTopicPartitionData().topicsData().isEmpty()) {
+            return CompletableFuture.completedFuture(response);
+        }
+
         return persister.initializeState(request)
             .handle((result, exp) -> {
                 if (exp == null) {
@@ -1233,16 +1237,20 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
      * See {@link 
GroupCoordinator#alterShareGroupOffsets(AuthorizableRequestContext, String, 
AlterShareGroupOffsetsRequestData)}.
      */
     @Override
-    public CompletableFuture<AlterShareGroupOffsetsResponseData> 
alterShareGroupOffsets(AuthorizableRequestContext context, String groupId, 
AlterShareGroupOffsetsRequestData request) {
+    public CompletableFuture<AlterShareGroupOffsetsResponseData> 
alterShareGroupOffsets(
+        AuthorizableRequestContext context,
+        String groupId,
+        AlterShareGroupOffsetsRequestData request
+    ) {
         if (!isActive.get() || metadataImage == null) {
-            return 
CompletableFuture.completedFuture(AlterShareGroupOffsetsRequest.getErrorResponse(Errors.COORDINATOR_NOT_AVAILABLE));
+            return 
CompletableFuture.completedFuture(AlterShareGroupOffsetsRequest.getErrorResponseData(Errors.COORDINATOR_NOT_AVAILABLE));
         }
         
         if (groupId == null || groupId.isEmpty()) {
-            return 
CompletableFuture.completedFuture(AlterShareGroupOffsetsRequest.getErrorResponse(Errors.INVALID_GROUP_ID));
+            return 
CompletableFuture.completedFuture(AlterShareGroupOffsetsRequest.getErrorResponseData(Errors.INVALID_GROUP_ID));
         }
 
-        if (request.topics() == null || request.topics().isEmpty()) {
+        if (request.topics() == null) {
             return CompletableFuture.completedFuture(new 
AlterShareGroupOffsetsResponseData());
         }
 
@@ -1257,7 +1265,7 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
             "share-group-offsets-alter",
             request,
             exception,
-            (error, message) -> 
AlterShareGroupOffsetsRequest.getErrorResponse(error),
+            (error, message) -> 
AlterShareGroupOffsetsRequest.getErrorResponseData(error, message),
             log
         ));
     }
@@ -1822,26 +1830,18 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
         AuthorizableRequestContext context,
         DeleteShareGroupOffsetsRequestData requestData
     ) {
-        if (!isActive.get()) {
-            return CompletableFuture.completedFuture(
-                
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.COORDINATOR_NOT_AVAILABLE));
-        }
-
-        if (metadataImage == null) {
-            return CompletableFuture.completedFuture(
-                
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.COORDINATOR_NOT_AVAILABLE));
+        if (!isActive.get() || metadataImage == null) {
+            return 
CompletableFuture.completedFuture(DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.COORDINATOR_NOT_AVAILABLE));
         }
 
         String groupId = requestData.groupId();
 
         if (!isGroupIdNotEmpty(groupId)) {
-            return CompletableFuture.completedFuture(
-                
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.INVALID_GROUP_ID));
+            return 
CompletableFuture.completedFuture(DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.INVALID_GROUP_ID));
         }
 
-        if (requestData.topics() == null || requestData.topics().isEmpty()) {
-            return CompletableFuture.completedFuture(
-                new DeleteShareGroupOffsetsResponseData()
+        if (requestData.topics() == null) {
+            return CompletableFuture.completedFuture(new 
DeleteShareGroupOffsetsResponseData()
             );
         }
 
@@ -1850,15 +1850,14 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
             topicPartitionFor(groupId),
             Duration.ofMillis(config.offsetCommitTimeoutMs()),
             coordinator -> 
coordinator.initiateDeleteShareGroupOffsets(groupId, requestData)
-        )
-            .thenCompose(resultHolder -> deleteShareGroupOffsetsState(groupId, 
resultHolder))
-            .exceptionally(exception -> handleOperationException(
-                "initiate-delete-share-group-offsets",
-                groupId,
-                exception,
-                (error, __) -> 
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(error),
-                log
-            ));
+        ).thenCompose(resultHolder -> deleteShareGroupOffsetsState(groupId, 
resultHolder)
+        ).exceptionally(exception -> handleOperationException(
+            "initiate-delete-share-group-offsets",
+            groupId,
+            exception,
+            (error, message) -> 
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(error, message),
+            log
+        ));
     }
 
     private CompletableFuture<DeleteShareGroupOffsetsResponseData> 
deleteShareGroupOffsetsState(
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index 01c87696053..4517d0cb8f3 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -4280,10 +4280,26 @@ public class GroupCoordinatorServiceTest {
         service.startup(() -> 1);
 
         DeleteShareGroupOffsetsRequestData requestData = new 
DeleteShareGroupOffsetsRequestData()
-            .setGroupId("share-group-id");
+            .setGroupId("share-group-id")
+            .setTopics(List.of());
 
         DeleteShareGroupOffsetsResponseData responseData = new 
DeleteShareGroupOffsetsResponseData();
 
+        GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder 
deleteShareGroupOffsetsResultHolder =
+            new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
+                Errors.NONE.code(),
+                null,
+                null,
+                null
+            );
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("initiate-delete-share-group-offsets"),
+            ArgumentMatchers.eq(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
+            ArgumentMatchers.eq(Duration.ofMillis(5000)),
+            ArgumentMatchers.any()
+        
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder));
+
         CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
             
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
 requestData);
 
@@ -4291,7 +4307,7 @@ public class GroupCoordinatorServiceTest {
     }
 
     @Test
-    public void testDeleteShareGroupOffsetsNullTopicsInRequest() throws 
InterruptedException, ExecutionException {
+    public void testDeleteShareGroupOffsetsEmptyTopicsInRequest() throws 
InterruptedException, ExecutionException {
         CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
         Persister persister = mock(DefaultStatePersister.class);
         GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
@@ -4303,10 +4319,25 @@ public class GroupCoordinatorServiceTest {
 
         DeleteShareGroupOffsetsRequestData requestData = new 
DeleteShareGroupOffsetsRequestData()
             .setGroupId("share-group-id")
-            .setTopics(null);
+            .setTopics(List.of());
 
         DeleteShareGroupOffsetsResponseData responseData = new 
DeleteShareGroupOffsetsResponseData();
 
+        GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder 
deleteShareGroupOffsetsResultHolder =
+            new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
+                Errors.NONE.code(),
+                null,
+                null,
+                null
+            );
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("initiate-delete-share-group-offsets"),
+            ArgumentMatchers.eq(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
+            ArgumentMatchers.eq(Duration.ofMillis(5000)),
+            ArgumentMatchers.any()
+        
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder));
+
         CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
             
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
 requestData);
 
@@ -4400,9 +4431,7 @@ public class GroupCoordinatorServiceTest {
 
         DeleteShareGroupOffsetsRequestData requestData = new 
DeleteShareGroupOffsetsRequestData()
             .setGroupId(groupId)
-            .setTopics(List.of(new 
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
-                .setTopicName(TOPIC_NAME)
-            ));
+            .setTopics(List.of());
 
         DeleteShareGroupOffsetsResponseData responseData = new 
DeleteShareGroupOffsetsResponseData()
             .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
@@ -5376,13 +5405,29 @@ public class GroupCoordinatorServiceTest {
         AlterShareGroupOffsetsRequestData request = new 
AlterShareGroupOffsetsRequestData()
             .setGroupId(groupId);
 
+        AlterShareGroupOffsetsResponseData data = new 
AlterShareGroupOffsetsResponseData();
+
+        Map.Entry<AlterShareGroupOffsetsResponseData, 
InitializeShareGroupStateParameters> alterShareGroupOffsetsIntermediate =
+            Map.entry(
+                new AlterShareGroupOffsetsResponseData()
+                    .setResponses(new 
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopicCollection()),
+                new InitializeShareGroupStateParameters.Builder()
+                    .setGroupTopicPartitionData(new 
GroupTopicPartitionData<>("share-group", List.of()))
+                    .build());
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("share-group-offsets-alter"),
+            ArgumentMatchers.eq(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
+            ArgumentMatchers.eq(Duration.ofMillis(5000)),
+            ArgumentMatchers.any()
+        
)).thenReturn(CompletableFuture.completedFuture(alterShareGroupOffsetsIntermediate));
+
         CompletableFuture<AlterShareGroupOffsetsResponseData> future = 
service.alterShareGroupOffsets(
             requestContext(ApiKeys.ALTER_SHARE_GROUP_OFFSETS),
             groupId,
             request
         );
 
-        AlterShareGroupOffsetsResponseData data = new 
AlterShareGroupOffsetsResponseData();
         assertEquals(data, future.get());
     }
 
@@ -5416,7 +5461,7 @@ public class GroupCoordinatorServiceTest {
 
         AlterShareGroupOffsetsResponseData response = new 
AlterShareGroupOffsetsResponseData()
             .setErrorCode(Errors.NON_EMPTY_GROUP.code())
-            .setErrorMessage(Errors.NON_EMPTY_GROUP.message());
+            .setErrorMessage("bad stuff");
 
         when(runtime.scheduleWriteOperation(
             ArgumentMatchers.eq("share-group-offsets-alter"),


Reply via email to