This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.1 by this push:
new 7bdeb36a52a KAFKA-19246; OffsetFetch API does not return group level
errors correctly with version 1 (#19704)
7bdeb36a52a is described below
commit 7bdeb36a52a7a391ed317b3f66175b2b65e1617a
Author: David Jacot <[email protected]>
AuthorDate: Thu Jun 26 15:29:43 2025 +0200
KAFKA-19246; OffsetFetch API does not return group level errors correctly
with version 1 (#19704)
The OffsetFetch API does not support top level errors in version 1.
Hence, the top level error must be returned at the partition level.
Side note: It is a tad annoying that we create error response in
multiple places (e.g. KafkaApis, Group CoordinatorService). There were a
reason for this but I cannot remember.
Reviewers: Dongnuo Lyu <[email protected]>, Sean Quah <[email protected]>,
Ken Huang <[email protected]>, TengYao Chi <[email protected]>
---
.../kafka/common/requests/OffsetFetchResponse.java | 55 +++++++++++-------
.../common/requests/OffsetFetchResponseTest.java | 43 ++++++++++++++
core/src/main/scala/kafka/server/KafkaApis.scala | 66 ++++++++++++----------
.../unit/kafka/server/OffsetFetchRequestTest.scala | 49 ++++++++++++++++
.../coordinator/group/GroupCoordinatorService.java | 57 +++++++++++--------
5 files changed, 197 insertions(+), 73 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index e76ea8f7f3b..77297e96e6e 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -17,6 +17,7 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchResponseData;
import
org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponseGroup;
import
org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponsePartition;
@@ -99,27 +100,12 @@ public class OffsetFetchResponse extends AbstractResponse {
data.topics().add(newTopic);
topic.partitions().forEach(partition -> {
- OffsetFetchResponsePartition newPartition;
-
- if (version <
TOP_LEVEL_ERROR_AND_NULL_TOPICS_MIN_VERSION && group.errorCode() !=
Errors.NONE.code()) {
- // Versions prior to version 2 do not support a
top level error. Therefore,
- // we put it at the partition level.
- newPartition = new OffsetFetchResponsePartition()
- .setPartitionIndex(partition.partitionIndex())
- .setErrorCode(group.errorCode())
- .setCommittedOffset(INVALID_OFFSET)
- .setMetadata(NO_METADATA)
-
.setCommittedLeaderEpoch(NO_PARTITION_LEADER_EPOCH);
- } else {
- newPartition = new OffsetFetchResponsePartition()
- .setPartitionIndex(partition.partitionIndex())
- .setErrorCode(partition.errorCode())
-
.setCommittedOffset(partition.committedOffset())
- .setMetadata(partition.metadata())
-
.setCommittedLeaderEpoch(partition.committedLeaderEpoch());
- }
-
- newTopic.partitions().add(newPartition);
+ newTopic.partitions().add(new
OffsetFetchResponsePartition()
+ .setPartitionIndex(partition.partitionIndex())
+ .setErrorCode(partition.errorCode())
+ .setCommittedOffset(partition.committedOffset())
+ .setMetadata(partition.metadata())
+
.setCommittedLeaderEpoch(partition.committedLeaderEpoch()));
});
});
}
@@ -239,4 +225,31 @@ public class OffsetFetchResponse extends AbstractResponse {
public boolean shouldClientThrottle(short version) {
return version >= 4;
}
+
+ public static OffsetFetchResponseData.OffsetFetchResponseGroup groupError(
+ OffsetFetchRequestData.OffsetFetchRequestGroup group,
+ Errors error,
+ int version
+ ) {
+ if (version >= TOP_LEVEL_ERROR_AND_NULL_TOPICS_MIN_VERSION) {
+ return new OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId(group.groupId())
+ .setErrorCode(error.code());
+ } else {
+ return new OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId(group.groupId())
+ .setTopics(group.topics().stream().map(topic ->
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName(topic.name())
+
.setPartitions(topic.partitionIndexes().stream().map(partition ->
+ new
OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(partition)
+ .setErrorCode(error.code())
+ .setCommittedOffset(INVALID_OFFSET)
+ .setMetadata(NO_METADATA)
+
.setCommittedLeaderEpoch(NO_PARTITION_LEADER_EPOCH)
+ ).collect(Collectors.toList()))
+ ).collect(Collectors.toList()));
+ }
+ }
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchResponseTest.java
index 23b5258a235..302e6309501 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchResponseTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchResponseTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
@@ -26,6 +27,9 @@ import org.junit.jupiter.params.ParameterizedTest;
import java.util.List;
+import static
org.apache.kafka.common.record.RecordBatch.NO_PARTITION_LEADER_EPOCH;
+import static
org.apache.kafka.common.requests.OffsetFetchResponse.INVALID_OFFSET;
+import static org.apache.kafka.common.requests.OffsetFetchResponse.NO_METADATA;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -243,4 +247,43 @@ public class OffsetFetchResponseTest {
new OffsetFetchResponse(data, version).group("foo")
);
}
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
+ public void testSingleGroupWithError(short version) {
+ var group = new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId("group1")
+ .setTopics(List.of(
+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("foo")
+ .setPartitionIndexes(List.of(0))
+ ));
+
+ if (version < 2) {
+ assertEquals(
+ new OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId("group1")
+ .setTopics(List.of(
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("foo")
+ .setPartitions(List.of(
+ new
OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(0)
+
.setErrorCode(Errors.INVALID_GROUP_ID.code())
+ .setCommittedOffset(INVALID_OFFSET)
+ .setMetadata(NO_METADATA)
+
.setCommittedLeaderEpoch(NO_PARTITION_LEADER_EPOCH)
+ ))
+ )),
+ OffsetFetchResponse.groupError(group, Errors.INVALID_GROUP_ID,
version)
+ );
+ } else {
+ assertEquals(
+ new OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId("group1")
+ .setErrorCode(Errors.INVALID_GROUP_ID.code()),
+ OffsetFetchResponse.groupError(group, Errors.INVALID_GROUP_ID,
version)
+ );
+ }
+ }
}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index f69cbb0eb66..5eb249c54d6 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1016,9 +1016,11 @@ class KafkaApis(val requestChannel: RequestChannel,
groups.forEach { groupOffsetFetch =>
val isAllPartitions = groupOffsetFetch.topics == null
if (!authHelper.authorize(request.context, DESCRIBE, GROUP,
groupOffsetFetch.groupId)) {
- futures += CompletableFuture.completedFuture(new
OffsetFetchResponseData.OffsetFetchResponseGroup()
- .setGroupId(groupOffsetFetch.groupId)
- .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code))
+ futures +=
CompletableFuture.completedFuture(OffsetFetchResponse.groupError(
+ groupOffsetFetch,
+ Errors.GROUP_AUTHORIZATION_FAILED,
+ request.header.apiVersion()
+ ))
} else if (isAllPartitions) {
futures += fetchAllOffsetsForGroup(
request.context,
@@ -1043,33 +1045,35 @@ class KafkaApis(val requestChannel: RequestChannel,
private def fetchAllOffsetsForGroup(
requestContext: RequestContext,
- offsetFetchRequest: OffsetFetchRequestData.OffsetFetchRequestGroup,
+ groupFetchRequest: OffsetFetchRequestData.OffsetFetchRequestGroup,
requireStable: Boolean
): CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup] = {
val useTopicIds = OffsetFetchRequest.useTopicIds(requestContext.apiVersion)
groupCoordinator.fetchAllOffsets(
requestContext,
- offsetFetchRequest,
+ groupFetchRequest,
requireStable
- ).handle[OffsetFetchResponseData.OffsetFetchResponseGroup] {
(offsetFetchResponse, exception) =>
+ ).handle[OffsetFetchResponseData.OffsetFetchResponseGroup] {
(groupFetchResponse, exception) =>
if (exception != null) {
- new OffsetFetchResponseData.OffsetFetchResponseGroup()
- .setGroupId(offsetFetchRequest.groupId)
- .setErrorCode(Errors.forException(exception).code)
- } else if (offsetFetchResponse.errorCode() != Errors.NONE.code) {
- offsetFetchResponse
+ OffsetFetchResponse.groupError(
+ groupFetchRequest,
+ Errors.forException(exception),
+ requestContext.apiVersion()
+ )
+ } else if (groupFetchResponse.errorCode() != Errors.NONE.code) {
+ groupFetchResponse
} else {
// Clients are not allowed to see offsets for topics that are not
authorized for Describe.
val authorizedNames = authHelper.filterByAuthorized(
requestContext,
DESCRIBE,
TOPIC,
- offsetFetchResponse.topics.asScala
+ groupFetchResponse.topics.asScala
)(_.name)
val topics = new
mutable.ArrayBuffer[OffsetFetchResponseData.OffsetFetchResponseTopics]
- offsetFetchResponse.topics.forEach { topic =>
+ groupFetchResponse.topics.forEach { topic =>
if (authorizedNames.contains(topic.name)) {
if (useTopicIds) {
// If the topic is not provided by the group coordinator, we set
it
@@ -1087,20 +1091,20 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
}
- offsetFetchResponse.setTopics(topics.asJava)
+ groupFetchResponse.setTopics(topics.asJava)
}
}
}
private def fetchOffsetsForGroup(
requestContext: RequestContext,
- offsetFetchRequest: OffsetFetchRequestData.OffsetFetchRequestGroup,
+ groupFetchRequest: OffsetFetchRequestData.OffsetFetchRequestGroup,
requireStable: Boolean
): CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup] = {
val useTopicIds = OffsetFetchRequest.useTopicIds(requestContext.apiVersion)
if (useTopicIds) {
- offsetFetchRequest.topics.forEach { topic =>
+ groupFetchRequest.topics.forEach { topic =>
if (topic.topicId != Uuid.ZERO_UUID) {
metadataCache.getTopicName(topic.topicId).ifPresent(name =>
topic.setName(name))
}
@@ -1112,7 +1116,7 @@ class KafkaApis(val requestChannel: RequestChannel,
requestContext,
DESCRIBE,
TOPIC,
- offsetFetchRequest.topics.asScala
+ groupFetchRequest.topics.asScala
)(_.name)
val authorizedTopics = new
mutable.ArrayBuffer[OffsetFetchRequestData.OffsetFetchRequestTopics]
@@ -1134,7 +1138,7 @@ class KafkaApis(val requestChannel: RequestChannel,
topicResponse
}
- offsetFetchRequest.topics.forEach { topic =>
+ groupFetchRequest.topics.forEach { topic =>
if (useTopicIds && topic.name.isEmpty) {
errorTopics += buildErrorResponse(topic, Errors.UNKNOWN_TOPIC_ID)
} else if (!authorizedTopicNames.contains(topic.name)) {
@@ -1147,25 +1151,27 @@ class KafkaApis(val requestChannel: RequestChannel,
groupCoordinator.fetchOffsets(
requestContext,
new OffsetFetchRequestData.OffsetFetchRequestGroup()
- .setGroupId(offsetFetchRequest.groupId)
- .setMemberId(offsetFetchRequest.memberId)
- .setMemberEpoch(offsetFetchRequest.memberEpoch)
+ .setGroupId(groupFetchRequest.groupId)
+ .setMemberId(groupFetchRequest.memberId)
+ .setMemberEpoch(groupFetchRequest.memberEpoch)
.setTopics(authorizedTopics.asJava),
requireStable
- ).handle[OffsetFetchResponseData.OffsetFetchResponseGroup] {
(offsetFetchResponse, exception) =>
+ ).handle[OffsetFetchResponseData.OffsetFetchResponseGroup] {
(groupFetchResponse, exception) =>
if (exception != null) {
- new OffsetFetchResponseData.OffsetFetchResponseGroup()
- .setGroupId(offsetFetchRequest.groupId)
- .setErrorCode(Errors.forException(exception).code)
- } else if (offsetFetchResponse.errorCode() != Errors.NONE.code) {
- offsetFetchResponse
+ OffsetFetchResponse.groupError(
+ groupFetchRequest,
+ Errors.forException(exception),
+ requestContext.apiVersion()
+ )
+ } else if (groupFetchResponse.errorCode() != Errors.NONE.code) {
+ groupFetchResponse
} else {
val topics = new
util.ArrayList[OffsetFetchResponseData.OffsetFetchResponseTopics](
- offsetFetchResponse.topics.size + errorTopics.size
+ groupFetchRequest.topics.size + errorTopics.size
)
- topics.addAll(offsetFetchResponse.topics)
+ topics.addAll(groupFetchResponse.topics)
topics.addAll(errorTopics.asJava)
- offsetFetchResponse.setTopics(topics)
+ groupFetchResponse.setTopics(topics)
}
}
}
diff --git a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
index 18f254ae40d..75bf82ef155 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
@@ -647,4 +647,53 @@ class OffsetFetchRequestTest(cluster: ClusterInstance)
extends GroupCoordinatorB
)
}
}
+
+ @ClusterTest
+ def testGroupErrors(): Unit = {
+ val topicId = createTopic(
+ topic = "foo",
+ numPartitions = 3
+ )
+
+ for (version <- ApiKeys.OFFSET_FETCH.oldestVersion() to
ApiKeys.OFFSET_FETCH.latestVersion(isUnstableApiEnabled)) {
+ assertEquals(
+ if (version >= 2) {
+ new OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId("unknown")
+ .setErrorCode(Errors.NOT_COORDINATOR.code)
+ } else {
+ // Version 1 does not support group level errors. Hence, the error is
+ // returned at the partition level.
+ new OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId("unknown")
+ .setTopics(List(
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("foo")
+ .setPartitions(List(
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(0)
+ .setErrorCode(Errors.NOT_COORDINATOR.code)
+ .setCommittedOffset(-1)
+ .setCommittedLeaderEpoch(-1)
+ .setMetadata("")
+ ).asJava)
+ ).asJava)
+ },
+ fetchOffsets(
+ group = new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId("unknown")
+ .setMemberId("")
+ .setMemberEpoch(0)
+ .setTopics(List(
+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("foo")
+ .setTopicId(topicId)
+ .setPartitionIndexes(List[Integer](0).asJava)
+ ).asJava),
+ requireStable = false,
+ version = version.toShort
+ )
+ )
+ }
+ }
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index ab7ede49cfe..099201ecabb 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -72,6 +72,7 @@ import
org.apache.kafka.common.requests.DeleteShareGroupOffsetsRequest;
import org.apache.kafka.common.requests.DescribeGroupsRequest;
import org.apache.kafka.common.requests.DescribeShareGroupOffsetsRequest;
import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.requests.ShareGroupDescribeRequest;
import org.apache.kafka.common.requests.ShareGroupHeartbeatRequest;
import org.apache.kafka.common.requests.StreamsGroupDescribeRequest;
@@ -1551,18 +1552,20 @@ public class GroupCoordinatorService implements
GroupCoordinator {
boolean requireStable
) {
if (!isActive.get()) {
- return CompletableFuture.completedFuture(new
OffsetFetchResponseData.OffsetFetchResponseGroup()
- .setGroupId(request.groupId())
- .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
- );
+ return
CompletableFuture.completedFuture(OffsetFetchResponse.groupError(
+ request,
+ Errors.COORDINATOR_NOT_AVAILABLE,
+ context.requestVersion()
+ ));
}
// For backwards compatibility, we support fetch commits for the empty
group id.
if (request.groupId() == null) {
- return CompletableFuture.completedFuture(new
OffsetFetchResponseData.OffsetFetchResponseGroup()
- .setGroupId(request.groupId())
- .setErrorCode(Errors.INVALID_GROUP_ID.code())
- );
+ return
CompletableFuture.completedFuture(OffsetFetchResponse.groupError(
+ request,
+ Errors.INVALID_GROUP_ID,
+ context.requestVersion()
+ ));
}
// The require stable flag when set tells the broker to hold on
returning unstable
@@ -1584,6 +1587,7 @@ public class GroupCoordinatorService implements
GroupCoordinator {
)
).exceptionally(exception -> handleOffsetFetchException(
"fetch-offsets",
+ context,
request,
exception
));
@@ -1606,18 +1610,20 @@ public class GroupCoordinatorService implements
GroupCoordinator {
boolean requireStable
) {
if (!isActive.get()) {
- return CompletableFuture.completedFuture(new
OffsetFetchResponseData.OffsetFetchResponseGroup()
- .setGroupId(request.groupId())
- .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
- );
+ return
CompletableFuture.completedFuture(OffsetFetchResponse.groupError(
+ request,
+ Errors.COORDINATOR_NOT_AVAILABLE,
+ context.requestVersion()
+ ));
}
// For backwards compatibility, we support fetch commits for the empty
group id.
if (request.groupId() == null) {
- return CompletableFuture.completedFuture(new
OffsetFetchResponseData.OffsetFetchResponseGroup()
- .setGroupId(request.groupId())
- .setErrorCode(Errors.INVALID_GROUP_ID.code())
- );
+ return
CompletableFuture.completedFuture(OffsetFetchResponse.groupError(
+ request,
+ Errors.INVALID_GROUP_ID,
+ context.requestVersion()
+ ));
}
// The require stable flag when set tells the broker to hold on
returning unstable
@@ -1639,6 +1645,7 @@ public class GroupCoordinatorService implements
GroupCoordinator {
)
).exceptionally(exception -> handleOffsetFetchException(
"fetch-all-offsets",
+ context,
request,
exception
));
@@ -2266,12 +2273,14 @@ public class GroupCoordinatorService implements
GroupCoordinator {
* The handler also handles and logs unexpected errors.
*
* @param operationName The name of the operation.
+ * @param context The request context.
* @param request The OffsetFetchRequestGroup request.
* @param exception The exception to handle.
* @return The OffsetFetchRequestGroup response.
*/
private OffsetFetchResponseData.OffsetFetchResponseGroup
handleOffsetFetchException(
String operationName,
+ AuthorizableRequestContext context,
OffsetFetchRequestData.OffsetFetchRequestGroup request,
Throwable exception
) {
@@ -2290,18 +2299,22 @@ public class GroupCoordinatorService implements
GroupCoordinator {
// NOT_ENOUGH_REPLICAS and REQUEST_TIMED_OUT to
COORDINATOR_NOT_AVAILABLE,
// COORDINATOR_NOT_AVAILABLE is also not handled by consumers
on versions prior to
// 3.9.
- return new OffsetFetchResponseData.OffsetFetchResponseGroup()
- .setGroupId(request.groupId())
- .setErrorCode(Errors.NOT_COORDINATOR.code());
+ return OffsetFetchResponse.groupError(
+ request,
+ Errors.NOT_COORDINATOR,
+ context.requestVersion()
+ );
default:
return handleOperationException(
operationName,
request,
exception,
- (error, __) -> new
OffsetFetchResponseData.OffsetFetchResponseGroup()
- .setGroupId(request.groupId())
- .setErrorCode(error.code()),
+ (error, __) -> OffsetFetchResponse.groupError(
+ request,
+ error,
+ context.requestVersion()
+ ),
log
);
}