This is an automated email from the ASF dual-hosted git repository. schofielaj pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 729f9ccf065 KAFKA-19440: Handle top-level errors in AlterShareGroupOffsets RPC (#20049) 729f9ccf065 is described below commit 729f9ccf065b35ce2e4e5b5fe1e9ce129d2ace08 Author: Andrew Schofield <aschofi...@confluent.io> AuthorDate: Thu Jul 3 11:00:56 2025 +0100 KAFKA-19440: Handle top-level errors in AlterShareGroupOffsets RPC (#20049) While testing the code in https://github.com/apache/kafka/pull/19820, it became clear that the error handling problems were due to the underlying Admin API. This PR fixes the error handling for top-level errors in the AlterShareGroupOffsets RPC. Reviewers: Apoorv Mittal <apoorvmitta...@gmail.com>, Lan Ding <isdin...@163.com>, TaiJuWu <tjwu1...@gmail.com> --- .../admin/AlterShareGroupOffsetsResult.java | 23 +++--- .../kafka/clients/admin/KafkaAdminClient.java | 10 ++- .../internals/AlterShareGroupOffsetsHandler.java | 93 ++++++++++++---------- .../requests/AlterShareGroupOffsetsRequest.java | 31 +++++--- .../requests/DeleteShareGroupOffsetsRequest.java | 10 ++- .../kafka/clients/admin/KafkaAdminClientTest.java | 30 ++++++- core/src/main/scala/kafka/server/KafkaApis.scala | 31 +++----- .../kafka/api/AuthorizerIntegrationTest.scala | 14 ++++ .../scala/unit/kafka/server/KafkaApisTest.scala | 12 ++- .../coordinator/group/GroupCoordinatorService.java | 53 ++++++------ .../group/GroupCoordinatorServiceTest.java | 61 ++++++++++++-- 11 files changed, 238 insertions(+), 130 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterShareGroupOffsetsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterShareGroupOffsetsResult.java index 7c41852231d..293daaadbb9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/AlterShareGroupOffsetsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterShareGroupOffsetsResult.java @@ -20,6 +20,7 @@ package org.apache.kafka.clients.admin; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.common.protocol.Errors; @@ -35,9 +36,9 @@ import java.util.stream.Collectors; @InterfaceStability.Evolving public class AlterShareGroupOffsetsResult { - private final KafkaFuture<Map<TopicPartition, Errors>> future; + private final KafkaFuture<Map<TopicPartition, ApiException>> future; - AlterShareGroupOffsetsResult(KafkaFuture<Map<TopicPartition, Errors>> future) { + AlterShareGroupOffsetsResult(KafkaFuture<Map<TopicPartition, ApiException>> future) { this.future = future; } @@ -54,11 +55,11 @@ public class AlterShareGroupOffsetsResult { result.completeExceptionally(new IllegalArgumentException( "Alter offset for partition \"" + partition + "\" was not attempted")); } else { - final Errors error = topicPartitions.get(partition); - if (error == Errors.NONE) { + final ApiException exception = topicPartitions.get(partition); + if (exception == null) { result.complete(null); } else { - result.completeExceptionally(error.exception()); + result.completeExceptionally(exception); } } }); @@ -68,22 +69,22 @@ public class AlterShareGroupOffsetsResult { /** * Return a future which succeeds if all the alter offsets succeed. + * If not, the first topic error shall be returned. */ public KafkaFuture<Void> all() { return this.future.thenApply(topicPartitionErrorsMap -> { List<TopicPartition> partitionsFailed = topicPartitionErrorsMap.entrySet() .stream() - .filter(e -> e.getValue() != Errors.NONE) + .filter(e -> e.getValue() != null) .map(Map.Entry::getKey) .collect(Collectors.toList()); - for (Errors error : topicPartitionErrorsMap.values()) { - if (error != Errors.NONE) { - throw error.exception( - "Failed altering share group offsets for the following partitions: " + partitionsFailed); + for (ApiException exception : topicPartitionErrorsMap.values()) { + if (exception != null) { + throw Errors.forException(exception).exception( + "Failed altering group offsets for the following partitions: " + partitionsFailed); } } return null; }); } - } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index b283d65cbee..e1be4304950 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -3804,8 +3804,10 @@ public class KafkaAdminClient extends AdminClient { } @Override - public AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, Long> offsets, AlterShareGroupOffsetsOptions options) { - SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, Errors>> future = AlterShareGroupOffsetsHandler.newFuture(groupId); + public AlterShareGroupOffsetsResult alterShareGroupOffsets(final String groupId, + final Map<TopicPartition, Long> offsets, + final AlterShareGroupOffsetsOptions options) { + SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, ApiException>> future = AlterShareGroupOffsetsHandler.newFuture(groupId); AlterShareGroupOffsetsHandler handler = new AlterShareGroupOffsetsHandler(groupId, offsets, logContext); invokeDriver(handler, future, options.timeoutMs); return new AlterShareGroupOffsetsResult(future.get(CoordinatorKey.byGroupId(groupId))); @@ -3821,7 +3823,9 @@ public class KafkaAdminClient extends AdminClient { } @Override - public DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<String> topics, DeleteShareGroupOffsetsOptions options) { + public DeleteShareGroupOffsetsResult deleteShareGroupOffsets(final String groupId, + final Set<String> topics, + final DeleteShareGroupOffsetsOptions options) { SimpleAdminApiFuture<CoordinatorKey, Map<String, ApiException>> future = DeleteShareGroupOffsetsHandler.newFuture(groupId); DeleteShareGroupOffsetsHandler handler = new DeleteShareGroupOffsetsHandler(groupId, topics, logContext); invokeDriver(handler, future, options.timeoutMs); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterShareGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterShareGroupOffsetsHandler.java index f66f5972836..ef21be6b6d2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterShareGroupOffsetsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterShareGroupOffsetsHandler.java @@ -21,8 +21,8 @@ import org.apache.kafka.clients.admin.AlterShareGroupOffsetsOptions; import org.apache.kafka.clients.admin.KafkaAdminClient; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.message.AlterShareGroupOffsetsRequestData; -import org.apache.kafka.common.message.AlterShareGroupOffsetsResponseData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.AbstractResponse; import org.apache.kafka.common.requests.AlterShareGroupOffsetsRequest; @@ -33,7 +33,6 @@ import org.apache.kafka.common.utils.LogContext; import org.slf4j.Logger; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -42,7 +41,7 @@ import java.util.Set; /** * This class is the handler for {@link KafkaAdminClient#alterShareGroupOffsets(String, Map, AlterShareGroupOffsetsOptions)} call */ -public class AlterShareGroupOffsetsHandler extends AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, Errors>> { +public class AlterShareGroupOffsetsHandler extends AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, ApiException>> { private final CoordinatorKey groupId; @@ -52,7 +51,6 @@ public class AlterShareGroupOffsetsHandler extends AdminApiHandler.Batched<Coord private final CoordinatorStrategy lookupStrategy; - public AlterShareGroupOffsetsHandler(String groupId, Map<TopicPartition, Long> offsets, LogContext logContext) { this.groupId = CoordinatorKey.byGroupId(groupId); this.offsets = offsets; @@ -60,8 +58,15 @@ public class AlterShareGroupOffsetsHandler extends AdminApiHandler.Batched<Coord this.lookupStrategy = new CoordinatorStrategy(FindCoordinatorRequest.CoordinatorType.GROUP, logContext); } - public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, Errors>> newFuture(String groupId) { - return AdminApiFuture.forKeys(Collections.singleton(CoordinatorKey.byGroupId(groupId))); + public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, ApiException>> newFuture(String groupId) { + return AdminApiFuture.forKeys(Set.of(CoordinatorKey.byGroupId(groupId))); + } + + private void validateKeys(Set<CoordinatorKey> groupIds) { + if (!groupIds.equals(Set.of(groupId))) { + throw new IllegalArgumentException("Received unexpected group ids " + groupIds + + " (expected only " + Set.of(groupId) + ")"); + } } @Override @@ -87,30 +92,38 @@ public class AlterShareGroupOffsetsHandler extends AdminApiHandler.Batched<Coord } @Override - public ApiResult<CoordinatorKey, Map<TopicPartition, Errors>> handleResponse(Node broker, Set<CoordinatorKey> keys, AbstractResponse abstractResponse) { + public ApiResult<CoordinatorKey, Map<TopicPartition, ApiException>> handleResponse(Node broker, Set<CoordinatorKey> keys, AbstractResponse abstractResponse) { + validateKeys(keys); + AlterShareGroupOffsetsResponse response = (AlterShareGroupOffsetsResponse) abstractResponse; - final Map<TopicPartition, Errors> partitionResults = new HashMap<>(); final Set<CoordinatorKey> groupsToUnmap = new HashSet<>(); final Set<CoordinatorKey> groupsToRetry = new HashSet<>(); - - for (AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic topic : response.data().responses()) { - for (AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition partition : topic.partitions()) { - TopicPartition topicPartition = new TopicPartition(topic.topicName(), partition.partitionIndex()); - Errors error = Errors.forCode(partition.errorCode()); - - if (error != Errors.NONE) { - handleError( - groupId, - topicPartition, - error, - partitionResults, - groupsToUnmap, - groupsToRetry - ); - } else { - partitionResults.put(topicPartition, error); + final Map<TopicPartition, ApiException> partitionResults = new HashMap<>(); + + if (response.data().errorCode() != Errors.NONE.code()) { + final Errors topLevelError = Errors.forCode(response.data().errorCode()); + final String topLevelErrorMessage = response.data().errorMessage(); + + offsets.forEach((topicPartition, offset) -> + handleError( + groupId, + topicPartition, + topLevelError, + topLevelErrorMessage, + partitionResults, + groupsToUnmap, + groupsToRetry + )); + } else { + response.data().responses().forEach(topic -> topic.partitions().forEach(partition -> { + if (partition.errorCode() != Errors.NONE.code()) { + final Errors partitionError = Errors.forCode(partition.errorCode()); + final String partitionErrorMessage = partition.errorMessage(); + log.debug("AlterShareGroupOffsets request for group id {} and topic-partition {}-{} failed and returned error {}." + partitionErrorMessage, + groupId.idValue, topic.topicName(), partition.partitionIndex(), partitionError); } - } + partitionResults.put(new TopicPartition(topic.topicName(), partition.partitionIndex()), Errors.forCode(partition.errorCode()).exception(partition.errorMessage())); + })); } if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) { @@ -121,23 +134,23 @@ public class AlterShareGroupOffsetsHandler extends AdminApiHandler.Batched<Coord } private void handleError( - CoordinatorKey groupId, - TopicPartition topicPartition, - Errors error, - Map<TopicPartition, Errors> partitionResults, - Set<CoordinatorKey> groupsToUnmap, - Set<CoordinatorKey> groupsToRetry + CoordinatorKey groupId, + TopicPartition topicPartition, + Errors error, + String errorMessage, + Map<TopicPartition, ApiException> partitionResults, + Set<CoordinatorKey> groupsToUnmap, + Set<CoordinatorKey> groupsToRetry ) { switch (error) { case COORDINATOR_LOAD_IN_PROGRESS: case REBALANCE_IN_PROGRESS: - log.debug("AlterShareGroupOffsets request for group id {} returned error {}. Will retry.", - groupId.idValue, error); + log.debug("AlterShareGroupOffsets request for group id {} returned error {}. Will retry." + errorMessage, groupId.idValue, error); groupsToRetry.add(groupId); break; case COORDINATOR_NOT_AVAILABLE: case NOT_COORDINATOR: - log.debug("AlterShareGroupOffsets request for group id {} returned error {}. Will rediscover the coordinator and retry.", + log.debug("AlterShareGroupOffsets request for group id {} returned error {}. Will rediscover the coordinator and retry." + errorMessage, groupId.idValue, error); groupsToUnmap.add(groupId); break; @@ -147,14 +160,12 @@ public class AlterShareGroupOffsetsHandler extends AdminApiHandler.Batched<Coord case UNKNOWN_SERVER_ERROR: case KAFKA_STORAGE_ERROR: case GROUP_AUTHORIZATION_FAILED: - log.debug("AlterShareGroupOffsets request for group id {} and partition {} failed due" + - " to error {}.", groupId.idValue, topicPartition, error); - partitionResults.put(topicPartition, error); + log.debug("AlterShareGroupOffsets request for group id {} failed due to error {}." + errorMessage, groupId.idValue, error); + partitionResults.put(topicPartition, error.exception(errorMessage)); break; default: - log.error("AlterShareGroupOffsets request for group id {} and partition {} failed due" + - " to unexpected error {}.", groupId.idValue, topicPartition, error); - partitionResults.put(topicPartition, error); + log.error("AlterShareGroupOffsets request for group id {} failed due to unexpected error {}." + errorMessage, groupId.idValue, error); + partitionResults.put(topicPartition, error.exception(errorMessage)); } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterShareGroupOffsetsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterShareGroupOffsetsRequest.java index 2eb9e37bc50..be04568e1a3 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AlterShareGroupOffsetsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterShareGroupOffsetsRequest.java @@ -53,26 +53,31 @@ public class AlterShareGroupOffsetsRequest extends AbstractRequest { } @Override - public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { - Errors error = Errors.forException(e); - return new AlterShareGroupOffsetsResponse(getErrorResponse(throttleTimeMs, error)); + public AlterShareGroupOffsetsResponse getErrorResponse(int throttleTimeMs, Throwable e) { + return getErrorResponse(throttleTimeMs, Errors.forException(e)); } - public static AlterShareGroupOffsetsResponseData getErrorResponse(int throttleTimeMs, Errors error) { - return new AlterShareGroupOffsetsResponseData() - .setThrottleTimeMs(throttleTimeMs) - .setErrorCode(error.code()) - .setErrorMessage(error.message()); + public AlterShareGroupOffsetsResponse getErrorResponse(int throttleTimeMs, Errors error) { + return getErrorResponse(throttleTimeMs, error.code(), error.message()); + } + + public AlterShareGroupOffsetsResponse getErrorResponse(int throttleTimeMs, short errorCode, String message) { + return new AlterShareGroupOffsetsResponse( + new AlterShareGroupOffsetsResponseData() + .setThrottleTimeMs(throttleTimeMs) + .setErrorCode(errorCode) + .setErrorMessage(message) + ); } - public static AlterShareGroupOffsetsResponseData getErrorResponse(Errors error) { - return getErrorResponse(error.code(), error.message()); + public static AlterShareGroupOffsetsResponseData getErrorResponseData(Errors error) { + return getErrorResponseData(error, null); } - public static AlterShareGroupOffsetsResponseData getErrorResponse(short errorCode, String errorMessage) { + public static AlterShareGroupOffsetsResponseData getErrorResponseData(Errors error, String errorMessage) { return new AlterShareGroupOffsetsResponseData() - .setErrorCode(errorCode) - .setErrorMessage(errorMessage); + .setErrorCode(error.code()) + .setErrorMessage(errorMessage == null ? error.message() : errorMessage); } public static AlterShareGroupOffsetsRequest parse(Readable readable, short version) { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupOffsetsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupOffsetsRequest.java index bec0077b9b3..1e28115bada 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupOffsetsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupOffsetsRequest.java @@ -80,12 +80,18 @@ public class DeleteShareGroupOffsetsRequest extends AbstractRequest { } public static DeleteShareGroupOffsetsResponseData getErrorDeleteResponseData(Errors error) { - return getErrorDeleteResponseData(error.code(), error.message()); + return getErrorDeleteResponseData(error, null); } public static DeleteShareGroupOffsetsResponseData getErrorDeleteResponseData(short errorCode, String errorMessage) { return new DeleteShareGroupOffsetsResponseData() .setErrorCode(errorCode) - .setErrorMessage(errorMessage); + .setErrorMessage(errorMessage == null ? Errors.forCode(errorCode).message() : errorMessage); + } + + public static DeleteShareGroupOffsetsResponseData getErrorDeleteResponseData(Errors error, String errorMessage) { + return new DeleteShareGroupOffsetsResponseData() + .setErrorCode(error.code()) + .setErrorMessage(errorMessage == null ? error.message() : errorMessage); } } \ No newline at end of file diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 1d516cf6648..1098078b582 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -56,7 +56,6 @@ import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.errors.DuplicateVoterException; import org.apache.kafka.common.errors.FencedInstanceIdException; import org.apache.kafka.common.errors.GroupAuthorizationException; -import org.apache.kafka.common.errors.GroupNotEmptyException; import org.apache.kafka.common.errors.GroupSubscribedToTopicException; import org.apache.kafka.common.errors.InvalidConfigurationException; import org.apache.kafka.common.errors.InvalidReplicaAssignmentException; @@ -11351,6 +11350,28 @@ public class KafkaAdminClientTest { } } + @Test + public void testAlterShareGroupOffsetsWithTopLevelError() throws Exception { + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + + AlterShareGroupOffsetsResponseData data = new AlterShareGroupOffsetsResponseData().setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code()).setErrorMessage("Group authorization failed."); + + TopicPartition fooTopicPartition0 = new TopicPartition("foo", 0); + TopicPartition fooTopicPartition1 = new TopicPartition("foo", 1); + TopicPartition barPartition0 = new TopicPartition("bar", 0); + TopicPartition zooTopicPartition0 = new TopicPartition("zoo", 0); + + env.kafkaClient().prepareResponse(new AlterShareGroupOffsetsResponse(data)); + final AlterShareGroupOffsetsResult result = env.adminClient().alterShareGroupOffsets(GROUP_ID, Map.of(fooTopicPartition0, 1L, fooTopicPartition1, 2L, barPartition0, 1L)); + + TestUtils.assertFutureThrows(GroupAuthorizationException.class, result.all()); + TestUtils.assertFutureThrows(GroupAuthorizationException.class, result.partitionResult(fooTopicPartition1)); + TestUtils.assertFutureThrows(IllegalArgumentException.class, result.partitionResult(zooTopicPartition0)); + } + } + @Test public void testAlterShareGroupOffsetsWithErrorInOnePartition() throws Exception { try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { @@ -11359,7 +11380,8 @@ public class KafkaAdminClientTest { AlterShareGroupOffsetsResponseData data = new AlterShareGroupOffsetsResponseData().setResponses( new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopicCollection(List.of( - new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic().setTopicName("foo").setPartitions(List.of(new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition().setPartitionIndex(0), new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition().setPartitionIndex(1).setErrorCode(Errors.NON_EMPTY_GROUP.code()).setErrorMessage("The group is not empty"))), + new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic().setTopicName("foo").setPartitions(List.of(new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition().setPartitionIndex(0), + new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition().setPartitionIndex(1).setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code()).setErrorMessage("Topic authorization failed."))), new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic().setTopicName("bar").setPartitions(List.of(new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition().setPartitionIndex(0))) ).iterator()) ); @@ -11371,9 +11393,9 @@ public class KafkaAdminClientTest { env.kafkaClient().prepareResponse(new AlterShareGroupOffsetsResponse(data)); final AlterShareGroupOffsetsResult result = env.adminClient().alterShareGroupOffsets(GROUP_ID, Map.of(fooTopicPartition0, 1L, fooTopicPartition1, 2L, barPartition0, 1L)); - TestUtils.assertFutureThrows(GroupNotEmptyException.class, result.all()); + TestUtils.assertFutureThrows(TopicAuthorizationException.class, result.all()); assertNull(result.partitionResult(fooTopicPartition0).get()); - TestUtils.assertFutureThrows(GroupNotEmptyException.class, result.partitionResult(fooTopicPartition1)); + TestUtils.assertFutureThrows(TopicAuthorizationException.class, result.partitionResult(fooTopicPartition1)); assertNull(result.partitionResult(barPartition0).get()); } } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 5eb249c54d6..21edc36c13d 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -3756,7 +3756,7 @@ class KafkaApis(val requestChannel: RequestChannel, val groupId = alterShareGroupOffsetsRequest.data.groupId if (!isShareGroupProtocolEnabled) { - requestHelper.sendMaybeThrottle(request, alterShareGroupOffsetsRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.UNSUPPORTED_VERSION.exception)) + requestHelper.sendMaybeThrottle(request, alterShareGroupOffsetsRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) return CompletableFuture.completedFuture[Unit](()) } else if (!authHelper.authorize(request.context, READ, GROUP, groupId)) { requestHelper.sendMaybeThrottle(request, alterShareGroupOffsetsRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception)) @@ -3766,9 +3766,9 @@ class KafkaApis(val requestChannel: RequestChannel, alterShareGroupOffsetsRequest.data.topics.forEach(topic => { val topicError = { - if (!authHelper.authorize(request.context, READ, TOPIC, topic.topicName())) { + if (!authHelper.authorize(request.context, READ, TOPIC, topic.topicName)) { Some(new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED)) - } else if (!metadataCache.contains(topic.topicName())) { + } else if (!metadataCache.contains(topic.topicName)) { Some(new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION)) } else { None @@ -3776,9 +3776,9 @@ class KafkaApis(val requestChannel: RequestChannel, } topicError match { case Some(error) => - topic.partitions().forEach(partition => responseBuilder.addPartition(topic.topicName(), partition.partitionIndex(), metadataCache.topicNamesToIds(), error.error)) + topic.partitions.forEach(partition => responseBuilder.addPartition(topic.topicName, partition.partitionIndex, metadataCache.topicNamesToIds, error.error)) case None => - authorizedTopicPartitions.add(topic) + authorizedTopicPartitions.add(topic.duplicate) } }) @@ -3792,8 +3792,10 @@ class KafkaApis(val requestChannel: RequestChannel, ).handle[Unit] { (response, exception) => if (exception != null) { requestHelper.sendMaybeThrottle(request, alterShareGroupOffsetsRequest.getErrorResponse(exception)) + } else if (response.errorCode != Errors.NONE.code) { + requestHelper.sendMaybeThrottle(request, alterShareGroupOffsetsRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, response.errorCode, response.errorMessage)) } else { - requestHelper.sendMaybeThrottle(request, responseBuilder.merge(response, metadataCache.topicNamesToIds()).build()) + requestHelper.sendMaybeThrottle(request, responseBuilder.merge(response, metadataCache.topicNamesToIds).build()) } } } @@ -3824,22 +3826,13 @@ class KafkaApis(val requestChannel: RequestChannel, new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic() .setTopicName(topic.topicName) .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) - .setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message()) + .setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message) ) } else { authorizedTopics.add(topic) } } - if (authorizedTopics.isEmpty) { - requestHelper.sendMaybeThrottle( - request, - new DeleteShareGroupOffsetsResponse( - new DeleteShareGroupOffsetsResponseData() - .setResponses(deleteShareGroupOffsetsResponseTopics))) - return CompletableFuture.completedFuture[Unit](()) - } - groupCoordinator.deleteShareGroupOffsets( request.context, new DeleteShareGroupOffsetsRequestData().setGroupId(groupId).setTopics(authorizedTopics) @@ -3847,12 +3840,12 @@ class KafkaApis(val requestChannel: RequestChannel, if (exception != null) { requestHelper.sendMaybeThrottle(request, deleteShareGroupOffsetsRequest.getErrorResponse( AbstractResponse.DEFAULT_THROTTLE_TIME, - Errors.forException(exception).code(), - exception.getMessage())) + Errors.forException(exception).code, + exception.getMessage)) } else if (responseData.errorCode() != Errors.NONE.code) { requestHelper.sendMaybeThrottle( request, - deleteShareGroupOffsetsRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, responseData.errorCode(), responseData.errorMessage()) + deleteShareGroupOffsetsRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, responseData.errorCode, responseData.errorMessage) ) } else { responseData.responses.forEach { topic => { diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 424772275ea..f950362354c 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -3254,6 +3254,18 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { removeAllClientAcls() } + private def createEmptyShareGroup(): Unit = { + createTopicWithBrokerPrincipal(topic) + addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), shareGroupResource) + addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), topicResource) + shareConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, shareGroup) + val consumer = createShareConsumer() + consumer.subscribe(util.Set.of(topic)) + consumer.poll(Duration.ofMillis(500L)) + consumer.close() + removeAllClientAcls() + } + @Test def testShareGroupDescribeWithGroupDescribeAndTopicDescribeAcl(): Unit = { createShareGroupToDescribe() @@ -3614,6 +3626,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { @Test def testDeleteShareGroupOffsetsWithoutTopicReadAcl(): Unit = { + createEmptyShareGroup() addAndVerifyAcls(shareGroupDeleteAcl(shareGroupResource), shareGroupResource) val request = deleteShareGroupOffsetsRequest @@ -3663,6 +3676,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { @Test def testAlterShareGroupOffsetsWithoutTopicReadAcl(): Unit = { + createEmptyShareGroup() addAndVerifyAcls(shareGroupReadAcl(shareGroupResource), shareGroupResource) val request = alterShareGroupOffsetsRequest diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index a6c26589635..e3a396ba9d5 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -12863,10 +12863,18 @@ class KafkaApisTest extends Logging { def testDeleteShareGroupOffsetsRequestEmptyTopicsSuccess(): Unit = { metadataCache = initializeMetadataCacheWithShareGroupsEnabled() - val deleteShareGroupOffsetsRequest = new DeleteShareGroupOffsetsRequestData() + val deleteShareGroupOffsetsRequestData = new DeleteShareGroupOffsetsRequestData() .setGroupId("group") - val requestChannelRequest = buildRequest(new DeleteShareGroupOffsetsRequest.Builder(deleteShareGroupOffsetsRequest).build) + val requestChannelRequest = buildRequest(new DeleteShareGroupOffsetsRequest.Builder(deleteShareGroupOffsetsRequestData).build) + + val groupCoordinatorResponse: DeleteShareGroupOffsetsResponseData = new DeleteShareGroupOffsetsResponseData() + .setErrorCode(Errors.NONE.code()) + + when(groupCoordinator.deleteShareGroupOffsets( + requestChannelRequest.context, + deleteShareGroupOffsetsRequestData + )).thenReturn(CompletableFuture.completedFuture(groupCoordinatorResponse)) val resultFuture = new CompletableFuture[DeleteShareGroupOffsetsResponseData] kafkaApis = createKafkaApis() diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java index 099201ecabb..812853a263f 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java @@ -698,6 +698,10 @@ public class GroupCoordinatorService implements GroupCoordinator { InitializeShareGroupStateParameters request, AlterShareGroupOffsetsResponseData response ) { + if (request.groupTopicPartitionData().topicsData().isEmpty()) { + return CompletableFuture.completedFuture(response); + } + return persister.initializeState(request) .handle((result, exp) -> { if (exp == null) { @@ -1233,16 +1237,20 @@ public class GroupCoordinatorService implements GroupCoordinator { * See {@link GroupCoordinator#alterShareGroupOffsets(AuthorizableRequestContext, String, AlterShareGroupOffsetsRequestData)}. */ @Override - public CompletableFuture<AlterShareGroupOffsetsResponseData> alterShareGroupOffsets(AuthorizableRequestContext context, String groupId, AlterShareGroupOffsetsRequestData request) { + public CompletableFuture<AlterShareGroupOffsetsResponseData> alterShareGroupOffsets( + AuthorizableRequestContext context, + String groupId, + AlterShareGroupOffsetsRequestData request + ) { if (!isActive.get() || metadataImage == null) { - return CompletableFuture.completedFuture(AlterShareGroupOffsetsRequest.getErrorResponse(Errors.COORDINATOR_NOT_AVAILABLE)); + return CompletableFuture.completedFuture(AlterShareGroupOffsetsRequest.getErrorResponseData(Errors.COORDINATOR_NOT_AVAILABLE)); } if (groupId == null || groupId.isEmpty()) { - return CompletableFuture.completedFuture(AlterShareGroupOffsetsRequest.getErrorResponse(Errors.INVALID_GROUP_ID)); + return CompletableFuture.completedFuture(AlterShareGroupOffsetsRequest.getErrorResponseData(Errors.INVALID_GROUP_ID)); } - if (request.topics() == null || request.topics().isEmpty()) { + if (request.topics() == null) { return CompletableFuture.completedFuture(new AlterShareGroupOffsetsResponseData()); } @@ -1257,7 +1265,7 @@ public class GroupCoordinatorService implements GroupCoordinator { "share-group-offsets-alter", request, exception, - (error, message) -> AlterShareGroupOffsetsRequest.getErrorResponse(error), + (error, message) -> AlterShareGroupOffsetsRequest.getErrorResponseData(error, message), log )); } @@ -1822,26 +1830,18 @@ public class GroupCoordinatorService implements GroupCoordinator { AuthorizableRequestContext context, DeleteShareGroupOffsetsRequestData requestData ) { - if (!isActive.get()) { - return CompletableFuture.completedFuture( - DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.COORDINATOR_NOT_AVAILABLE)); - } - - if (metadataImage == null) { - return CompletableFuture.completedFuture( - DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.COORDINATOR_NOT_AVAILABLE)); + if (!isActive.get() || metadataImage == null) { + return CompletableFuture.completedFuture(DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.COORDINATOR_NOT_AVAILABLE)); } String groupId = requestData.groupId(); if (!isGroupIdNotEmpty(groupId)) { - return CompletableFuture.completedFuture( - DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.INVALID_GROUP_ID)); + return CompletableFuture.completedFuture(DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.INVALID_GROUP_ID)); } - if (requestData.topics() == null || requestData.topics().isEmpty()) { - return CompletableFuture.completedFuture( - new DeleteShareGroupOffsetsResponseData() + if (requestData.topics() == null) { + return CompletableFuture.completedFuture(new DeleteShareGroupOffsetsResponseData() ); } @@ -1850,15 +1850,14 @@ public class GroupCoordinatorService implements GroupCoordinator { topicPartitionFor(groupId), Duration.ofMillis(config.offsetCommitTimeoutMs()), coordinator -> coordinator.initiateDeleteShareGroupOffsets(groupId, requestData) - ) - .thenCompose(resultHolder -> deleteShareGroupOffsetsState(groupId, resultHolder)) - .exceptionally(exception -> handleOperationException( - "initiate-delete-share-group-offsets", - groupId, - exception, - (error, __) -> DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(error), - log - )); + ).thenCompose(resultHolder -> deleteShareGroupOffsetsState(groupId, resultHolder) + ).exceptionally(exception -> handleOperationException( + "initiate-delete-share-group-offsets", + groupId, + exception, + (error, message) -> DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(error, message), + log + )); } private CompletableFuture<DeleteShareGroupOffsetsResponseData> deleteShareGroupOffsetsState( diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java index 01c87696053..4517d0cb8f3 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java @@ -4280,10 +4280,26 @@ public class GroupCoordinatorServiceTest { service.startup(() -> 1); DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData() - .setGroupId("share-group-id"); + .setGroupId("share-group-id") + .setTopics(List.of()); DeleteShareGroupOffsetsResponseData responseData = new DeleteShareGroupOffsetsResponseData(); + GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder deleteShareGroupOffsetsResultHolder = + new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder( + Errors.NONE.code(), + null, + null, + null + ); + + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("initiate-delete-share-group-offsets"), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), + ArgumentMatchers.eq(Duration.ofMillis(5000)), + ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder)); + CompletableFuture<DeleteShareGroupOffsetsResponseData> future = service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), requestData); @@ -4291,7 +4307,7 @@ public class GroupCoordinatorServiceTest { } @Test - public void testDeleteShareGroupOffsetsNullTopicsInRequest() throws InterruptedException, ExecutionException { + public void testDeleteShareGroupOffsetsEmptyTopicsInRequest() throws InterruptedException, ExecutionException { CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime(); Persister persister = mock(DefaultStatePersister.class); GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() @@ -4303,10 +4319,25 @@ public class GroupCoordinatorServiceTest { DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData() .setGroupId("share-group-id") - .setTopics(null); + .setTopics(List.of()); DeleteShareGroupOffsetsResponseData responseData = new DeleteShareGroupOffsetsResponseData(); + GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder deleteShareGroupOffsetsResultHolder = + new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder( + Errors.NONE.code(), + null, + null, + null + ); + + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("initiate-delete-share-group-offsets"), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), + ArgumentMatchers.eq(Duration.ofMillis(5000)), + ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder)); + CompletableFuture<DeleteShareGroupOffsetsResponseData> future = service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), requestData); @@ -4400,9 +4431,7 @@ public class GroupCoordinatorServiceTest { DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData() .setGroupId(groupId) - .setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic() - .setTopicName(TOPIC_NAME) - )); + .setTopics(List.of()); DeleteShareGroupOffsetsResponseData responseData = new DeleteShareGroupOffsetsResponseData() .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code()) @@ -5376,13 +5405,29 @@ public class GroupCoordinatorServiceTest { AlterShareGroupOffsetsRequestData request = new AlterShareGroupOffsetsRequestData() .setGroupId(groupId); + AlterShareGroupOffsetsResponseData data = new AlterShareGroupOffsetsResponseData(); + + Map.Entry<AlterShareGroupOffsetsResponseData, InitializeShareGroupStateParameters> alterShareGroupOffsetsIntermediate = + Map.entry( + new AlterShareGroupOffsetsResponseData() + .setResponses(new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopicCollection()), + new InitializeShareGroupStateParameters.Builder() + .setGroupTopicPartitionData(new GroupTopicPartitionData<>("share-group", List.of())) + .build()); + + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("share-group-offsets-alter"), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), + ArgumentMatchers.eq(Duration.ofMillis(5000)), + ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(alterShareGroupOffsetsIntermediate)); + CompletableFuture<AlterShareGroupOffsetsResponseData> future = service.alterShareGroupOffsets( requestContext(ApiKeys.ALTER_SHARE_GROUP_OFFSETS), groupId, request ); - AlterShareGroupOffsetsResponseData data = new AlterShareGroupOffsetsResponseData(); assertEquals(data, future.get()); } @@ -5416,7 +5461,7 @@ public class GroupCoordinatorServiceTest { AlterShareGroupOffsetsResponseData response = new AlterShareGroupOffsetsResponseData() .setErrorCode(Errors.NON_EMPTY_GROUP.code()) - .setErrorMessage(Errors.NON_EMPTY_GROUP.message()); + .setErrorMessage("bad stuff"); when(runtime.scheduleWriteOperation( ArgumentMatchers.eq("share-group-offsets-alter"),