This is an automated email from the ASF dual-hosted git repository. dajac pushed a commit to branch 4.1 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.1 by this push: new 7bdeb36a52a KAFKA-19246; OffsetFetch API does not return group level errors correctly with version 1 (#19704) 7bdeb36a52a is described below commit 7bdeb36a52a7a391ed317b3f66175b2b65e1617a Author: David Jacot <david.ja...@gmail.com> AuthorDate: Thu Jun 26 15:29:43 2025 +0200 KAFKA-19246; OffsetFetch API does not return group level errors correctly with version 1 (#19704) The OffsetFetch API does not support top level errors in version 1. Hence, the top level error must be returned at the partition level. Side note: It is a tad annoying that we create error response in multiple places (e.g. KafkaApis, Group CoordinatorService). There was a reason for this but I cannot remember. Reviewers: Dongnuo Lyu <d...@confluent.io>, Sean Quah <sq...@confluent.io>, Ken Huang <s7133...@gmail.com>, TengYao Chi <frankvi...@apache.org> --- .../kafka/common/requests/OffsetFetchResponse.java | 55 +++++++++++------- .../common/requests/OffsetFetchResponseTest.java | 43 ++++++++++++++ core/src/main/scala/kafka/server/KafkaApis.scala | 66 ++++++++++++---------- .../unit/kafka/server/OffsetFetchRequestTest.scala | 49 ++++++++++++++++ .../coordinator/group/GroupCoordinatorService.java | 57 +++++++++++-------- 5 files changed, 197 insertions(+), 73 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java index e76ea8f7f3b..77297e96e6e 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.message.OffsetFetchRequestData; import org.apache.kafka.common.message.OffsetFetchResponseData; import 
org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponseGroup; import org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponsePartition; @@ -99,27 +100,12 @@ public class OffsetFetchResponse extends AbstractResponse { data.topics().add(newTopic); topic.partitions().forEach(partition -> { - OffsetFetchResponsePartition newPartition; - - if (version < TOP_LEVEL_ERROR_AND_NULL_TOPICS_MIN_VERSION && group.errorCode() != Errors.NONE.code()) { - // Versions prior to version 2 do not support a top level error. Therefore, - // we put it at the partition level. - newPartition = new OffsetFetchResponsePartition() - .setPartitionIndex(partition.partitionIndex()) - .setErrorCode(group.errorCode()) - .setCommittedOffset(INVALID_OFFSET) - .setMetadata(NO_METADATA) - .setCommittedLeaderEpoch(NO_PARTITION_LEADER_EPOCH); - } else { - newPartition = new OffsetFetchResponsePartition() - .setPartitionIndex(partition.partitionIndex()) - .setErrorCode(partition.errorCode()) - .setCommittedOffset(partition.committedOffset()) - .setMetadata(partition.metadata()) - .setCommittedLeaderEpoch(partition.committedLeaderEpoch()); - } - - newTopic.partitions().add(newPartition); + newTopic.partitions().add(new OffsetFetchResponsePartition() + .setPartitionIndex(partition.partitionIndex()) + .setErrorCode(partition.errorCode()) + .setCommittedOffset(partition.committedOffset()) + .setMetadata(partition.metadata()) + .setCommittedLeaderEpoch(partition.committedLeaderEpoch())); }); }); } @@ -239,4 +225,31 @@ public class OffsetFetchResponse extends AbstractResponse { public boolean shouldClientThrottle(short version) { return version >= 4; } + + public static OffsetFetchResponseData.OffsetFetchResponseGroup groupError( + OffsetFetchRequestData.OffsetFetchRequestGroup group, + Errors error, + int version + ) { + if (version >= TOP_LEVEL_ERROR_AND_NULL_TOPICS_MIN_VERSION) { + return new OffsetFetchResponseData.OffsetFetchResponseGroup() + 
.setGroupId(group.groupId()) + .setErrorCode(error.code()); + } else { + return new OffsetFetchResponseData.OffsetFetchResponseGroup() + .setGroupId(group.groupId()) + .setTopics(group.topics().stream().map(topic -> + new OffsetFetchResponseData.OffsetFetchResponseTopics() + .setName(topic.name()) + .setPartitions(topic.partitionIndexes().stream().map(partition -> + new OffsetFetchResponseData.OffsetFetchResponsePartitions() + .setPartitionIndex(partition) + .setErrorCode(error.code()) + .setCommittedOffset(INVALID_OFFSET) + .setMetadata(NO_METADATA) + .setCommittedLeaderEpoch(NO_PARTITION_LEADER_EPOCH) + ).collect(Collectors.toList())) + ).collect(Collectors.toList())); + } + } } diff --git a/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchResponseTest.java index 23b5258a235..302e6309501 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchResponseTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.message.OffsetFetchRequestData; import org.apache.kafka.common.message.OffsetFetchResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; @@ -26,6 +27,9 @@ import org.junit.jupiter.params.ParameterizedTest; import java.util.List; +import static org.apache.kafka.common.record.RecordBatch.NO_PARTITION_LEADER_EPOCH; +import static org.apache.kafka.common.requests.OffsetFetchResponse.INVALID_OFFSET; +import static org.apache.kafka.common.requests.OffsetFetchResponse.NO_METADATA; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -243,4 +247,43 @@ public class OffsetFetchResponseTest { new OffsetFetchResponse(data, 
version).group("foo") ); } + + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH) + public void testSingleGroupWithError(short version) { + var group = new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("group1") + .setTopics(List.of( + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName("foo") + .setPartitionIndexes(List.of(0)) + )); + + if (version < 2) { + assertEquals( + new OffsetFetchResponseData.OffsetFetchResponseGroup() + .setGroupId("group1") + .setTopics(List.of( + new OffsetFetchResponseData.OffsetFetchResponseTopics() + .setName("foo") + .setPartitions(List.of( + new OffsetFetchResponseData.OffsetFetchResponsePartitions() + .setPartitionIndex(0) + .setErrorCode(Errors.INVALID_GROUP_ID.code()) + .setCommittedOffset(INVALID_OFFSET) + .setMetadata(NO_METADATA) + .setCommittedLeaderEpoch(NO_PARTITION_LEADER_EPOCH) + )) + )), + OffsetFetchResponse.groupError(group, Errors.INVALID_GROUP_ID, version) + ); + } else { + assertEquals( + new OffsetFetchResponseData.OffsetFetchResponseGroup() + .setGroupId("group1") + .setErrorCode(Errors.INVALID_GROUP_ID.code()), + OffsetFetchResponse.groupError(group, Errors.INVALID_GROUP_ID, version) + ); + } + } } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index f69cbb0eb66..5eb249c54d6 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1016,9 +1016,11 @@ class KafkaApis(val requestChannel: RequestChannel, groups.forEach { groupOffsetFetch => val isAllPartitions = groupOffsetFetch.topics == null if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupOffsetFetch.groupId)) { - futures += CompletableFuture.completedFuture(new OffsetFetchResponseData.OffsetFetchResponseGroup() - .setGroupId(groupOffsetFetch.groupId) - .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code)) + futures += 
CompletableFuture.completedFuture(OffsetFetchResponse.groupError( + groupOffsetFetch, + Errors.GROUP_AUTHORIZATION_FAILED, + request.header.apiVersion() + )) } else if (isAllPartitions) { futures += fetchAllOffsetsForGroup( request.context, @@ -1043,33 +1045,35 @@ class KafkaApis(val requestChannel: RequestChannel, private def fetchAllOffsetsForGroup( requestContext: RequestContext, - offsetFetchRequest: OffsetFetchRequestData.OffsetFetchRequestGroup, + groupFetchRequest: OffsetFetchRequestData.OffsetFetchRequestGroup, requireStable: Boolean ): CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup] = { val useTopicIds = OffsetFetchRequest.useTopicIds(requestContext.apiVersion) groupCoordinator.fetchAllOffsets( requestContext, - offsetFetchRequest, + groupFetchRequest, requireStable - ).handle[OffsetFetchResponseData.OffsetFetchResponseGroup] { (offsetFetchResponse, exception) => + ).handle[OffsetFetchResponseData.OffsetFetchResponseGroup] { (groupFetchResponse, exception) => if (exception != null) { - new OffsetFetchResponseData.OffsetFetchResponseGroup() - .setGroupId(offsetFetchRequest.groupId) - .setErrorCode(Errors.forException(exception).code) - } else if (offsetFetchResponse.errorCode() != Errors.NONE.code) { - offsetFetchResponse + OffsetFetchResponse.groupError( + groupFetchRequest, + Errors.forException(exception), + requestContext.apiVersion() + ) + } else if (groupFetchResponse.errorCode() != Errors.NONE.code) { + groupFetchResponse } else { // Clients are not allowed to see offsets for topics that are not authorized for Describe. 
val authorizedNames = authHelper.filterByAuthorized( requestContext, DESCRIBE, TOPIC, - offsetFetchResponse.topics.asScala + groupFetchResponse.topics.asScala )(_.name) val topics = new mutable.ArrayBuffer[OffsetFetchResponseData.OffsetFetchResponseTopics] - offsetFetchResponse.topics.forEach { topic => + groupFetchResponse.topics.forEach { topic => if (authorizedNames.contains(topic.name)) { if (useTopicIds) { // If the topic is not provided by the group coordinator, we set it @@ -1087,20 +1091,20 @@ class KafkaApis(val requestChannel: RequestChannel, } } } - offsetFetchResponse.setTopics(topics.asJava) + groupFetchResponse.setTopics(topics.asJava) } } } private def fetchOffsetsForGroup( requestContext: RequestContext, - offsetFetchRequest: OffsetFetchRequestData.OffsetFetchRequestGroup, + groupFetchRequest: OffsetFetchRequestData.OffsetFetchRequestGroup, requireStable: Boolean ): CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup] = { val useTopicIds = OffsetFetchRequest.useTopicIds(requestContext.apiVersion) if (useTopicIds) { - offsetFetchRequest.topics.forEach { topic => + groupFetchRequest.topics.forEach { topic => if (topic.topicId != Uuid.ZERO_UUID) { metadataCache.getTopicName(topic.topicId).ifPresent(name => topic.setName(name)) } @@ -1112,7 +1116,7 @@ class KafkaApis(val requestChannel: RequestChannel, requestContext, DESCRIBE, TOPIC, - offsetFetchRequest.topics.asScala + groupFetchRequest.topics.asScala )(_.name) val authorizedTopics = new mutable.ArrayBuffer[OffsetFetchRequestData.OffsetFetchRequestTopics] @@ -1134,7 +1138,7 @@ class KafkaApis(val requestChannel: RequestChannel, topicResponse } - offsetFetchRequest.topics.forEach { topic => + groupFetchRequest.topics.forEach { topic => if (useTopicIds && topic.name.isEmpty) { errorTopics += buildErrorResponse(topic, Errors.UNKNOWN_TOPIC_ID) } else if (!authorizedTopicNames.contains(topic.name)) { @@ -1147,25 +1151,27 @@ class KafkaApis(val requestChannel: RequestChannel, 
groupCoordinator.fetchOffsets( requestContext, new OffsetFetchRequestData.OffsetFetchRequestGroup() - .setGroupId(offsetFetchRequest.groupId) - .setMemberId(offsetFetchRequest.memberId) - .setMemberEpoch(offsetFetchRequest.memberEpoch) + .setGroupId(groupFetchRequest.groupId) + .setMemberId(groupFetchRequest.memberId) + .setMemberEpoch(groupFetchRequest.memberEpoch) .setTopics(authorizedTopics.asJava), requireStable - ).handle[OffsetFetchResponseData.OffsetFetchResponseGroup] { (offsetFetchResponse, exception) => + ).handle[OffsetFetchResponseData.OffsetFetchResponseGroup] { (groupFetchResponse, exception) => if (exception != null) { - new OffsetFetchResponseData.OffsetFetchResponseGroup() - .setGroupId(offsetFetchRequest.groupId) - .setErrorCode(Errors.forException(exception).code) - } else if (offsetFetchResponse.errorCode() != Errors.NONE.code) { - offsetFetchResponse + OffsetFetchResponse.groupError( + groupFetchRequest, + Errors.forException(exception), + requestContext.apiVersion() + ) + } else if (groupFetchResponse.errorCode() != Errors.NONE.code) { + groupFetchResponse } else { val topics = new util.ArrayList[OffsetFetchResponseData.OffsetFetchResponseTopics]( - offsetFetchResponse.topics.size + errorTopics.size + groupFetchRequest.topics.size + errorTopics.size ) - topics.addAll(offsetFetchResponse.topics) + topics.addAll(groupFetchResponse.topics) topics.addAll(errorTopics.asJava) - offsetFetchResponse.setTopics(topics) + groupFetchResponse.setTopics(topics) } } } diff --git a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala index 18f254ae40d..75bf82ef155 100644 --- a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala @@ -647,4 +647,53 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB ) } } + + @ClusterTest + def testGroupErrors(): Unit = { + val 
topicId = createTopic( + topic = "foo", + numPartitions = 3 + ) + + for (version <- ApiKeys.OFFSET_FETCH.oldestVersion() to ApiKeys.OFFSET_FETCH.latestVersion(isUnstableApiEnabled)) { + assertEquals( + if (version >= 2) { + new OffsetFetchResponseData.OffsetFetchResponseGroup() + .setGroupId("unknown") + .setErrorCode(Errors.NOT_COORDINATOR.code) + } else { + // Version 1 does not support group level errors. Hence, the error is + // returned at the partition level. + new OffsetFetchResponseData.OffsetFetchResponseGroup() + .setGroupId("unknown") + .setTopics(List( + new OffsetFetchResponseData.OffsetFetchResponseTopics() + .setName("foo") + .setPartitions(List( + new OffsetFetchResponseData.OffsetFetchResponsePartitions() + .setPartitionIndex(0) + .setErrorCode(Errors.NOT_COORDINATOR.code) + .setCommittedOffset(-1) + .setCommittedLeaderEpoch(-1) + .setMetadata("") + ).asJava) + ).asJava) + }, + fetchOffsets( + group = new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("unknown") + .setMemberId("") + .setMemberEpoch(0) + .setTopics(List( + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName("foo") + .setTopicId(topicId) + .setPartitionIndexes(List[Integer](0).asJava) + ).asJava), + requireStable = false, + version = version.toShort + ) + ) + } + } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java index ab7ede49cfe..099201ecabb 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java @@ -72,6 +72,7 @@ import org.apache.kafka.common.requests.DeleteShareGroupOffsetsRequest; import org.apache.kafka.common.requests.DescribeGroupsRequest; import org.apache.kafka.common.requests.DescribeShareGroupOffsetsRequest; import 
org.apache.kafka.common.requests.OffsetCommitRequest; +import org.apache.kafka.common.requests.OffsetFetchResponse; import org.apache.kafka.common.requests.ShareGroupDescribeRequest; import org.apache.kafka.common.requests.ShareGroupHeartbeatRequest; import org.apache.kafka.common.requests.StreamsGroupDescribeRequest; @@ -1551,18 +1552,20 @@ public class GroupCoordinatorService implements GroupCoordinator { boolean requireStable ) { if (!isActive.get()) { - return CompletableFuture.completedFuture(new OffsetFetchResponseData.OffsetFetchResponseGroup() - .setGroupId(request.groupId()) - .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()) - ); + return CompletableFuture.completedFuture(OffsetFetchResponse.groupError( + request, + Errors.COORDINATOR_NOT_AVAILABLE, + context.requestVersion() + )); } // For backwards compatibility, we support fetch commits for the empty group id. if (request.groupId() == null) { - return CompletableFuture.completedFuture(new OffsetFetchResponseData.OffsetFetchResponseGroup() - .setGroupId(request.groupId()) - .setErrorCode(Errors.INVALID_GROUP_ID.code()) - ); + return CompletableFuture.completedFuture(OffsetFetchResponse.groupError( + request, + Errors.INVALID_GROUP_ID, + context.requestVersion() + )); } // The require stable flag when set tells the broker to hold on returning unstable @@ -1584,6 +1587,7 @@ public class GroupCoordinatorService implements GroupCoordinator { ) ).exceptionally(exception -> handleOffsetFetchException( "fetch-offsets", + context, request, exception )); @@ -1606,18 +1610,20 @@ public class GroupCoordinatorService implements GroupCoordinator { boolean requireStable ) { if (!isActive.get()) { - return CompletableFuture.completedFuture(new OffsetFetchResponseData.OffsetFetchResponseGroup() - .setGroupId(request.groupId()) - .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()) - ); + return CompletableFuture.completedFuture(OffsetFetchResponse.groupError( + request, + Errors.COORDINATOR_NOT_AVAILABLE, + 
context.requestVersion() + )); } // For backwards compatibility, we support fetch commits for the empty group id. if (request.groupId() == null) { - return CompletableFuture.completedFuture(new OffsetFetchResponseData.OffsetFetchResponseGroup() - .setGroupId(request.groupId()) - .setErrorCode(Errors.INVALID_GROUP_ID.code()) - ); + return CompletableFuture.completedFuture(OffsetFetchResponse.groupError( + request, + Errors.INVALID_GROUP_ID, + context.requestVersion() + )); } // The require stable flag when set tells the broker to hold on returning unstable @@ -1639,6 +1645,7 @@ public class GroupCoordinatorService implements GroupCoordinator { ) ).exceptionally(exception -> handleOffsetFetchException( "fetch-all-offsets", + context, request, exception )); @@ -2266,12 +2273,14 @@ public class GroupCoordinatorService implements GroupCoordinator { * The handler also handles and logs unexpected errors. * * @param operationName The name of the operation. + * @param context The request context. * @param request The OffsetFetchRequestGroup request. * @param exception The exception to handle. * @return The OffsetFetchRequestGroup response. */ private OffsetFetchResponseData.OffsetFetchResponseGroup handleOffsetFetchException( String operationName, + AuthorizableRequestContext context, OffsetFetchRequestData.OffsetFetchRequestGroup request, Throwable exception ) { @@ -2290,18 +2299,22 @@ public class GroupCoordinatorService implements GroupCoordinator { // NOT_ENOUGH_REPLICAS and REQUEST_TIMED_OUT to COORDINATOR_NOT_AVAILABLE, // COORDINATOR_NOT_AVAILABLE is also not handled by consumers on versions prior to // 3.9. 
- return new OffsetFetchResponseData.OffsetFetchResponseGroup() - .setGroupId(request.groupId()) - .setErrorCode(Errors.NOT_COORDINATOR.code()); + return OffsetFetchResponse.groupError( + request, + Errors.NOT_COORDINATOR, + context.requestVersion() + ); default: return handleOperationException( operationName, request, exception, - (error, __) -> new OffsetFetchResponseData.OffsetFetchResponseGroup() - .setGroupId(request.groupId()) - .setErrorCode(error.code()), + (error, __) -> OffsetFetchResponse.groupError( + request, + error, + context.requestVersion() + ), log ); }