This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.1 by this push:
     new 7bdeb36a52a KAFKA-19246; OffsetFetch API does not return group level 
errors correctly with version 1 (#19704)
7bdeb36a52a is described below

commit 7bdeb36a52a7a391ed317b3f66175b2b65e1617a
Author: David Jacot <david.ja...@gmail.com>
AuthorDate: Thu Jun 26 15:29:43 2025 +0200

    KAFKA-19246; OffsetFetch API does not return group level errors correctly 
with version 1 (#19704)
    
    The OffsetFetch API does not support top level errors in version 1.
    Hence, the top level error must be returned at the partition level.
    
    Side note: It is a tad annoying that we create error responses in
    multiple places (e.g. KafkaApis, GroupCoordinatorService). There was a
    reason for this but I cannot remember it.
    
    Reviewers: Dongnuo Lyu <d...@confluent.io>, Sean Quah <sq...@confluent.io>, 
Ken Huang <s7133...@gmail.com>, TengYao Chi <frankvi...@apache.org>
---
 .../kafka/common/requests/OffsetFetchResponse.java | 55 +++++++++++-------
 .../common/requests/OffsetFetchResponseTest.java   | 43 ++++++++++++++
 core/src/main/scala/kafka/server/KafkaApis.scala   | 66 ++++++++++++----------
 .../unit/kafka/server/OffsetFetchRequestTest.scala | 49 ++++++++++++++++
 .../coordinator/group/GroupCoordinatorService.java | 57 +++++++++++--------
 5 files changed, 197 insertions(+), 73 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index e76ea8f7f3b..77297e96e6e 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.OffsetFetchRequestData;
 import org.apache.kafka.common.message.OffsetFetchResponseData;
 import 
org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponseGroup;
 import 
org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponsePartition;
@@ -99,27 +100,12 @@ public class OffsetFetchResponse extends AbstractResponse {
                     data.topics().add(newTopic);
 
                     topic.partitions().forEach(partition -> {
-                        OffsetFetchResponsePartition newPartition;
-
-                        if (version < 
TOP_LEVEL_ERROR_AND_NULL_TOPICS_MIN_VERSION && group.errorCode() != 
Errors.NONE.code()) {
-                            // Versions prior to version 2 do not support a 
top level error. Therefore,
-                            // we put it at the partition level.
-                            newPartition = new OffsetFetchResponsePartition()
-                                .setPartitionIndex(partition.partitionIndex())
-                                .setErrorCode(group.errorCode())
-                                .setCommittedOffset(INVALID_OFFSET)
-                                .setMetadata(NO_METADATA)
-                                
.setCommittedLeaderEpoch(NO_PARTITION_LEADER_EPOCH);
-                        } else {
-                            newPartition = new OffsetFetchResponsePartition()
-                                .setPartitionIndex(partition.partitionIndex())
-                                .setErrorCode(partition.errorCode())
-                                
.setCommittedOffset(partition.committedOffset())
-                                .setMetadata(partition.metadata())
-                                
.setCommittedLeaderEpoch(partition.committedLeaderEpoch());
-                        }
-
-                        newTopic.partitions().add(newPartition);
+                        newTopic.partitions().add(new 
OffsetFetchResponsePartition()
+                            .setPartitionIndex(partition.partitionIndex())
+                            .setErrorCode(partition.errorCode())
+                            .setCommittedOffset(partition.committedOffset())
+                            .setMetadata(partition.metadata())
+                            
.setCommittedLeaderEpoch(partition.committedLeaderEpoch()));
                     });
                 });
             }
@@ -239,4 +225,31 @@ public class OffsetFetchResponse extends AbstractResponse {
     public boolean shouldClientThrottle(short version) {
         return version >= 4;
     }
+
+    public static OffsetFetchResponseData.OffsetFetchResponseGroup groupError(
+        OffsetFetchRequestData.OffsetFetchRequestGroup group,
+        Errors error,
+        int version
+    ) {
+        if (version >= TOP_LEVEL_ERROR_AND_NULL_TOPICS_MIN_VERSION) {
+            return new OffsetFetchResponseData.OffsetFetchResponseGroup()
+                .setGroupId(group.groupId())
+                .setErrorCode(error.code());
+        } else {
+            return new OffsetFetchResponseData.OffsetFetchResponseGroup()
+                .setGroupId(group.groupId())
+                .setTopics(group.topics().stream().map(topic ->
+                    new OffsetFetchResponseData.OffsetFetchResponseTopics()
+                        .setName(topic.name())
+                        
.setPartitions(topic.partitionIndexes().stream().map(partition ->
+                            new 
OffsetFetchResponseData.OffsetFetchResponsePartitions()
+                                .setPartitionIndex(partition)
+                                .setErrorCode(error.code())
+                                .setCommittedOffset(INVALID_OFFSET)
+                                .setMetadata(NO_METADATA)
+                                
.setCommittedLeaderEpoch(NO_PARTITION_LEADER_EPOCH)
+                        ).collect(Collectors.toList()))
+                ).collect(Collectors.toList()));
+        }
+    }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchResponseTest.java
index 23b5258a235..302e6309501 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchResponseTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.OffsetFetchRequestData;
 import org.apache.kafka.common.message.OffsetFetchResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
@@ -26,6 +27,9 @@ import org.junit.jupiter.params.ParameterizedTest;
 
 import java.util.List;
 
+import static 
org.apache.kafka.common.record.RecordBatch.NO_PARTITION_LEADER_EPOCH;
+import static 
org.apache.kafka.common.requests.OffsetFetchResponse.INVALID_OFFSET;
+import static org.apache.kafka.common.requests.OffsetFetchResponse.NO_METADATA;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
@@ -243,4 +247,43 @@ public class OffsetFetchResponseTest {
             new OffsetFetchResponse(data, version).group("foo")
         );
     }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
+    public void testSingleGroupWithError(short version) {
+        var group = new OffsetFetchRequestData.OffsetFetchRequestGroup()
+            .setGroupId("group1")
+            .setTopics(List.of(
+                new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                    .setName("foo")
+                    .setPartitionIndexes(List.of(0))
+            ));
+
+        if (version < 2) {
+            assertEquals(
+                new OffsetFetchResponseData.OffsetFetchResponseGroup()
+                    .setGroupId("group1")
+                    .setTopics(List.of(
+                        new OffsetFetchResponseData.OffsetFetchResponseTopics()
+                            .setName("foo")
+                            .setPartitions(List.of(
+                                new 
OffsetFetchResponseData.OffsetFetchResponsePartitions()
+                                    .setPartitionIndex(0)
+                                    
.setErrorCode(Errors.INVALID_GROUP_ID.code())
+                                    .setCommittedOffset(INVALID_OFFSET)
+                                    .setMetadata(NO_METADATA)
+                                    
.setCommittedLeaderEpoch(NO_PARTITION_LEADER_EPOCH)
+                            ))
+                    )),
+                OffsetFetchResponse.groupError(group, Errors.INVALID_GROUP_ID, 
version)
+            );
+        } else {
+            assertEquals(
+                new OffsetFetchResponseData.OffsetFetchResponseGroup()
+                    .setGroupId("group1")
+                    .setErrorCode(Errors.INVALID_GROUP_ID.code()),
+                OffsetFetchResponse.groupError(group, Errors.INVALID_GROUP_ID, 
version)
+            );
+        }
+    }
 }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index f69cbb0eb66..5eb249c54d6 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1016,9 +1016,11 @@ class KafkaApis(val requestChannel: RequestChannel,
     groups.forEach { groupOffsetFetch =>
       val isAllPartitions = groupOffsetFetch.topics == null
       if (!authHelper.authorize(request.context, DESCRIBE, GROUP, 
groupOffsetFetch.groupId)) {
-        futures += CompletableFuture.completedFuture(new 
OffsetFetchResponseData.OffsetFetchResponseGroup()
-          .setGroupId(groupOffsetFetch.groupId)
-          .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code))
+        futures += 
CompletableFuture.completedFuture(OffsetFetchResponse.groupError(
+          groupOffsetFetch,
+          Errors.GROUP_AUTHORIZATION_FAILED,
+          request.header.apiVersion()
+        ))
       } else if (isAllPartitions) {
         futures += fetchAllOffsetsForGroup(
           request.context,
@@ -1043,33 +1045,35 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   private def fetchAllOffsetsForGroup(
     requestContext: RequestContext,
-    offsetFetchRequest: OffsetFetchRequestData.OffsetFetchRequestGroup,
+    groupFetchRequest: OffsetFetchRequestData.OffsetFetchRequestGroup,
     requireStable: Boolean
   ): CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup] = {
     val useTopicIds = OffsetFetchRequest.useTopicIds(requestContext.apiVersion)
 
     groupCoordinator.fetchAllOffsets(
       requestContext,
-      offsetFetchRequest,
+      groupFetchRequest,
       requireStable
-    ).handle[OffsetFetchResponseData.OffsetFetchResponseGroup] { 
(offsetFetchResponse, exception) =>
+    ).handle[OffsetFetchResponseData.OffsetFetchResponseGroup] { 
(groupFetchResponse, exception) =>
       if (exception != null) {
-        new OffsetFetchResponseData.OffsetFetchResponseGroup()
-          .setGroupId(offsetFetchRequest.groupId)
-          .setErrorCode(Errors.forException(exception).code)
-      } else if (offsetFetchResponse.errorCode() != Errors.NONE.code) {
-        offsetFetchResponse
+        OffsetFetchResponse.groupError(
+          groupFetchRequest,
+          Errors.forException(exception),
+          requestContext.apiVersion()
+        )
+      } else if (groupFetchResponse.errorCode() != Errors.NONE.code) {
+        groupFetchResponse
       } else {
         // Clients are not allowed to see offsets for topics that are not 
authorized for Describe.
         val authorizedNames = authHelper.filterByAuthorized(
           requestContext,
           DESCRIBE,
           TOPIC,
-          offsetFetchResponse.topics.asScala
+          groupFetchResponse.topics.asScala
         )(_.name)
 
         val topics = new 
mutable.ArrayBuffer[OffsetFetchResponseData.OffsetFetchResponseTopics]
-        offsetFetchResponse.topics.forEach { topic =>
+        groupFetchResponse.topics.forEach { topic =>
           if (authorizedNames.contains(topic.name)) {
             if (useTopicIds) {
               // If the topic is not provided by the group coordinator, we set 
it
@@ -1087,20 +1091,20 @@ class KafkaApis(val requestChannel: RequestChannel,
             }
           }
         }
-        offsetFetchResponse.setTopics(topics.asJava)
+        groupFetchResponse.setTopics(topics.asJava)
       }
     }
   }
 
   private def fetchOffsetsForGroup(
     requestContext: RequestContext,
-    offsetFetchRequest: OffsetFetchRequestData.OffsetFetchRequestGroup,
+    groupFetchRequest: OffsetFetchRequestData.OffsetFetchRequestGroup,
     requireStable: Boolean
   ): CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup] = {
     val useTopicIds = OffsetFetchRequest.useTopicIds(requestContext.apiVersion)
 
     if (useTopicIds) {
-      offsetFetchRequest.topics.forEach { topic =>
+      groupFetchRequest.topics.forEach { topic =>
         if (topic.topicId != Uuid.ZERO_UUID) {
           metadataCache.getTopicName(topic.topicId).ifPresent(name => 
topic.setName(name))
         }
@@ -1112,7 +1116,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       requestContext,
       DESCRIBE,
       TOPIC,
-      offsetFetchRequest.topics.asScala
+      groupFetchRequest.topics.asScala
     )(_.name)
 
     val authorizedTopics = new 
mutable.ArrayBuffer[OffsetFetchRequestData.OffsetFetchRequestTopics]
@@ -1134,7 +1138,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       topicResponse
     }
 
-    offsetFetchRequest.topics.forEach { topic =>
+    groupFetchRequest.topics.forEach { topic =>
       if (useTopicIds && topic.name.isEmpty) {
         errorTopics += buildErrorResponse(topic, Errors.UNKNOWN_TOPIC_ID)
       } else if (!authorizedTopicNames.contains(topic.name)) {
@@ -1147,25 +1151,27 @@ class KafkaApis(val requestChannel: RequestChannel,
     groupCoordinator.fetchOffsets(
       requestContext,
       new OffsetFetchRequestData.OffsetFetchRequestGroup()
-        .setGroupId(offsetFetchRequest.groupId)
-        .setMemberId(offsetFetchRequest.memberId)
-        .setMemberEpoch(offsetFetchRequest.memberEpoch)
+        .setGroupId(groupFetchRequest.groupId)
+        .setMemberId(groupFetchRequest.memberId)
+        .setMemberEpoch(groupFetchRequest.memberEpoch)
         .setTopics(authorizedTopics.asJava),
       requireStable
-    ).handle[OffsetFetchResponseData.OffsetFetchResponseGroup] { 
(offsetFetchResponse, exception) =>
+    ).handle[OffsetFetchResponseData.OffsetFetchResponseGroup] { 
(groupFetchResponse, exception) =>
       if (exception != null) {
-        new OffsetFetchResponseData.OffsetFetchResponseGroup()
-          .setGroupId(offsetFetchRequest.groupId)
-          .setErrorCode(Errors.forException(exception).code)
-      } else if (offsetFetchResponse.errorCode() != Errors.NONE.code) {
-        offsetFetchResponse
+        OffsetFetchResponse.groupError(
+          groupFetchRequest,
+          Errors.forException(exception),
+          requestContext.apiVersion()
+        )
+      } else if (groupFetchResponse.errorCode() != Errors.NONE.code) {
+        groupFetchResponse
       } else {
         val topics = new 
util.ArrayList[OffsetFetchResponseData.OffsetFetchResponseTopics](
-          offsetFetchResponse.topics.size + errorTopics.size
+          groupFetchRequest.topics.size + errorTopics.size
         )
-        topics.addAll(offsetFetchResponse.topics)
+        topics.addAll(groupFetchResponse.topics)
         topics.addAll(errorTopics.asJava)
-        offsetFetchResponse.setTopics(topics)
+        groupFetchResponse.setTopics(topics)
       }
     }
   }
diff --git a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
index 18f254ae40d..75bf82ef155 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
@@ -647,4 +647,53 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) 
extends GroupCoordinatorB
       )
     }
   }
+
+  @ClusterTest
+  def testGroupErrors(): Unit = {
+    val topicId = createTopic(
+      topic = "foo",
+      numPartitions = 3
+    )
+
+    for (version <- ApiKeys.OFFSET_FETCH.oldestVersion() to 
ApiKeys.OFFSET_FETCH.latestVersion(isUnstableApiEnabled)) {
+      assertEquals(
+        if (version >= 2) {
+          new OffsetFetchResponseData.OffsetFetchResponseGroup()
+            .setGroupId("unknown")
+            .setErrorCode(Errors.NOT_COORDINATOR.code)
+        } else {
+          // Version 1 does not support group level errors. Hence, the error is
+          // returned at the partition level.
+          new OffsetFetchResponseData.OffsetFetchResponseGroup()
+            .setGroupId("unknown")
+            .setTopics(List(
+              new OffsetFetchResponseData.OffsetFetchResponseTopics()
+                .setName("foo")
+                .setPartitions(List(
+                  new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+                    .setPartitionIndex(0)
+                    .setErrorCode(Errors.NOT_COORDINATOR.code)
+                    .setCommittedOffset(-1)
+                    .setCommittedLeaderEpoch(-1)
+                    .setMetadata("")
+                ).asJava)
+            ).asJava)
+        },
+        fetchOffsets(
+          group = new OffsetFetchRequestData.OffsetFetchRequestGroup()
+            .setGroupId("unknown")
+            .setMemberId("")
+            .setMemberEpoch(0)
+            .setTopics(List(
+              new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                .setName("foo")
+                .setTopicId(topicId)
+                .setPartitionIndexes(List[Integer](0).asJava)
+            ).asJava),
+          requireStable = false,
+          version = version.toShort
+        )
+      )
+    }
+  }
 }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index ab7ede49cfe..099201ecabb 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -72,6 +72,7 @@ import 
org.apache.kafka.common.requests.DeleteShareGroupOffsetsRequest;
 import org.apache.kafka.common.requests.DescribeGroupsRequest;
 import org.apache.kafka.common.requests.DescribeShareGroupOffsetsRequest;
 import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.OffsetFetchResponse;
 import org.apache.kafka.common.requests.ShareGroupDescribeRequest;
 import org.apache.kafka.common.requests.ShareGroupHeartbeatRequest;
 import org.apache.kafka.common.requests.StreamsGroupDescribeRequest;
@@ -1551,18 +1552,20 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
         boolean requireStable
     ) {
         if (!isActive.get()) {
-            return CompletableFuture.completedFuture(new 
OffsetFetchResponseData.OffsetFetchResponseGroup()
-                .setGroupId(request.groupId())
-                .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
-            );
+            return 
CompletableFuture.completedFuture(OffsetFetchResponse.groupError(
+                request,
+                Errors.COORDINATOR_NOT_AVAILABLE,
+                context.requestVersion()
+            ));
         }
 
         // For backwards compatibility, we support fetch commits for the empty 
group id.
         if (request.groupId() == null) {
-            return CompletableFuture.completedFuture(new 
OffsetFetchResponseData.OffsetFetchResponseGroup()
-                .setGroupId(request.groupId())
-                .setErrorCode(Errors.INVALID_GROUP_ID.code())
-            );
+            return 
CompletableFuture.completedFuture(OffsetFetchResponse.groupError(
+                request,
+                Errors.INVALID_GROUP_ID,
+                context.requestVersion()
+            ));
         }
 
         // The require stable flag when set tells the broker to hold on 
returning unstable
@@ -1584,6 +1587,7 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
                 )
             ).exceptionally(exception -> handleOffsetFetchException(
                 "fetch-offsets",
+                context,
                 request,
                 exception
             ));
@@ -1606,18 +1610,20 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
         boolean requireStable
     ) {
         if (!isActive.get()) {
-            return CompletableFuture.completedFuture(new 
OffsetFetchResponseData.OffsetFetchResponseGroup()
-                .setGroupId(request.groupId())
-                .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
-            );
+            return 
CompletableFuture.completedFuture(OffsetFetchResponse.groupError(
+                request,
+                Errors.COORDINATOR_NOT_AVAILABLE,
+                context.requestVersion()
+            ));
         }
 
         // For backwards compatibility, we support fetch commits for the empty 
group id.
         if (request.groupId() == null) {
-            return CompletableFuture.completedFuture(new 
OffsetFetchResponseData.OffsetFetchResponseGroup()
-                .setGroupId(request.groupId())
-                .setErrorCode(Errors.INVALID_GROUP_ID.code())
-            );
+            return 
CompletableFuture.completedFuture(OffsetFetchResponse.groupError(
+                request,
+                Errors.INVALID_GROUP_ID,
+                context.requestVersion()
+            ));
         }
 
         // The require stable flag when set tells the broker to hold on 
returning unstable
@@ -1639,6 +1645,7 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
                 )
             ).exceptionally(exception -> handleOffsetFetchException(
                 "fetch-all-offsets",
+                context,
                 request,
                 exception
             ));
@@ -2266,12 +2273,14 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
      * The handler also handles and logs unexpected errors.
      *
      * @param operationName     The name of the operation.
+     * @param context           The request context.
      * @param request           The OffsetFetchRequestGroup request.
      * @param exception         The exception to handle.
      * @return The OffsetFetchRequestGroup response.
      */
     private OffsetFetchResponseData.OffsetFetchResponseGroup 
handleOffsetFetchException(
         String operationName,
+        AuthorizableRequestContext context,
         OffsetFetchRequestData.OffsetFetchRequestGroup request,
         Throwable exception
     ) {
@@ -2290,18 +2299,22 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
                 // NOT_ENOUGH_REPLICAS and REQUEST_TIMED_OUT to 
COORDINATOR_NOT_AVAILABLE,
                 // COORDINATOR_NOT_AVAILABLE is also not handled by consumers 
on versions prior to
                 // 3.9.
-                return new OffsetFetchResponseData.OffsetFetchResponseGroup()
-                    .setGroupId(request.groupId())
-                    .setErrorCode(Errors.NOT_COORDINATOR.code());
+                return OffsetFetchResponse.groupError(
+                    request,
+                    Errors.NOT_COORDINATOR,
+                    context.requestVersion()
+                );
 
             default:
                 return handleOperationException(
                     operationName,
                     request,
                     exception,
-                    (error, __) -> new 
OffsetFetchResponseData.OffsetFetchResponseGroup()
-                        .setGroupId(request.groupId())
-                        .setErrorCode(error.code()),
+                    (error, __) -> OffsetFetchResponse.groupError(
+                        request,
+                        error,
+                        context.requestVersion()
+                    ),
                     log
                 );
         }

Reply via email to