This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d27d90ccb3b MINOR: Refactor OffsetFetch path (#21009)
d27d90ccb3b is described below

commit d27d90ccb3b2b98e02de42afd50910fbbbc162d0
Author: David Jacot <[email protected]>
AuthorDate: Thu Nov 27 19:25:10 2025 +0100

    MINOR: Refactor OffsetFetch path (#21009)
    
    The `GroupCoordinator` interface has two methods to fetch offsets:
    `fetchOffsets` and `fetchAllOffsets`. They have the exact same signature
    and the implementation in `GroupCoordinatorService` is exactly the same,
    modulo the name of the operation. The patch refactors the path to
    simplify it and reuse more code. We could further refactor `KafkaApis`
    but let's do this in a follow-up in order to keep this change small and
    simple.
    
    Reviewers: Sean Quah <[email protected]>, Lianet Magrans
    <[email protected]>
---
 .../kafka/common/requests/OffsetFetchRequest.java  |  4 ++
 core/src/main/scala/kafka/server/KafkaApis.scala   |  4 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 12 ++--
 .../kafka/coordinator/group/GroupCoordinator.java  | 15 -----
 .../coordinator/group/GroupCoordinatorService.java | 67 ++--------------------
 .../coordinator/group/GroupCoordinatorShard.java   | 23 ++------
 .../group/GroupCoordinatorServiceTest.java         | 51 ++++++++--------
 .../group/GroupCoordinatorShardTest.java           | 48 ++++++++++++++++
 8 files changed, 99 insertions(+), 125 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
index 83b62b5c479..7f53e81b14e 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
@@ -325,6 +325,10 @@ public class OffsetFetchRequest extends AbstractRequest {
         return version >= TOPIC_ID_MIN_VERSION;
     }
 
+    public static boolean 
requestAllOffsets(OffsetFetchRequestData.OffsetFetchRequestGroup request) {
+        return request.topics() == null;
+    }
+
     @Override
     public OffsetFetchRequestData data() {
         return data;
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index affdced8cf4..b7f2e3364f0 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1014,7 +1014,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     val futures = new 
mutable.ArrayBuffer[CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]](groups.size)
     groups.forEach { groupOffsetFetch =>
-      val isAllPartitions = groupOffsetFetch.topics == null
+      val isAllPartitions = 
OffsetFetchRequest.requestAllOffsets(groupOffsetFetch)
       if (!authHelper.authorize(request.context, DESCRIBE, GROUP, 
groupOffsetFetch.groupId)) {
         futures += 
CompletableFuture.completedFuture(OffsetFetchResponse.groupError(
           groupOffsetFetch,
@@ -1050,7 +1050,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   ): CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup] = {
     val useTopicIds = OffsetFetchRequest.useTopicIds(requestContext.apiVersion)
 
-    groupCoordinator.fetchAllOffsets(
+    groupCoordinator.fetchOffsets(
       requestContext,
       groupFetchRequest,
       requireStable
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index dc0dbe11bdd..4d4fbe5b710 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -9032,7 +9032,7 @@ class KafkaApisTest extends Logging {
       )).thenReturn(group1Future)
 
       val group2Future = new 
CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]()
-      when(groupCoordinator.fetchAllOffsets(
+      when(groupCoordinator.fetchOffsets(
         requestChannelRequest.context,
         new OffsetFetchRequestData.OffsetFetchRequestGroup()
           .setGroupId("group-2")
@@ -9041,7 +9041,7 @@ class KafkaApisTest extends Logging {
       )).thenReturn(group2Future)
 
       val group3Future = new 
CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]()
-      when(groupCoordinator.fetchAllOffsets(
+      when(groupCoordinator.fetchOffsets(
         requestChannelRequest.context,
         new OffsetFetchRequestData.OffsetFetchRequestGroup()
           .setGroupId("group-3")
@@ -9050,7 +9050,7 @@ class KafkaApisTest extends Logging {
       )).thenReturn(group3Future)
 
       val group4Future = new 
CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]()
-      when(groupCoordinator.fetchAllOffsets(
+      when(groupCoordinator.fetchOffsets(
         requestChannelRequest.context,
         new OffsetFetchRequestData.OffsetFetchRequestGroup()
           .setGroupId("group-4")
@@ -9190,7 +9190,7 @@ class KafkaApisTest extends Logging {
     )).thenReturn(group1Future)
 
     val group2Future = new 
CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]()
-    when(groupCoordinator.fetchAllOffsets(
+    when(groupCoordinator.fetchOffsets(
       requestChannelRequest.context,
       new OffsetFetchRequestData.OffsetFetchRequestGroup()
         .setGroupId("group-2")
@@ -9384,7 +9384,7 @@ class KafkaApisTest extends Logging {
     val requestChannelRequest = makeRequest(version)
 
     val future = new 
CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]()
-    when(groupCoordinator.fetchAllOffsets(
+    when(groupCoordinator.fetchOffsets(
       requestChannelRequest.context,
       new OffsetFetchRequestData.OffsetFetchRequestGroup()
         .setGroupId("group-1")
@@ -9546,7 +9546,7 @@ class KafkaApisTest extends Logging {
 
     // group-3 is allowed and bar is allowed.
     val group3Future = new 
CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]()
-    when(groupCoordinator.fetchAllOffsets(
+    when(groupCoordinator.fetchOffsets(
       requestChannelRequest.context,
       new OffsetFetchRequestData.OffsetFetchRequestGroup()
         .setGroupId("group-3")
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
index 83dc9622683..efdbb57a74a 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
@@ -286,21 +286,6 @@ public interface GroupCoordinator {
         boolean requireStable
     );
 
-    /**
-     * Fetch all offsets for a given Group.
-     *
-     * @param context           The request context.
-     * @param request           The OffsetFetchRequestGroup request.
-     *
-     * @return  A future yielding the results.
-     *          The error codes of the results are set to indicate the errors 
occurred during the execution.
-     */
-    CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> 
fetchAllOffsets(
-        AuthorizableRequestContext context,
-        OffsetFetchRequestData.OffsetFetchRequestGroup request,
-        boolean requireStable
-    );
-
     /**
      * Describe the Share Group Offsets for a given group.
      *
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 2817eba1798..35797024c10 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -72,6 +72,7 @@ import 
org.apache.kafka.common.requests.DeleteShareGroupOffsetsRequest;
 import org.apache.kafka.common.requests.DescribeGroupsRequest;
 import org.apache.kafka.common.requests.DescribeShareGroupOffsetsRequest;
 import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.OffsetFetchRequest;
 import org.apache.kafka.common.requests.OffsetFetchResponse;
 import org.apache.kafka.common.requests.ShareGroupDescribeRequest;
 import org.apache.kafka.common.requests.ShareGroupHeartbeatRequest;
@@ -1629,6 +1630,8 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
             ));
         }
 
+        var name = OffsetFetchRequest.requestAllOffsets(request) ? 
"fetch-all-offsets" : "fetch-offsets";
+
         // The require stable flag when set tells the broker to hold on 
returning unstable
         // (or uncommitted) offsets. In the previous implementation of the 
group coordinator,
         // the UNSTABLE_OFFSET_COMMIT error is returned when unstable offsets 
are present. As
@@ -1639,7 +1642,7 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
         // the pending offsets are committed. Otherwise, we use a read 
operation.
         if (requireStable) {
             return runtime.scheduleWriteOperation(
-                "fetch-offsets",
+                name,
                 topicPartitionFor(request.groupId()),
                 Duration.ofMillis(config.offsetCommitTimeoutMs()),
                 coordinator -> new CoordinatorResult<>(
@@ -1647,78 +1650,20 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
                     coordinator.fetchOffsets(request, Long.MAX_VALUE)
                 )
             ).exceptionally(exception -> handleOffsetFetchException(
-                "fetch-offsets",
+                name,
                 context,
                 request,
                 exception
             ));
         } else {
             return runtime.scheduleReadOperation(
-                "fetch-offsets",
+                name,
                 topicPartitionFor(request.groupId()),
                 (coordinator, offset) -> coordinator.fetchOffsets(request, 
offset)
             );
         }
     }
 
-    /**
-     * See {@link GroupCoordinator#fetchAllOffsets(AuthorizableRequestContext, 
OffsetFetchRequestData.OffsetFetchRequestGroup, boolean)}.
-     */
-    @Override
-    public CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> 
fetchAllOffsets(
-        AuthorizableRequestContext context,
-        OffsetFetchRequestData.OffsetFetchRequestGroup request,
-        boolean requireStable
-    ) {
-        if (!isActive.get()) {
-            return 
CompletableFuture.completedFuture(OffsetFetchResponse.groupError(
-                request,
-                Errors.COORDINATOR_NOT_AVAILABLE,
-                context.requestVersion()
-            ));
-        }
-
-        // For backwards compatibility, we support fetch commits for the empty 
group id.
-        if (request.groupId() == null) {
-            return 
CompletableFuture.completedFuture(OffsetFetchResponse.groupError(
-                request,
-                Errors.INVALID_GROUP_ID,
-                context.requestVersion()
-            ));
-        }
-
-        // The require stable flag when set tells the broker to hold on 
returning unstable
-        // (or uncommitted) offsets. In the previous implementation of the 
group coordinator,
-        // the UNSTABLE_OFFSET_COMMIT error is returned when unstable offsets 
are present. As
-        // the new implementation relies on timeline data structures, the 
coordinator does not
-        // really know whether offsets are stable or not so it is hard to 
return the same error.
-        // Instead, we use a write operation when the flag is set to guarantee 
that the fetch
-        // is based on all the available offsets and to ensure that the 
response waits until
-        // the pending offsets are committed. Otherwise, we use a read 
operation.
-        if (requireStable) {
-            return runtime.scheduleWriteOperation(
-                "fetch-all-offsets",
-                topicPartitionFor(request.groupId()),
-                Duration.ofMillis(config.offsetCommitTimeoutMs()),
-                coordinator -> new CoordinatorResult<>(
-                    List.of(),
-                    coordinator.fetchAllOffsets(request, Long.MAX_VALUE)
-                )
-            ).exceptionally(exception -> handleOffsetFetchException(
-                "fetch-all-offsets",
-                context,
-                request,
-                exception
-            ));
-        } else {
-            return runtime.scheduleReadOperation(
-                "fetch-all-offsets",
-                topicPartitionFor(request.groupId()),
-                (coordinator, offset) -> coordinator.fetchAllOffsets(request, 
offset)
-            );
-        }
-    }
-
     /**
      * See {@link 
GroupCoordinator#describeShareGroupOffsets(AuthorizableRequestContext, 
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup)}.
      */
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index d4c1ba1d3cd..beb37b9aba0 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -57,6 +57,7 @@ import 
org.apache.kafka.common.message.TxnOffsetCommitRequestData;
 import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
 import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.OffsetFetchRequest;
 import org.apache.kafka.common.requests.TransactionResult;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
@@ -806,23 +807,11 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
         OffsetFetchRequestData.OffsetFetchRequestGroup request,
         long epoch
     ) throws ApiException {
-        return offsetMetadataManager.fetchOffsets(request, epoch);
-    }
-
-    /**
-     * Fetch all offsets for a given group.
-     *
-     * @param request   The OffsetFetchRequestGroup request.
-     * @param epoch     The epoch (or offset) used to read from the
-     *                  timeline data structure.
-     *
-     * @return A List of OffsetFetchResponseTopics response.
-     */
-    public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets(
-        OffsetFetchRequestData.OffsetFetchRequestGroup request,
-        long epoch
-    ) throws ApiException {
-        return offsetMetadataManager.fetchAllOffsets(request, epoch);
+        if (OffsetFetchRequest.requestAllOffsets(request)) {
+            return offsetMetadataManager.fetchAllOffsets(request, epoch);
+        } else {
+            return offsetMetadataManager.fetchOffsets(request, epoch);
+        }
     }
 
     /**
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index 75e0bc45c76..6dff3e701b9 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -1722,11 +1722,14 @@ public class GroupCoordinatorServiceTest {
         OffsetFetchRequestData.OffsetFetchRequestGroup request =
             new OffsetFetchRequestData.OffsetFetchRequestGroup()
                 .setGroupId("group");
-        if (!fetchAllOffsets) {
-            request
-                .setTopics(List.of(new 
OffsetFetchRequestData.OffsetFetchRequestTopics()
-                    .setName("foo")
-                    .setPartitionIndexes(List.of(0))));
+
+        if (fetchAllOffsets) {
+            request.setTopics(null);
+        } else {
+            request.setTopics(List.of(new 
OffsetFetchRequestData.OffsetFetchRequestTopics()
+                .setName("foo")
+                .setPartitionIndexes(List.of(0))
+            ));
         }
 
         OffsetFetchResponseData.OffsetFetchResponseGroup response =
@@ -1753,9 +1756,7 @@ public class GroupCoordinatorServiceTest {
             )).thenReturn(CompletableFuture.completedFuture(response));
         }
 
-        TriFunction<RequestContext, 
OffsetFetchRequestData.OffsetFetchRequestGroup, Boolean, 
CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup>> 
fetchOffsets =
-            fetchAllOffsets ? service::fetchAllOffsets : service::fetchOffsets;
-        CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> 
future = fetchOffsets.apply(
+        CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> 
future = service.fetchOffsets(
             requestContext(ApiKeys.OFFSET_FETCH),
             request,
             requireStable
@@ -1784,16 +1785,17 @@ public class GroupCoordinatorServiceTest {
         OffsetFetchRequestData.OffsetFetchRequestGroup request =
             new OffsetFetchRequestData.OffsetFetchRequestGroup()
                 .setGroupId("group");
-        if (!fetchAllOffsets) {
-            request
-                .setTopics(List.of(new 
OffsetFetchRequestData.OffsetFetchRequestTopics()
-                    .setName("foo")
-                    .setPartitionIndexes(List.of(0))));
+
+        if (fetchAllOffsets) {
+            request.setTopics(null);
+        } else {
+            request.setTopics(List.of(new 
OffsetFetchRequestData.OffsetFetchRequestTopics()
+                .setName("foo")
+                .setPartitionIndexes(List.of(0))
+            ));
         }
 
-        TriFunction<RequestContext, 
OffsetFetchRequestData.OffsetFetchRequestGroup, Boolean, 
CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup>> 
fetchOffsets =
-            fetchAllOffsets ? service::fetchAllOffsets : service::fetchOffsets;
-        CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> 
future = fetchOffsets.apply(
+        CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> 
future = service.fetchOffsets(
             requestContext(ApiKeys.OFFSET_FETCH),
             request,
             requireStable
@@ -1834,11 +1836,14 @@ public class GroupCoordinatorServiceTest {
         OffsetFetchRequestData.OffsetFetchRequestGroup request =
             new OffsetFetchRequestData.OffsetFetchRequestGroup()
                 .setGroupId("group");
-        if (!fetchAllOffsets) {
-            request
-                .setTopics(List.of(new 
OffsetFetchRequestData.OffsetFetchRequestTopics()
-                    .setName("foo")
-                    .setPartitionIndexes(List.of(0))));
+
+        if (fetchAllOffsets) {
+            request.setTopics(null);
+        } else {
+            request.setTopics(List.of(new 
OffsetFetchRequestData.OffsetFetchRequestTopics()
+                .setName("foo")
+                .setPartitionIndexes(List.of(0))
+            ));
         }
 
         when(runtime.scheduleWriteOperation(
@@ -1848,9 +1853,7 @@ public class GroupCoordinatorServiceTest {
             ArgumentMatchers.any()
         )).thenReturn(FutureUtils.failedFuture(new 
CompletionException(error.exception())));
 
-        TriFunction<RequestContext, 
OffsetFetchRequestData.OffsetFetchRequestGroup, Boolean, 
CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup>> 
fetchOffsets =
-            fetchAllOffsets ? service::fetchAllOffsets : service::fetchOffsets;
-        CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> 
future = fetchOffsets.apply(
+        CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> 
future = service.fetchOffsets(
             requestContext(ApiKeys.OFFSET_FETCH),
             request,
             true
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
index 4dc0c8ff78c..2a6330e0d66 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
@@ -28,6 +28,8 @@ import 
org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData;
 import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
 import org.apache.kafka.common.message.OffsetCommitRequestData;
 import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.message.OffsetFetchRequestData;
+import org.apache.kafka.common.message.OffsetFetchResponseData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
 import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
@@ -99,6 +101,7 @@ import org.apache.kafka.timeline.SnapshotRegistry;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatchers;
 import org.mockito.Mockito;
@@ -2384,4 +2387,49 @@ public class GroupCoordinatorShardTest {
         assertEquals(expectedResult, 
coordinator.completeDeleteShareGroupOffsets(groupId, topics, 
errorTopicResponseList));
         verify(groupMetadataManager, 
times(1)).completeDeleteShareGroupOffsets(any(), any(), any());
     }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testFetchOffsets(boolean fetchAllOffsets) {
+        var offsetMetadataManager = mock(OffsetMetadataManager.class);
+        var coordinator = new GroupCoordinatorShard(
+            new LogContext(),
+            mock(GroupMetadataManager.class),
+            offsetMetadataManager,
+            Time.SYSTEM,
+            new MockCoordinatorTimer<>(Time.SYSTEM),
+            mock(GroupCoordinatorConfig.class),
+            mock(CoordinatorMetrics.class),
+            mock(CoordinatorMetricsShard.class)
+        );
+
+        var request = new OffsetFetchRequestData.OffsetFetchRequestGroup()
+            .setGroupId("foo");
+
+        if (fetchAllOffsets) {
+            request.setTopics(null);
+        } else {
+            request.setTopics(List.of(new 
OffsetFetchRequestData.OffsetFetchRequestTopics()
+                .setName("foo")
+                .setPartitionIndexes(List.of(0))
+            ));
+        }
+
+        var result = new OffsetFetchResponseData.OffsetFetchResponseGroup()
+            .setGroupId("foo");
+
+        if (fetchAllOffsets) {
+            when(offsetMetadataManager.fetchAllOffsets(
+                request,
+                Long.MAX_VALUE
+            )).thenReturn(result);
+        } else {
+            when(offsetMetadataManager.fetchOffsets(
+                request,
+                Long.MAX_VALUE
+            )).thenReturn(result);
+        }
+
+        assertEquals(result, coordinator.fetchOffsets(request, 
Long.MAX_VALUE));
+    }
 }

Reply via email to