This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 68b7031dc44 KAFKA-14499: [4/N] Implement OffsetFetch API (#14120)
68b7031dc44 is described below

commit 68b7031dc443b6f6b5dfac81316ab22fe250ec54
Author: David Jacot <[email protected]>
AuthorDate: Mon Aug 28 07:02:56 2023 -0700

    KAFKA-14499: [4/N] Implement OffsetFetch API (#14120)
    
    This patch implements the OffsetFetch API in the new group coordinator.
    
    I found out that implementing the `RequireStable` flag is hard (not to
    say impossible) in the current model. For context, the flag is there to
    ensure that an OffsetFetch request does not return stale offsets when
    there are pending offsets to be committed. In the Scala code, we
    basically check the pending offsets data structure and, if there are
    any pending offsets, we return the `UNSTABLE_OFFSET_COMMIT` error. This
    tells the consumer to retry.
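
    As a rough sketch of that old check, in Java-style pseudocode (the
    method and helper names here are approximate, not the actual Scala
    code):

        // Old model: a dedicated pending-offsets structure makes the
        // staleness check trivial.
        if (requireStable && group.hasPendingOffsetCommitsForTopicPartition(tp)) {
            // Tell the consumer to retry until the pending commit lands.
            return partitionResponse(tp, Errors.UNSTABLE_OFFSET_COMMIT);
        }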
    
    In our new model, we don't have the pending offsets data structure.
    Instead, we use a timeline data structure to handle all the
    pending/uncommitted changes. Because of this, we don't know whether
    offsets are pending for a particular group, so I propose to not return
    the `UNSTABLE_OFFSET_COMMIT` error anymore. Instead, when
    `RequireStable` is set, we use a write operation to ensure that we read
    the latest offsets. If there are uncommitted offsets, the write
    operation [...]
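
    As a minimal, self-contained sketch of the timeline semantics this
    relies on (illustration only, not part of this patch): a read at the
    last committed offset does not observe pending writes, while a read at
    Long.MAX_VALUE observes everything, which is why the write path below
    fetches at Long.MAX_VALUE.

        import org.apache.kafka.common.utils.LogContext;
        import org.apache.kafka.timeline.SnapshotRegistry;
        import org.apache.kafka.timeline.TimelineHashMap;

        public class TimelineReadSketch {
            public static void main(String[] args) {
                SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
                TimelineHashMap<String, Long> offsets = new TimelineHashMap<>(registry, 0);

                // Snapshot the committed state at offset 0, then apply an
                // uncommitted write on top of it.
                registry.getOrCreateSnapshot(0);
                offsets.put("foo-0", 100L);

                // A read at the committed offset does not see the pending
                // write; a read at Long.MAX_VALUE does.
                System.out.println(offsets.get("foo-0", 0L));              // null
                System.out.println(offsets.get("foo-0", Long.MAX_VALUE));  // 100
            }
        }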
    
    Reviewers: Justine Olshan <[email protected]>
---
 .../org/apache/kafka/coordinator/group/Group.java  |   5 +
 .../coordinator/group/GroupCoordinatorService.java |  66 +++-
 .../coordinator/group/GroupCoordinatorShard.java   |  37 ++
 .../coordinator/group/GroupMetadataManager.java    |  14 +-
 .../coordinator/group/OffsetMetadataManager.java   | 220 ++++++++++--
 .../coordinator/group/consumer/ConsumerGroup.java  |   8 +
 .../coordinator/group/generic/GenericGroup.java    |  11 +
 .../group/GroupCoordinatorServiceTest.java         |  98 ++++++
 .../group/OffsetMetadataManagerTest.java           | 392 ++++++++++++++++++++-
 9 files changed, 817 insertions(+), 34 deletions(-)
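As a quick client-side illustration (not part of this patch; the broker
address and group id below are placeholders), the RequireStable flag that
this patch implements is what the admin client sets via requireStable(true):

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;

    import java.util.Properties;

    public class RequireStableFetch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            try (Admin admin = Admin.create(props)) {
                // requireStable(true) sets the OffsetFetch RequireStable flag,
                // which the new coordinator serves through a write operation.
                admin.listConsumerGroupOffsets(
                        "group",
                        new ListConsumerGroupOffsetsOptions().requireStable(true)
                    )
                    .partitionsToOffsetAndMetadata()
                    .get()
                    .forEach((tp, om) -> System.out.println(tp + " -> " + om.offset()));
            }
        }
    }
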

diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
index 33a2d509258..0ffc741dcdd 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
@@ -66,4 +66,9 @@ public interface Group {
         String groupInstanceId,
         int generationIdOrMemberEpoch
     ) throws KafkaException;
+
+    /**
+     * Validates the OffsetFetch request.
+     */
+    void validateOffsetFetch() throws KafkaException;
 }
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 230203796c3..d56d827ec6b 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -61,6 +61,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.coordinator.group.runtime.CoordinatorShardBuilderSupplier;
 import org.apache.kafka.coordinator.group.runtime.CoordinatorEventProcessor;
 import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
 import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime;
 import org.apache.kafka.coordinator.group.runtime.MultiThreadedEventProcessor;
 import org.apache.kafka.coordinator.group.runtime.PartitionWriter;
@@ -71,6 +72,7 @@ import org.apache.kafka.server.util.FutureUtils;
 import org.apache.kafka.server.util.timer.Timer;
 import org.slf4j.Logger;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.OptionalInt;
 import java.util.Properties;
@@ -480,9 +482,35 @@ public class GroupCoordinatorService implements GroupCoordinator {
             return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
         }
 
-        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-            "This API is not implemented yet."
-        ));
+        // For backwards compatibility, we support fetch commits for the empty group id.
+        if (groupId == null) {
+            return FutureUtils.failedFuture(Errors.INVALID_GROUP_ID.exception());
+        }
+
+        // The require stable flag when set tells the broker to hold on returning unstable
+        // (or uncommitted) offsets. In the previous implementation of the group coordinator,
+        // the UNSTABLE_OFFSET_COMMIT error is returned when unstable offsets are present. As
+        // the new implementation relies on timeline data structures, the coordinator does not
+        // really know whether offsets are stable or not so it is hard to return the same error.
+        // Instead, we use a write operation when the flag is set to guarantee that the fetch
+        // is based on all the available offsets and to ensure that the response waits until
+        // the pending offsets are committed. Otherwise, we use a read operation.
+        if (requireStable) {
+            return runtime.scheduleWriteOperation(
+                "fetch-offsets",
+                topicPartitionFor(groupId),
+                coordinator -> new CoordinatorResult<>(
+                    Collections.emptyList(),
+                    coordinator.fetchOffsets(groupId, topics, Long.MAX_VALUE)
+                )
+            );
+        } else {
+            return runtime.scheduleReadOperation(
+                "fetch-offsets",
+                topicPartitionFor(groupId),
+                (coordinator, offset) -> coordinator.fetchOffsets(groupId, topics, offset)
+            );
+        }
     }
 
     /**
@@ -498,9 +526,35 @@ public class GroupCoordinatorService implements GroupCoordinator {
             return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
         }
 
-        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-            "This API is not implemented yet."
-        ));
+        // For backwards compatibility, we support fetch commits for the empty group id.
+        if (groupId == null) {
+            return FutureUtils.failedFuture(Errors.INVALID_GROUP_ID.exception());
+        }
+
+        // The require stable flag when set tells the broker to hold on returning unstable
+        // (or uncommitted) offsets. In the previous implementation of the group coordinator,
+        // the UNSTABLE_OFFSET_COMMIT error is returned when unstable offsets are present. As
+        // the new implementation relies on timeline data structures, the coordinator does not
+        // really know whether offsets are stable or not so it is hard to return the same error.
+        // Instead, we use a write operation when the flag is set to guarantee that the fetch
+        // is based on all the available offsets and to ensure that the response waits until
+        // the pending offsets are committed. Otherwise, we use a read operation.
+        if (requireStable) {
+            return runtime.scheduleWriteOperation(
+                "fetch-all-offsets",
+                topicPartitionFor(groupId),
+                coordinator -> new CoordinatorResult<>(
+                    Collections.emptyList(),
+                    coordinator.fetchAllOffsets(groupId, Long.MAX_VALUE)
+                )
+            );
+        } else {
+            return runtime.scheduleReadOperation(
+                "fetch-all-offsets",
+                topicPartitionFor(groupId),
+                (coordinator, offset) -> coordinator.fetchAllOffsets(groupId, offset)
+            );
+        }
     }
 
     /**
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index 13537db6977..6b4f7e08a5a 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -24,6 +24,8 @@ import org.apache.kafka.common.message.JoinGroupRequestData;
 import org.apache.kafka.common.message.JoinGroupResponseData;
 import org.apache.kafka.common.message.OffsetCommitRequestData;
 import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.message.OffsetFetchRequestData;
+import org.apache.kafka.common.message.OffsetFetchResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
 import org.apache.kafka.common.message.SyncGroupResponseData;
 import org.apache.kafka.common.errors.ApiException;
@@ -56,6 +58,7 @@ import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.timeline.SnapshotRegistry;
 
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -256,6 +259,40 @@ public class GroupCoordinatorShard implements CoordinatorShard<Record> {
         );
     }
 
+    /**
+     * Fetch offsets for a given set of partitions and a given group.
+     *
+     * @param groupId   The group id.
+     * @param topics    The topics to fetch the offsets for.
+     * @param epoch     The epoch (or offset) used to read from the
+     *                  timeline data structure.
+     *
+     * @return A List of OffsetFetchResponseTopics response.
+     */
+    public List<OffsetFetchResponseData.OffsetFetchResponseTopics> fetchOffsets(
+        String groupId,
+        List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics,
+        long epoch
+    ) throws ApiException {
+        return offsetMetadataManager.fetchOffsets(groupId, topics, epoch);
+    }
+
+    /**
+     * Fetch all offsets for a given group.
+     *
+     * @param groupId   The group id.
+     * @param epoch     The epoch (or offset) used to read from the
+     *                  timeline data structure.
+     *
+     * @return A List of OffsetFetchResponseTopics response.
+     */
+    public List<OffsetFetchResponseData.OffsetFetchResponseTopics> fetchAllOffsets(
+        String groupId,
+        long epoch
+    ) throws ApiException {
+        return offsetMetadataManager.fetchAllOffsets(groupId, epoch);
+    }
+
     /**
      * Handles a OffsetCommit request.
      *
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index ab37d760bc2..fce74d58561 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -401,7 +401,19 @@ public class GroupMetadataManager {
      * @return The group corresponding to the group id or throw GroupIdNotFoundException.
      */
     public Group group(String groupId) throws GroupIdNotFoundException {
-        Group group = groups.get(groupId);
+        Group group = groups.get(groupId, Long.MAX_VALUE);
+        if (group == null) {
             throw new GroupIdNotFoundException(String.format("Group %s not found.", groupId));
+        }
+        return group;
+    }
+
+    /**
+     * @return The group corresponding to the group id at the given committed offset
+     *         or throw GroupIdNotFoundException.
+     */
+    public Group group(String groupId, long committedOffset) throws GroupIdNotFoundException {
+        Group group = groups.get(groupId, committedOffset);
         if (group == null) {
             throw new GroupIdNotFoundException(String.format("Group %s not found.", groupId));
         }
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
index 8d89f6a980a..6743317f344 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.coordinator.group;
 
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.StaleMemberEpochException;
@@ -24,6 +23,8 @@ import org.apache.kafka.common.message.OffsetCommitRequestData;
 import org.apache.kafka.common.message.OffsetCommitResponseData;
 import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponseTopic;
 import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition;
+import org.apache.kafka.common.message.OffsetFetchRequestData;
+import org.apache.kafka.common.message.OffsetFetchResponseData;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.OffsetCommitRequest;
 import org.apache.kafka.common.requests.RequestContext;
@@ -41,10 +42,12 @@ import org.apache.kafka.timeline.TimelineHashMap;
 import org.slf4j.Logger;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.OptionalLong;
 
+import static org.apache.kafka.common.requests.OffsetFetchResponse.INVALID_OFFSET;
+
 /**
  * The OffsetMetadataManager manages the offsets of all the groups. It basically maintains
  * a mapping from group id to topic-partition to offset. This class has two kinds of methods:
@@ -145,9 +148,9 @@ public class OffsetMetadataManager {
     private final int offsetMetadataMaxSize;
 
     /**
-     * The offsets keyed by topic-partition and group id.
+     * The offsets keyed by group id, topic name and partition id.
      */
-    private final TimelineHashMap<String, TimelineHashMap<TopicPartition, OffsetAndMetadata>> offsetsByGroup;
+    private final TimelineHashMap<String, TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>>> offsetsByGroup;
 
     OffsetMetadataManager(
         SnapshotRegistry snapshotRegistry,
@@ -221,6 +224,20 @@ public class OffsetMetadataManager {
         return group;
     }
 
+    /**
+     * Validates an OffsetFetch request.
+     *
+     * @param groupId               The group id.
+     * @param lastCommittedOffset   The last committed offsets in the timeline.
+     */
+    private void validateOffsetFetch(
+        String groupId,
+        long lastCommittedOffset
+    ) throws GroupIdNotFoundException {
+        Group group = groupMetadataManager.group(groupId, lastCommittedOffset);
+        group.validateOffsetFetch();
+    }
+
     /**
      * Computes the expiration timestamp based on the retention time provided in the OffsetCommit
      * request.
@@ -312,6 +329,109 @@ public class OffsetMetadataManager {
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Fetch offsets for a given Group.
+     *
+     * @param groupId               The group id.
+     * @param topics                The topics to fetch the offsets for.
+     * @param lastCommittedOffset   The last committed offsets in the timeline.
+     *
+     * @return A List of OffsetFetchResponseTopics response.
+     */
+    public List<OffsetFetchResponseData.OffsetFetchResponseTopics> fetchOffsets(
+        String groupId,
+        List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics,
+        long lastCommittedOffset
+    ) throws ApiException {
+        boolean failAllPartitions = false;
+        try {
+            validateOffsetFetch(groupId, lastCommittedOffset);
+        } catch (GroupIdNotFoundException ex) {
+            failAllPartitions = true;
+        }
+
+        final List<OffsetFetchResponseData.OffsetFetchResponseTopics> topicResponses = new ArrayList<>(topics.size());
+        final TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> groupOffsets =
+            failAllPartitions ? null : offsetsByGroup.get(groupId, lastCommittedOffset);
+
+        topics.forEach(topic -> {
+            final OffsetFetchResponseData.OffsetFetchResponseTopics topicResponse =
+                new OffsetFetchResponseData.OffsetFetchResponseTopics().setName(topic.name());
+            topicResponses.add(topicResponse);
+
+            final TimelineHashMap<Integer, OffsetAndMetadata> topicOffsets = groupOffsets == null ?
+                null : groupOffsets.get(topic.name(), lastCommittedOffset);
+
+            topic.partitionIndexes().forEach(partitionIndex -> {
+                final OffsetAndMetadata offsetAndMetadata = topicOffsets == null ?
+                    null : topicOffsets.get(partitionIndex, lastCommittedOffset);
+
+                if (offsetAndMetadata == null) {
+                    topicResponse.partitions().add(new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+                        .setPartitionIndex(partitionIndex)
+                        .setCommittedOffset(INVALID_OFFSET)
+                        .setCommittedLeaderEpoch(-1)
+                        .setMetadata(""));
+                } else {
+                    topicResponse.partitions().add(new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+                        .setPartitionIndex(partitionIndex)
+                        .setCommittedOffset(offsetAndMetadata.offset)
+                        .setCommittedLeaderEpoch(offsetAndMetadata.leaderEpoch.orElse(-1))
+                        .setMetadata(offsetAndMetadata.metadata));
+                }
+            });
+        });
+
+        return topicResponses;
+    }
+
+    /**
+     * Fetch all offsets for a given Group.
+     *
+     * @param groupId               The group id.
+     * @param lastCommittedOffset   The last committed offsets in the timeline.
+     *
+     * @return A List of OffsetFetchResponseTopics response.
+     */
+    public List<OffsetFetchResponseData.OffsetFetchResponseTopics> fetchAllOffsets(
+        String groupId,
+        long lastCommittedOffset
+    ) throws ApiException {
+        try {
+            validateOffsetFetch(groupId, lastCommittedOffset);
+        } catch (GroupIdNotFoundException ex) {
+            return Collections.emptyList();
+        }
+
+        final List<OffsetFetchResponseData.OffsetFetchResponseTopics> topicResponses = new ArrayList<>();
+        final TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> groupOffsets =
+            offsetsByGroup.get(groupId, lastCommittedOffset);
+
+        if (groupOffsets != null) {
+            groupOffsets.entrySet(lastCommittedOffset).forEach(topicEntry -> {
+                final String topic = topicEntry.getKey();
+                final TimelineHashMap<Integer, OffsetAndMetadata> topicOffsets = topicEntry.getValue();
+
+                final OffsetFetchResponseData.OffsetFetchResponseTopics topicResponse =
+                    new OffsetFetchResponseData.OffsetFetchResponseTopics().setName(topic);
+                topicResponses.add(topicResponse);
+
+                topicOffsets.entrySet(lastCommittedOffset).forEach(partitionEntry -> {
+                    final int partition = partitionEntry.getKey();
+                    final OffsetAndMetadata offsetAndMetadata = partitionEntry.getValue();
+
+                    topicResponse.partitions().add(new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+                        .setPartitionIndex(partition)
+                        .setCommittedOffset(offsetAndMetadata.offset)
+                        .setCommittedLeaderEpoch(offsetAndMetadata.leaderEpoch.orElse(-1))
+                        .setMetadata(offsetAndMetadata.metadata));
+                });
+            });
+        }
+
+        return topicResponses;
+    }
+
     /**
      * Replays OffsetCommitKey/Value to update or delete the corresponding offsets.
      *
@@ -323,7 +443,8 @@ public class OffsetMetadataManager {
         OffsetCommitValue value
     ) {
         final String groupId = key.group();
-        final TopicPartition tp = new TopicPartition(key.topic(), key.partition());
+        final String topic = key.topic();
+        final int partition = key.partition();
 
         if (value != null) {
             // The generic or consumer group should exist when offsets are committed or
@@ -337,22 +458,18 @@ public class OffsetMetadataManager {
                 groupMetadataManager.getOrMaybeCreateGenericGroup(groupId, true);
             }
 
-            final OffsetAndMetadata offsetAndMetadata = OffsetAndMetadata.fromRecord(value);
-            TimelineHashMap<TopicPartition, OffsetAndMetadata> offsets = offsetsByGroup.get(groupId);
-            if (offsets == null) {
-                offsets = new TimelineHashMap<>(snapshotRegistry, 0);
-                offsetsByGroup.put(groupId, offsets);
-            }
-
-            offsets.put(tp, offsetAndMetadata);
+            updateOffset(
+                groupId,
+                topic,
+                partition,
+                OffsetAndMetadata.fromRecord(value)
+            );
         } else {
-            TimelineHashMap<TopicPartition, OffsetAndMetadata> offsets = offsetsByGroup.get(groupId);
-            if (offsets != null) {
-                offsets.remove(tp);
-                if (offsets.isEmpty()) {
-                    offsetsByGroup.remove(groupId);
-                }
-            }
+            removeOffset(
+                groupId,
+                topic,
+                partition
+            );
         }
     }
 
@@ -372,12 +489,67 @@ public class OffsetMetadataManager {
      *
      * package-private for testing.
      */
-    OffsetAndMetadata offset(String groupId, TopicPartition tp) {
-        Map<TopicPartition, OffsetAndMetadata> offsets = offsetsByGroup.get(groupId);
-        if (offsets == null) {
+    OffsetAndMetadata offset(String groupId, String topic, int partition) {
+        TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> topicOffsets = offsetsByGroup.get(groupId);
+        if (topicOffsets == null) {
             return null;
         } else {
-            return offsets.get(tp);
+            TimelineHashMap<Integer, OffsetAndMetadata> partitionOffsets = topicOffsets.get(topic);
+            if (partitionOffsets == null) {
+                return null;
+            } else {
+                return partitionOffsets.get(partition);
+            }
         }
     }
+
+    /**
+     * Updates the offset.
+     *
+     * @param groupId           The group id.
+     * @param topic             The topic name.
+     * @param partition         The partition id.
+     * @param offsetAndMetadata The offset metadata.
+     */
+    private void updateOffset(
+        String groupId,
+        String topic,
+        int partition,
+        OffsetAndMetadata offsetAndMetadata
+    ) {
+        TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> topicOffsets = offsetsByGroup
+            .computeIfAbsent(groupId, __ -> new TimelineHashMap<>(snapshotRegistry, 0));
+        TimelineHashMap<Integer, OffsetAndMetadata> partitionOffsets = topicOffsets
+            .computeIfAbsent(topic, __ -> new TimelineHashMap<>(snapshotRegistry, 0));
+        partitionOffsets.put(partition, offsetAndMetadata);
+    }
+
+    /**
+     * Removes the offset.
+     *
+     * @param groupId           The group id.
+     * @param topic             The topic name.
+     * @param partition         The partition id.
+     */
+    private void removeOffset(
+        String groupId,
+        String topic,
+        int partition
+    ) {
+        TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> topicOffsets = offsetsByGroup.get(groupId);
+        if (topicOffsets == null)
+            return;
+
+        TimelineHashMap<Integer, OffsetAndMetadata> partitionOffsets = topicOffsets.get(topic);
+        if (partitionOffsets == null)
+            return;
+
+        partitionOffsets.remove(partition);
+
+        if (partitionOffsets.isEmpty())
+            topicOffsets.remove(topic);
+
+        if (topicOffsets.isEmpty())
+            offsetsByGroup.remove(groupId);
+    }
 }
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
index 82f3f19659c..f6eaed02fa3 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
@@ -543,6 +543,14 @@ public class ConsumerGroup implements Group {
         }
     }
 
+    /**
+     * Validates the OffsetFetch request.
+     */
+    @Override
+    public void validateOffsetFetch() {
+        // Nothing.
+    }
+
     /**
      * Updates the current state of the group.
      */
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
index d43ecda2cf7..edad170806b 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
@@ -19,6 +19,7 @@ package org.apache.kafka.coordinator.group.generic;
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
 import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
 import org.apache.kafka.common.errors.FencedInstanceIdException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.IllegalGenerationException;
 import org.apache.kafka.common.errors.UnknownMemberIdException;
 import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection;
@@ -820,6 +821,16 @@ public class GenericGroup implements Group {
         }
     }
 
+    /**
+     * Validates the OffsetFetch request.
+     */
+    @Override
+    public void validateOffsetFetch() throws GroupIdNotFoundException {
+        if (isInState(DEAD)) {
+            throw new GroupIdNotFoundException(String.format("Group %s is in dead state.", groupId));
+        }
+    }
+
     /**
      * Verify the member id is up to date for static members. Return true if both conditions met:
      *   1. given member is a known static member to group
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index d1e3bea8c7a..666880c0431 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -35,6 +35,8 @@ import org.apache.kafka.common.message.HeartbeatRequestData;
 import org.apache.kafka.common.message.HeartbeatResponseData;
 import org.apache.kafka.common.message.JoinGroupRequestData;
 import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.OffsetFetchRequestData;
+import org.apache.kafka.common.message.OffsetFetchResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
 import org.apache.kafka.common.message.SyncGroupResponseData;
 import org.apache.kafka.common.network.ClientInformation;
@@ -56,10 +58,12 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.ArgumentMatchers;
 
 import java.net.InetAddress;
 import java.util.Collections;
+import java.util.List;
 import java.util.OptionalInt;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
@@ -595,4 +599,98 @@ public class GroupCoordinatorServiceTest {
             future.get()
         );
     }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testFetchOffsets(
+        boolean requireStable
+    ) throws ExecutionException, InterruptedException, TimeoutException {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+
+        service.startup(() -> 1);
+
+        List<OffsetFetchRequestData.OffsetFetchRequestTopics> topicsRequest =
+            Collections.singletonList(new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                .setName("foo")
+                .setPartitionIndexes(Collections.singletonList(0)));
+
+        List<OffsetFetchResponseData.OffsetFetchResponseTopics> topicsResponse =
+            Collections.singletonList(new OffsetFetchResponseData.OffsetFetchResponseTopics()
+                .setName("foo")
+                .setPartitions(Collections.singletonList(new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+                    .setPartitionIndex(0)
+                    .setCommittedOffset(100L))));
+
+        if (requireStable) {
+            when(runtime.scheduleWriteOperation(
+                ArgumentMatchers.eq("fetch-offsets"),
+                ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+                ArgumentMatchers.any()
+            )).thenReturn(CompletableFuture.completedFuture(topicsResponse));
+        } else {
+            when(runtime.scheduleReadOperation(
+                ArgumentMatchers.eq("fetch-offsets"),
+                ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+                ArgumentMatchers.any()
+            )).thenReturn(CompletableFuture.completedFuture(topicsResponse));
+        }
+
+        CompletableFuture<List<OffsetFetchResponseData.OffsetFetchResponseTopics>> future = service.fetchOffsets(
+            requestContext(ApiKeys.OFFSET_FETCH),
+            "group",
+            topicsRequest,
+            requireStable
+        );
+
+        assertEquals(topicsResponse, future.get(5, TimeUnit.SECONDS));
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testFetchAllOffsets(
+        boolean requireStable
+    ) throws ExecutionException, InterruptedException, TimeoutException {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+
+        service.startup(() -> 1);
+
+        List<OffsetFetchResponseData.OffsetFetchResponseTopics> topicsResponse =
+            Collections.singletonList(new OffsetFetchResponseData.OffsetFetchResponseTopics()
+                .setName("foo")
+                .setPartitions(Collections.singletonList(new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+                    .setPartitionIndex(0)
+                    .setCommittedOffset(100L))));
+
+        if (requireStable) {
+            when(runtime.scheduleWriteOperation(
+                ArgumentMatchers.eq("fetch-all-offsets"),
+                ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+                ArgumentMatchers.any()
+            )).thenReturn(CompletableFuture.completedFuture(topicsResponse));
+        } else {
+            when(runtime.scheduleReadOperation(
+                ArgumentMatchers.eq("fetch-all-offsets"),
+                ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+                ArgumentMatchers.any()
+            )).thenReturn(CompletableFuture.completedFuture(topicsResponse));
+        }
+
+        CompletableFuture<List<OffsetFetchResponseData.OffsetFetchResponseTopics>> future = service.fetchAllOffsets(
+            requestContext(ApiKeys.OFFSET_FETCH),
+            "group",
+            requireStable
+        );
+
+        assertEquals(topicsResponse, future.get(5, TimeUnit.SECONDS));
+    }
 }
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
index 5fdfcc4c01e..df472cfe30a 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.coordinator.group;
 
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.IllegalGenerationException;
@@ -27,6 +26,8 @@ import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.JoinGroupRequestData;
 import org.apache.kafka.common.message.OffsetCommitRequestData;
 import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.message.OffsetFetchRequestData;
+import org.apache.kafka.common.message.OffsetFetchResponseData;
 import org.apache.kafka.common.network.ClientInformation;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.protocol.ApiKeys;
@@ -50,6 +51,7 @@ import org.apache.kafka.coordinator.group.generic.GenericGroupState;
 import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.timeline.SnapshotRegistry;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -62,6 +64,7 @@ import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.OptionalLong;
 
+import static org.apache.kafka.common.requests.OffsetFetchResponse.INVALID_OFFSET;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -174,6 +177,28 @@ public class OffsetMetadataManagerTest {
             return result;
         }
 
+        public List<OffsetFetchResponseData.OffsetFetchResponseTopics> fetchOffsets(
+            String groupId,
+            List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics,
+            long committedOffset
+        ) {
+            return offsetMetadataManager.fetchOffsets(
+                groupId,
+                topics,
+                committedOffset
+            );
+        }
+
+        public List<OffsetFetchResponseData.OffsetFetchResponseTopics> fetchAllOffsets(
+            String groupId,
+            long committedOffset
+        ) {
+            return offsetMetadataManager.fetchAllOffsets(
+                groupId,
+                committedOffset
+            );
+        }
+
         public List<MockCoordinatorTimer.ExpiredTimeout<Void, Record>> sleep(long ms) {
             time.sleep(ms);
             List<MockCoordinatorTimer.ExpiredTimeout<Void, Record>> timeouts = timer.poll();
@@ -185,6 +210,30 @@ public class OffsetMetadataManagerTest {
             return timeouts;
         }
 
+        public void commitOffset(
+            String groupId,
+            String topic,
+            int partition,
+            long offset,
+            int leaderEpoch
+        ) {
+            snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset);
+
+            replay(RecordHelpers.newOffsetCommitRecord(
+                groupId,
+                topic,
+                partition,
+                new OffsetAndMetadata(
+                    offset,
+                    OptionalInt.of(leaderEpoch),
+                    "metadata",
+                    time.milliseconds(),
+                    OptionalLong.empty()
+                ),
+                MetadataVersion.latest()
+            ));
+        }
+
         private ApiMessage messageOrNull(ApiMessageAndVersion apiMessageAndVersion) {
             if (apiMessageAndVersion == null) {
                 return null;
@@ -1040,6 +1089,342 @@ public class OffsetMetadataManagerTest {
         );
     }
 
+    @Test
+    public void testGenericGroupFetchOffsetsWithDeadGroup() {
+        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
+
+        // Create a dead group.
+        GenericGroup group = context.groupMetadataManager.getOrMaybeCreateGenericGroup(
+            "group",
+            true
+        );
+        group.transitionTo(GenericGroupState.DEAD);
+
+        List<OffsetFetchRequestData.OffsetFetchRequestTopics> request = Arrays.asList(
+            new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                .setName("foo")
+                .setPartitionIndexes(Arrays.asList(0, 1)),
+            new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                .setName("bar")
+                .setPartitionIndexes(Collections.singletonList(0))
+        );
+
+        List<OffsetFetchResponseData.OffsetFetchResponseTopics> expectedResponse = Arrays.asList(
+            new OffsetFetchResponseData.OffsetFetchResponseTopics()
+                .setName("foo")
+                .setPartitions(Arrays.asList(
+                    mkInvalidOffsetPartitionResponse(0),
+                    mkInvalidOffsetPartitionResponse(1)
+                )),
+            new OffsetFetchResponseData.OffsetFetchResponseTopics()
+                .setName("bar")
+                .setPartitions(Collections.singletonList(
+                    mkInvalidOffsetPartitionResponse(0)
+                ))
+        );
+
+        assertEquals(expectedResponse, context.fetchOffsets("group", request, Long.MAX_VALUE));
+    }
+
+    @Test
+    public void testFetchOffsetsWithUnknownGroup() {
+        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
+
+        List<OffsetFetchRequestData.OffsetFetchRequestTopics> request = Arrays.asList(
+            new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                .setName("foo")
+                .setPartitionIndexes(Arrays.asList(0, 1)),
+            new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                .setName("bar")
+                .setPartitionIndexes(Collections.singletonList(0))
+        );
+
+        List<OffsetFetchResponseData.OffsetFetchResponseTopics> expectedResponse = Arrays.asList(
+            new OffsetFetchResponseData.OffsetFetchResponseTopics()
+                .setName("foo")
+                .setPartitions(Arrays.asList(
+                    mkInvalidOffsetPartitionResponse(0),
+                    mkInvalidOffsetPartitionResponse(1)
+                )),
+            new OffsetFetchResponseData.OffsetFetchResponseTopics()
+                .setName("bar")
+                .setPartitions(Collections.singletonList(
+                    mkInvalidOffsetPartitionResponse(0)
+                ))
+        );
+
+        assertEquals(expectedResponse, context.fetchOffsets("group", request, Long.MAX_VALUE));
+    }
+
+    @Test
+    public void testFetchOffsetsAtDifferentCommittedOffset() {
+        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
+
+        context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", true);
+
+        assertEquals(0, context.lastWrittenOffset);
+        context.commitOffset("group", "foo", 0, 100L, 1);
+        assertEquals(1, context.lastWrittenOffset);
+        context.commitOffset("group", "foo", 1, 110L, 1);
+        assertEquals(2, context.lastWrittenOffset);
+        context.commitOffset("group", "bar", 0, 200L, 1);
+        assertEquals(3, context.lastWrittenOffset);
+        context.commitOffset("group", "foo", 1, 111L, 2);
+        assertEquals(4, context.lastWrittenOffset);
+        context.commitOffset("group", "bar", 1, 210L, 2);
+        assertEquals(5, context.lastWrittenOffset);
+
+        // Always use the same request.
+        List<OffsetFetchRequestData.OffsetFetchRequestTopics> request = Arrays.asList(
+            new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                .setName("foo")
+                .setPartitionIndexes(Arrays.asList(0, 1)),
+            new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                .setName("bar")
+                .setPartitionIndexes(Arrays.asList(0, 1))
+        );
+
+        // Fetching with 0 should return all invalid offsets.
+        assertEquals(Arrays.asList(
+            new OffsetFetchResponseData.OffsetFetchResponseTopics()
+                .setName("foo")
+                .setPartitions(Arrays.asList(
+                    mkInvalidOffsetPartitionResponse(0),
+                    mkInvalidOffsetPartitionResponse(1)
+                )),
+            new OffsetFetchResponseData.OffsetFetchResponseTopics()
+                .setName("bar")
+                .setPartitions(Arrays.asList(
+                    mkInvalidOffsetPartitionResponse(0),
+                    mkInvalidOffsetPartitionResponse(1)
+                ))
+        ), context.fetchOffsets("group", request, 0L));
+
+        // Fetching with 1 should return data up to offset 1.
+        assertEquals(Arrays.asList(
+            new OffsetFetchResponseData.OffsetFetchResponseTopics()
+                .setName("foo")
+                .setPartitions(Arrays.asList(
+                    mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
+                    mkInvalidOffsetPartitionResponse(1)
+                )),
+            new OffsetFetchResponseData.OffsetFetchResponseTopics()
+                .setName("bar")
+                .setPartitions(Arrays.asList(
+                    mkInvalidOffsetPartitionResponse(0),
+                    mkInvalidOffsetPartitionResponse(1)
+                ))
+        ), context.fetchOffsets("group", request, 1L));
+
+        // Fetching with 2 should return data up to offset 2.
+        assertEquals(Arrays.asList(
+            new OffsetFetchResponseData.OffsetFetchResponseTopics()
+                .setName("foo")
+                .setPartitions(Arrays.asList(
+                    mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
+                    mkOffsetPartitionResponse(1, 110L, 1, "metadata")
+                )),
+            new OffsetFetchResponseData.OffsetFetchResponseTopics()
+                .setName("bar")
+                .setPartitions(Arrays.asList(
+                    mkInvalidOffsetPartitionResponse(0),
+                    mkInvalidOffsetPartitionResponse(1)
+                ))
+        ), context.fetchOffsets("group", request, 2L));
+
+        // Fetching with 3 should return data up to offset 3.
+        assertEquals(Arrays.asList(
+            new OffsetFetchResponseData.OffsetFetchResponseTopics()
+                .setName("foo")
+                .setPartitions(Arrays.asList(
+                    mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
+                    mkOffsetPartitionResponse(1, 110L, 1, "metadata")
+                )),
+            new OffsetFetchResponseData.OffsetFetchResponseTopics()
+                .setName("bar")
+                .setPartitions(Arrays.asList(
+                    mkOffsetPartitionResponse(0, 200L, 1, "metadata"),
+                    mkInvalidOffsetPartitionResponse(1)
+                ))
+        ), context.fetchOffsets("group", request, 3L));
+
+        // Fetching with 4 should return data up to offset 4.
+        assertEquals(Arrays.asList(
+            new OffsetFetchResponseData.OffsetFetchResponseTopics()
+                .setName("foo")
+                .setPartitions(Arrays.asList(
+                    mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
+                    mkOffsetPartitionResponse(1, 111L, 2, "metadata")
+                )),
+            new OffsetFetchResponseData.OffsetFetchResponseTopics()
+                .setName("bar")
+                .setPartitions(Arrays.asList(
+                    mkOffsetPartitionResponse(0, 200L, 1, "metadata"),
+                    mkInvalidOffsetPartitionResponse(1)
+                ))
+        ), context.fetchOffsets("group", request, 4L));
+
+        // Fetching with 5 should return data up to offset 5.
+        assertEquals(Arrays.asList(
+            new OffsetFetchResponseData.OffsetFetchResponseTopics()
+                .setName("foo")
+                .setPartitions(Arrays.asList(
+                    mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
+                    mkOffsetPartitionResponse(1, 111L, 2, "metadata")
+                )),
+            new OffsetFetchResponseData.OffsetFetchResponseTopics()
+                .setName("bar")
+                .setPartitions(Arrays.asList(
+                    mkOffsetPartitionResponse(0, 200L, 1, "metadata"),
+                    mkOffsetPartitionResponse(1, 210L, 2, "metadata")
+                ))
+        ), context.fetchOffsets("group", request, 5L));
+
+        // Fetching with Long.MAX_VALUE should return all offsets.
+        assertEquals(Arrays.asList(
+            new OffsetFetchResponseData.OffsetFetchResponseTopics()
+                .setName("foo")
+                .setPartitions(Arrays.asList(
+                    mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
+                    mkOffsetPartitionResponse(1, 111L, 2, "metadata")
+                )),
+            new OffsetFetchResponseData.OffsetFetchResponseTopics()
+                .setName("bar")
+                .setPartitions(Arrays.asList(
+                    mkOffsetPartitionResponse(0, 200L, 1, "metadata"),
+                    mkOffsetPartitionResponse(1, 210L, 2, "metadata")
+                ))
+        ), context.fetchOffsets("group", request, Long.MAX_VALUE));
+    }
+
+    @Test
+    public void testGenericGroupFetchAllOffsetsWithDeadGroup() {
+        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
+
+        // Create a dead group.
+        GenericGroup group = context.groupMetadataManager.getOrMaybeCreateGenericGroup(
+            "group",
+            true
+        );
+        group.transitionTo(GenericGroupState.DEAD);
+
+        assertEquals(Collections.emptyList(), context.fetchAllOffsets("group", Long.MAX_VALUE));
+    }
+
+    @Test
+    public void testFetchAllOffsetsWithUnknownGroup() {
+        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
+        assertEquals(Collections.emptyList(), context.fetchAllOffsets("group", Long.MAX_VALUE));
+    }
+
+    @Test
+    public void testFetchAllOffsetsAtDifferentCommittedOffset() {
+        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
+
+        context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", true);
+
+        assertEquals(0, context.lastWrittenOffset);
+        context.commitOffset("group", "foo", 0, 100L, 1);
+        assertEquals(1, context.lastWrittenOffset);
+        context.commitOffset("group", "foo", 1, 110L, 1);
+        assertEquals(2, context.lastWrittenOffset);
+        context.commitOffset("group", "bar", 0, 200L, 1);
+        assertEquals(3, context.lastWrittenOffset);
+        context.commitOffset("group", "foo", 1, 111L, 2);
+        assertEquals(4, context.lastWrittenOffset);
+        context.commitOffset("group", "bar", 1, 210L, 2);
+        assertEquals(5, context.lastWrittenOffset);
+
+        // Fetching with 0 should return no offsets.
+        assertEquals(Collections.emptyList(), context.fetchAllOffsets("group", 0L));
+
+        // Fetching with 1 should return data up to offset 1.
+        assertEquals(Arrays.asList(
+            new OffsetFetchResponseData.OffsetFetchResponseTopics()
+                .setName("foo")
+                .setPartitions(Arrays.asList(
+                    mkOffsetPartitionResponse(0, 100L, 1, "metadata")
+                ))
+        ), context.fetchAllOffsets("group", 1L));
+
+        // Fetching with 2 should return data up to offset 2.
+        assertEquals(Arrays.asList(
+            new OffsetFetchResponseData.OffsetFetchResponseTopics()
+                .setName("foo")
+                .setPartitions(Arrays.asList(
+                    mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
+                    mkOffsetPartitionResponse(1, 110L, 1, "metadata")
+                ))
+        ), context.fetchAllOffsets("group", 2L));
+
+        // Fetching with 3 should return data up to offset 3.
+        assertEquals(Arrays.asList(
+            new OffsetFetchResponseData.OffsetFetchResponseTopics()
+                .setName("bar")
+                .setPartitions(Arrays.asList(
+                    mkOffsetPartitionResponse(0, 200L, 1, "metadata")
+                )),
+            new OffsetFetchResponseData.OffsetFetchResponseTopics()
+                .setName("foo")
+                .setPartitions(Arrays.asList(
+                    mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
+                    mkOffsetPartitionResponse(1, 110L, 1, "metadata")
+                ))
+        ), context.fetchAllOffsets("group", 3L));
+
+        // Fetching with 4 should return data up to offset 4.
+        assertEquals(Arrays.asList(
+            new OffsetFetchResponseData.OffsetFetchResponseTopics()
+                .setName("bar")
+                .setPartitions(Arrays.asList(
+                    mkOffsetPartitionResponse(0, 200L, 1, "metadata")
+                )),
+            new OffsetFetchResponseData.OffsetFetchResponseTopics()
+                .setName("foo")
+                .setPartitions(Arrays.asList(
+                    mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
+                    mkOffsetPartitionResponse(1, 111L, 2, "metadata")
+                ))
+        ), context.fetchAllOffsets("group", 4L));
+
+        // Fetching with Long.MAX_VALUE should return all offsets.
+        assertEquals(Arrays.asList(
+            new OffsetFetchResponseData.OffsetFetchResponseTopics()
+                .setName("bar")
+                .setPartitions(Arrays.asList(
+                    mkOffsetPartitionResponse(0, 200L, 1, "metadata"),
+                    mkOffsetPartitionResponse(1, 210L, 2, "metadata")
+                )),
+            new OffsetFetchResponseData.OffsetFetchResponseTopics()
+                .setName("foo")
+                .setPartitions(Arrays.asList(
+                    mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
+                    mkOffsetPartitionResponse(1, 111L, 2, "metadata")
+                ))
+        ), context.fetchAllOffsets("group", Long.MAX_VALUE));
+    }
+
+    static private OffsetFetchResponseData.OffsetFetchResponsePartitions mkOffsetPartitionResponse(
+        int partition,
+        long offset,
+        int leaderEpoch,
+        String metadata
+    ) {
+        return new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+            .setPartitionIndex(partition)
+            .setCommittedOffset(offset)
+            .setCommittedLeaderEpoch(leaderEpoch)
+            .setMetadata(metadata);
+    }
+
+    static private OffsetFetchResponseData.OffsetFetchResponsePartitions mkInvalidOffsetPartitionResponse(int partition) {
+        return new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+            .setPartitionIndex(partition)
+            .setCommittedOffset(INVALID_OFFSET)
+            .setCommittedLeaderEpoch(-1)
+            .setMetadata("");
+    }
+
     @Test
     public void testReplay() {
         OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
@@ -1098,7 +1483,7 @@ public class OffsetMetadataManagerTest {
         ));
 
         // Verify that the offset is gone.
-        assertNull(context.offsetMetadataManager.offset("foo", new TopicPartition("bar", 0)));
+        assertNull(context.offsetMetadataManager.offset("foo", "bar", 0));
     }
 
     private void verifyReplay(
@@ -1118,7 +1503,8 @@ public class OffsetMetadataManagerTest {
 
         assertEquals(offsetAndMetadata, context.offsetMetadataManager.offset(
             groupId,
-            new TopicPartition(topic, partition)
+            topic,
+            partition
         ));
     }
 
