This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 68b7031dc44 KAFKA-14499: [4/N] Implement OffsetFetch API (#14120)
68b7031dc44 is described below
commit 68b7031dc443b6f6b5dfac81316ab22fe250ec54
Author: David Jacot <[email protected]>
AuthorDate: Mon Aug 28 07:02:56 2023 -0700
KAFKA-14499: [4/N] Implement OffsetFetch API (#14120)
This patch implements the OffsetFetch API in the new group coordinator.
I found out that implementing the `RequireStable` flag is hard (to not say
impossible) in the current model. For the context, the flag is here to ensure
that an OffsetRequest request does not return stale offsets if there are
pending offsets to be committed. In the scala code, we basically check the
pending offsets data structure and if they are any pending offsets, we return
the `UNSTABLE_OFFSET_COMMIT` error. This tells the consumer to retry.
In our new model, we don't have the pending offsets data structure.
Instead, we use a timeline data structure to handle all the pending/uncommitted
changes. Because of this we don't know whether offsets are pending for a
particular group. Instead of doing this, I propose to not return the
`UNSTABLE_OFFSET_COMMIT` error anymore. Instead, when `RequireStable` is set,
we use a write operation to ensure that we read the latest offsets. If they are
uncommitted offsets, the write operation [...]
Reviewers: Justine Olshan <[email protected]>
---
.../org/apache/kafka/coordinator/group/Group.java | 5 +
.../coordinator/group/GroupCoordinatorService.java | 66 +++-
.../coordinator/group/GroupCoordinatorShard.java | 37 ++
.../coordinator/group/GroupMetadataManager.java | 14 +-
.../coordinator/group/OffsetMetadataManager.java | 220 ++++++++++--
.../coordinator/group/consumer/ConsumerGroup.java | 8 +
.../coordinator/group/generic/GenericGroup.java | 11 +
.../group/GroupCoordinatorServiceTest.java | 98 ++++++
.../group/OffsetMetadataManagerTest.java | 392 ++++++++++++++++++++-
9 files changed, 817 insertions(+), 34 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
index 33a2d509258..0ffc741dcdd 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
@@ -66,4 +66,9 @@ public interface Group {
String groupInstanceId,
int generationIdOrMemberEpoch
) throws KafkaException;
+
+ /**
+ * Validates the OffsetFetch request.
+ */
+ void validateOffsetFetch() throws KafkaException;
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 230203796c3..d56d827ec6b 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -61,6 +61,7 @@ import org.apache.kafka.common.utils.Utils;
import
org.apache.kafka.coordinator.group.runtime.CoordinatorShardBuilderSupplier;
import org.apache.kafka.coordinator.group.runtime.CoordinatorEventProcessor;
import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime;
import org.apache.kafka.coordinator.group.runtime.MultiThreadedEventProcessor;
import org.apache.kafka.coordinator.group.runtime.PartitionWriter;
@@ -71,6 +72,7 @@ import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.server.util.timer.Timer;
import org.slf4j.Logger;
+import java.util.Collections;
import java.util.List;
import java.util.OptionalInt;
import java.util.Properties;
@@ -480,9 +482,35 @@ public class GroupCoordinatorService implements
GroupCoordinator {
return
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
}
- return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
- "This API is not implemented yet."
- ));
+ // For backwards compatibility, we support fetch commits for the empty
group id.
+ if (groupId == null) {
+ return
FutureUtils.failedFuture(Errors.INVALID_GROUP_ID.exception());
+ }
+
+ // The require stable flag when set tells the broker to hold on
returning unstable
+ // (or uncommitted) offsets. In the previous implementation of the
group coordinator,
+ // the UNSTABLE_OFFSET_COMMIT error is returned when unstable offsets
are present. As
+ // the new implementation relies on timeline data structures, the
coordinator does not
+ // really know whether offsets are stable or not so it is hard to
return the same error.
+ // Instead, we use a write operation when the flag is set to guarantee
that the fetch
+ // is based on all the available offsets and to ensure that the
response waits until
+ // the pending offsets are committed. Otherwise, we use a read
operation.
+ if (requireStable) {
+ return runtime.scheduleWriteOperation(
+ "fetch-offsets",
+ topicPartitionFor(groupId),
+ coordinator -> new CoordinatorResult<>(
+ Collections.emptyList(),
+ coordinator.fetchOffsets(groupId, topics, Long.MAX_VALUE)
+ )
+ );
+ } else {
+ return runtime.scheduleReadOperation(
+ "fetch-offsets",
+ topicPartitionFor(groupId),
+ (coordinator, offset) -> coordinator.fetchOffsets(groupId,
topics, offset)
+ );
+ }
}
/**
@@ -498,9 +526,35 @@ public class GroupCoordinatorService implements
GroupCoordinator {
return
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
}
- return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
- "This API is not implemented yet."
- ));
+ // For backwards compatibility, we support fetch commits for the empty
group id.
+ if (groupId == null) {
+ return
FutureUtils.failedFuture(Errors.INVALID_GROUP_ID.exception());
+ }
+
+ // The require stable flag when set tells the broker to hold on
returning unstable
+ // (or uncommitted) offsets. In the previous implementation of the
group coordinator,
+ // the UNSTABLE_OFFSET_COMMIT error is returned when unstable offsets
are present. As
+ // the new implementation relies on timeline data structures, the
coordinator does not
+ // really know whether offsets are stable or not so it is hard to
return the same error.
+ // Instead, we use a write operation when the flag is set to guarantee
that the fetch
+ // is based on all the available offsets and to ensure that the
response waits until
+ // the pending offsets are committed. Otherwise, we use a read
operation.
+ if (requireStable) {
+ return runtime.scheduleWriteOperation(
+ "fetch-all-offsets",
+ topicPartitionFor(groupId),
+ coordinator -> new CoordinatorResult<>(
+ Collections.emptyList(),
+ coordinator.fetchAllOffsets(groupId, Long.MAX_VALUE)
+ )
+ );
+ } else {
+ return runtime.scheduleReadOperation(
+ "fetch-all-offsets",
+ topicPartitionFor(groupId),
+ (coordinator, offset) -> coordinator.fetchAllOffsets(groupId,
offset)
+ );
+ }
}
/**
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index 13537db6977..6b4f7e08a5a 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -24,6 +24,8 @@ import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.message.OffsetFetchRequestData;
+import org.apache.kafka.common.message.OffsetFetchResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.errors.ApiException;
@@ -56,6 +58,7 @@ import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
/**
@@ -256,6 +259,40 @@ public class GroupCoordinatorShard implements
CoordinatorShard<Record> {
);
}
+ /**
+ * Fetch offsets for a given set of partitions and a given group.
+ *
+ * @param groupId The group id.
+ * @param topics The topics to fetch the offsets for.
+ * @param epoch The epoch (or offset) used to read from the
+ * timeline data structure.
+ *
+ * @return A List of OffsetFetchResponseTopics response.
+ */
+ public List<OffsetFetchResponseData.OffsetFetchResponseTopics>
fetchOffsets(
+ String groupId,
+ List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics,
+ long epoch
+ ) throws ApiException {
+ return offsetMetadataManager.fetchOffsets(groupId, topics, epoch);
+ }
+
+ /**
+ * Fetch all offsets for a given group.
+ *
+ * @param groupId The group id.
+ * @param epoch The epoch (or offset) used to read from the
+ * timeline data structure.
+ *
+ * @return A List of OffsetFetchResponseTopics response.
+ */
+ public List<OffsetFetchResponseData.OffsetFetchResponseTopics>
fetchAllOffsets(
+ String groupId,
+ long epoch
+ ) throws ApiException {
+ return offsetMetadataManager.fetchAllOffsets(groupId, epoch);
+ }
+
/**
* Handles a OffsetCommit request.
*
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index ab37d760bc2..fce74d58561 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -401,7 +401,19 @@ public class GroupMetadataManager {
* @return The group corresponding to the group id or throw
GroupIdNotFoundException.
*/
public Group group(String groupId) throws GroupIdNotFoundException {
- Group group = groups.get(groupId);
+ Group group = groups.get(groupId, Long.MAX_VALUE);
+ if (group == null) {
+ throw new GroupIdNotFoundException(String.format("Group %s not
found.", groupId));
+ }
+ return group;
+ }
+
+ /**
+ * @return The group corresponding to the group id at the given committed
offset
+ * or throw GroupIdNotFoundException.
+ */
+ public Group group(String groupId, long committedOffset) throws
GroupIdNotFoundException {
+ Group group = groups.get(groupId, committedOffset);
if (group == null) {
throw new GroupIdNotFoundException(String.format("Group %s not
found.", groupId));
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
index 8d89f6a980a..6743317f344 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.coordinator.group;
-import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.StaleMemberEpochException;
@@ -24,6 +23,8 @@ import
org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
import
org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponseTopic;
import
org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition;
+import org.apache.kafka.common.message.OffsetFetchRequestData;
+import org.apache.kafka.common.message.OffsetFetchResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.RequestContext;
@@ -41,10 +42,12 @@ import org.apache.kafka.timeline.TimelineHashMap;
import org.slf4j.Logger;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.OptionalLong;
+import static
org.apache.kafka.common.requests.OffsetFetchResponse.INVALID_OFFSET;
+
/**
* The OffsetMetadataManager manages the offsets of all the groups. It
basically maintains
* a mapping from group id to topic-partition to offset. This class has two
kinds of methods:
@@ -145,9 +148,9 @@ public class OffsetMetadataManager {
private final int offsetMetadataMaxSize;
/**
- * The offsets keyed by topic-partition and group id.
+ * The offsets keyed by group id, topic name and partition id.
*/
- private final TimelineHashMap<String, TimelineHashMap<TopicPartition,
OffsetAndMetadata>> offsetsByGroup;
+ private final TimelineHashMap<String, TimelineHashMap<String,
TimelineHashMap<Integer, OffsetAndMetadata>>> offsetsByGroup;
OffsetMetadataManager(
SnapshotRegistry snapshotRegistry,
@@ -221,6 +224,20 @@ public class OffsetMetadataManager {
return group;
}
+ /**
+ * Validates an OffsetCommit request.
+ *
+ * @param groupId The group id.
+ * @param lastCommittedOffset The last committed offsets in the timeline.
+ */
+ private void validateOffsetFetch(
+ String groupId,
+ long lastCommittedOffset
+ ) throws GroupIdNotFoundException {
+ Group group = groupMetadataManager.group(groupId, lastCommittedOffset);
+ group.validateOffsetFetch();
+ }
+
/**
* Computes the expiration timestamp based on the retention time provided
in the OffsetCommit
* request.
@@ -312,6 +329,109 @@ public class OffsetMetadataManager {
return new CoordinatorResult<>(records, response);
}
+ /**
+ * Fetch offsets for a given Group.
+ *
+ * @param groupId The group id.
+ * @param topics The topics to fetch the offsets for.
+ * @param lastCommittedOffset The last committed offsets in the timeline.
+ *
+ * @return A List of OffsetFetchResponseTopics response.
+ */
+ public List<OffsetFetchResponseData.OffsetFetchResponseTopics>
fetchOffsets(
+ String groupId,
+ List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics,
+ long lastCommittedOffset
+ ) throws ApiException {
+ boolean failAllPartitions = false;
+ try {
+ validateOffsetFetch(groupId, lastCommittedOffset);
+ } catch (GroupIdNotFoundException ex) {
+ failAllPartitions = true;
+ }
+
+ final List<OffsetFetchResponseData.OffsetFetchResponseTopics>
topicResponses = new ArrayList<>(topics.size());
+ final TimelineHashMap<String, TimelineHashMap<Integer,
OffsetAndMetadata>> groupOffsets =
+ failAllPartitions ? null : offsetsByGroup.get(groupId,
lastCommittedOffset);
+
+ topics.forEach(topic -> {
+ final OffsetFetchResponseData.OffsetFetchResponseTopics
topicResponse =
+ new
OffsetFetchResponseData.OffsetFetchResponseTopics().setName(topic.name());
+ topicResponses.add(topicResponse);
+
+ final TimelineHashMap<Integer, OffsetAndMetadata> topicOffsets =
groupOffsets == null ?
+ null : groupOffsets.get(topic.name(), lastCommittedOffset);
+
+ topic.partitionIndexes().forEach(partitionIndex -> {
+ final OffsetAndMetadata offsetAndMetadata = topicOffsets ==
null ?
+ null : topicOffsets.get(partitionIndex,
lastCommittedOffset);
+
+ if (offsetAndMetadata == null) {
+ topicResponse.partitions().add(new
OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(partitionIndex)
+ .setCommittedOffset(INVALID_OFFSET)
+ .setCommittedLeaderEpoch(-1)
+ .setMetadata(""));
+ } else {
+ topicResponse.partitions().add(new
OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(partitionIndex)
+ .setCommittedOffset(offsetAndMetadata.offset)
+
.setCommittedLeaderEpoch(offsetAndMetadata.leaderEpoch.orElse(-1))
+ .setMetadata(offsetAndMetadata.metadata));
+ }
+ });
+ });
+
+ return topicResponses;
+ }
+
+ /**
+ * Fetch all offsets for a given Group.
+ *
+ * @param groupId The group id.
+ * @param lastCommittedOffset The last committed offsets in the timeline.
+ *
+ * @return A List of OffsetFetchResponseTopics response.
+ */
+ public List<OffsetFetchResponseData.OffsetFetchResponseTopics>
fetchAllOffsets(
+ String groupId,
+ long lastCommittedOffset
+ ) throws ApiException {
+ try {
+ validateOffsetFetch(groupId, lastCommittedOffset);
+ } catch (GroupIdNotFoundException ex) {
+ return Collections.emptyList();
+ }
+
+ final List<OffsetFetchResponseData.OffsetFetchResponseTopics>
topicResponses = new ArrayList<>();
+ final TimelineHashMap<String, TimelineHashMap<Integer,
OffsetAndMetadata>> groupOffsets =
+ offsetsByGroup.get(groupId, lastCommittedOffset);
+
+ if (groupOffsets != null) {
+ groupOffsets.entrySet(lastCommittedOffset).forEach(topicEntry -> {
+ final String topic = topicEntry.getKey();
+ final TimelineHashMap<Integer, OffsetAndMetadata> topicOffsets
= topicEntry.getValue();
+
+ final OffsetFetchResponseData.OffsetFetchResponseTopics
topicResponse =
+ new
OffsetFetchResponseData.OffsetFetchResponseTopics().setName(topic);
+ topicResponses.add(topicResponse);
+
+
topicOffsets.entrySet(lastCommittedOffset).forEach(partitionEntry -> {
+ final int partition = partitionEntry.getKey();
+ final OffsetAndMetadata offsetAndMetadata =
partitionEntry.getValue();
+
+ topicResponse.partitions().add(new
OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(partition)
+ .setCommittedOffset(offsetAndMetadata.offset)
+
.setCommittedLeaderEpoch(offsetAndMetadata.leaderEpoch.orElse(-1))
+ .setMetadata(offsetAndMetadata.metadata));
+ });
+ });
+ }
+
+ return topicResponses;
+ }
+
/**
* Replays OffsetCommitKey/Value to update or delete the corresponding
offsets.
*
@@ -323,7 +443,8 @@ public class OffsetMetadataManager {
OffsetCommitValue value
) {
final String groupId = key.group();
- final TopicPartition tp = new TopicPartition(key.topic(),
key.partition());
+ final String topic = key.topic();
+ final int partition = key.partition();
if (value != null) {
// The generic or consumer group should exist when offsets are
committed or
@@ -337,22 +458,18 @@ public class OffsetMetadataManager {
groupMetadataManager.getOrMaybeCreateGenericGroup(groupId,
true);
}
- final OffsetAndMetadata offsetAndMetadata =
OffsetAndMetadata.fromRecord(value);
- TimelineHashMap<TopicPartition, OffsetAndMetadata> offsets =
offsetsByGroup.get(groupId);
- if (offsets == null) {
- offsets = new TimelineHashMap<>(snapshotRegistry, 0);
- offsetsByGroup.put(groupId, offsets);
- }
-
- offsets.put(tp, offsetAndMetadata);
+ updateOffset(
+ groupId,
+ topic,
+ partition,
+ OffsetAndMetadata.fromRecord(value)
+ );
} else {
- TimelineHashMap<TopicPartition, OffsetAndMetadata> offsets =
offsetsByGroup.get(groupId);
- if (offsets != null) {
- offsets.remove(tp);
- if (offsets.isEmpty()) {
- offsetsByGroup.remove(groupId);
- }
- }
+ removeOffset(
+ groupId,
+ topic,
+ partition
+ );
}
}
@@ -372,12 +489,67 @@ public class OffsetMetadataManager {
*
* package-private for testing.
*/
- OffsetAndMetadata offset(String groupId, TopicPartition tp) {
- Map<TopicPartition, OffsetAndMetadata> offsets =
offsetsByGroup.get(groupId);
- if (offsets == null) {
+ OffsetAndMetadata offset(String groupId, String topic, int partition) {
+ TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>>
topicOffsets = offsetsByGroup.get(groupId);
+ if (topicOffsets == null) {
return null;
} else {
- return offsets.get(tp);
+ TimelineHashMap<Integer, OffsetAndMetadata> partitionOffsets =
topicOffsets.get(topic);
+ if (partitionOffsets == null) {
+ return null;
+ } else {
+ return partitionOffsets.get(partition);
+ }
}
}
+
+ /**
+ * Updates the offset.
+ *
+ * @param groupId The group id.
+ * @param topic The topic name.
+ * @param partition The partition id.
+ * @param offsetAndMetadata The offset metadata.
+ */
+ private void updateOffset(
+ String groupId,
+ String topic,
+ int partition,
+ OffsetAndMetadata offsetAndMetadata
+ ) {
+ TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>>
topicOffsets = offsetsByGroup
+ .computeIfAbsent(groupId, __ -> new
TimelineHashMap<>(snapshotRegistry, 0));
+ TimelineHashMap<Integer, OffsetAndMetadata> partitionOffsets =
topicOffsets
+ .computeIfAbsent(topic, __ -> new
TimelineHashMap<>(snapshotRegistry, 0));
+ partitionOffsets.put(partition, offsetAndMetadata);
+ }
+
+ /**
+ * Removes the offset.
+ *
+ * @param groupId The group id.
+ * @param topic The topic name.
+ * @param partition The partition id.
+ */
+ private void removeOffset(
+ String groupId,
+ String topic,
+ int partition
+ ) {
+ TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>>
topicOffsets = offsetsByGroup.get(groupId);
+ if (topicOffsets == null)
+ return;
+
+ TimelineHashMap<Integer, OffsetAndMetadata> partitionOffsets =
topicOffsets.get(topic);
+ if (partitionOffsets == null)
+ return;
+
+ partitionOffsets.remove(partition);
+
+ if (partitionOffsets.isEmpty())
+ topicOffsets.remove(topic);
+
+ if (topicOffsets.isEmpty())
+ offsetsByGroup.remove(groupId);
+ }
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
index 82f3f19659c..f6eaed02fa3 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
@@ -543,6 +543,14 @@ public class ConsumerGroup implements Group {
}
}
+ /**
+ * Validates the OffsetFetch request.
+ */
+ @Override
+ public void validateOffsetFetch() {
+ // Nothing.
+ }
+
/**
* Updates the current state of the group.
*/
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
index d43ecda2cf7..edad170806b 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
@@ -19,6 +19,7 @@ package org.apache.kafka.coordinator.group.generic;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.FencedInstanceIdException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.IllegalGenerationException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection;
@@ -820,6 +821,16 @@ public class GenericGroup implements Group {
}
}
+ /**
+ * Validates the OffsetFetch request.
+ */
+ @Override
+ public void validateOffsetFetch() throws GroupIdNotFoundException {
+ if (isInState(DEAD)) {
+ throw new GroupIdNotFoundException(String.format("Group %s is in
dead state.", groupId));
+ }
+ }
+
/**
* Verify the member id is up to date for static members. Return true if
both conditions met:
* 1. given member is a known static member to group
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index d1e3bea8c7a..666880c0431 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -35,6 +35,8 @@ import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.OffsetFetchRequestData;
+import org.apache.kafka.common.message.OffsetFetchResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.network.ClientInformation;
@@ -56,10 +58,12 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentMatchers;
import java.net.InetAddress;
import java.util.Collections;
+import java.util.List;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
@@ -595,4 +599,98 @@ public class GroupCoordinatorServiceTest {
future.get()
);
}
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testFetchOffsets(
+ boolean requireStable
+ ) throws ExecutionException, InterruptedException, TimeoutException {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+
+ service.startup(() -> 1);
+
+ List<OffsetFetchRequestData.OffsetFetchRequestTopics> topicsRequest =
+ Collections.singletonList(new
OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("foo")
+ .setPartitionIndexes(Collections.singletonList(0)));
+
+ List<OffsetFetchResponseData.OffsetFetchResponseTopics> topicsResponse
=
+ Collections.singletonList(new
OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("foo")
+ .setPartitions(Collections.singletonList(new
OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(0)
+ .setCommittedOffset(100L))));
+
+ if (requireStable) {
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("fetch-offsets"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets",
0)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(topicsResponse));
+ } else {
+ when(runtime.scheduleReadOperation(
+ ArgumentMatchers.eq("fetch-offsets"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets",
0)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(topicsResponse));
+ }
+
+
CompletableFuture<List<OffsetFetchResponseData.OffsetFetchResponseTopics>>
future = service.fetchOffsets(
+ requestContext(ApiKeys.OFFSET_FETCH),
+ "group",
+ topicsRequest,
+ requireStable
+ );
+
+ assertEquals(topicsResponse, future.get(5, TimeUnit.SECONDS));
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testFetchAllOffsets(
+ boolean requireStable
+ ) throws ExecutionException, InterruptedException, TimeoutException {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+
+ service.startup(() -> 1);
+
+ List<OffsetFetchResponseData.OffsetFetchResponseTopics> topicsResponse
=
+ Collections.singletonList(new
OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("foo")
+ .setPartitions(Collections.singletonList(new
OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(0)
+ .setCommittedOffset(100L))));
+
+ if (requireStable) {
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("fetch-all-offsets"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets",
0)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(topicsResponse));
+ } else {
+ when(runtime.scheduleReadOperation(
+ ArgumentMatchers.eq("fetch-all-offsets"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets",
0)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(topicsResponse));
+ }
+
+
CompletableFuture<List<OffsetFetchResponseData.OffsetFetchResponseTopics>>
future = service.fetchAllOffsets(
+ requestContext(ApiKeys.OFFSET_FETCH),
+ "group",
+ requireStable
+ );
+
+ assertEquals(topicsResponse, future.get(5, TimeUnit.SECONDS));
+ }
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
index 5fdfcc4c01e..df472cfe30a 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.coordinator.group;
-import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.IllegalGenerationException;
@@ -27,6 +26,8 @@ import
org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.message.OffsetFetchRequestData;
+import org.apache.kafka.common.message.OffsetFetchResponseData;
import org.apache.kafka.common.network.ClientInformation;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
@@ -50,6 +51,7 @@ import
org.apache.kafka.coordinator.group.generic.GenericGroupState;
import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -62,6 +64,7 @@ import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
+import static
org.apache.kafka.common.requests.OffsetFetchResponse.INVALID_OFFSET;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -174,6 +177,28 @@ public class OffsetMetadataManagerTest {
return result;
}
+ public List<OffsetFetchResponseData.OffsetFetchResponseTopics>
fetchOffsets(
+ String groupId,
+ List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics,
+ long committedOffset
+ ) {
+ return offsetMetadataManager.fetchOffsets(
+ groupId,
+ topics,
+ committedOffset
+ );
+ }
+
+ public List<OffsetFetchResponseData.OffsetFetchResponseTopics>
fetchAllOffsets(
+ String groupId,
+ long committedOffset
+ ) {
+ return offsetMetadataManager.fetchAllOffsets(
+ groupId,
+ committedOffset
+ );
+ }
+
public List<MockCoordinatorTimer.ExpiredTimeout<Void, Record>>
sleep(long ms) {
time.sleep(ms);
List<MockCoordinatorTimer.ExpiredTimeout<Void, Record>> timeouts =
timer.poll();
@@ -185,6 +210,30 @@ public class OffsetMetadataManagerTest {
return timeouts;
}
+ public void commitOffset(
+ String groupId,
+ String topic,
+ int partition,
+ long offset,
+ int leaderEpoch
+ ) {
+ snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset);
+
+ replay(RecordHelpers.newOffsetCommitRecord(
+ groupId,
+ topic,
+ partition,
+ new OffsetAndMetadata(
+ offset,
+ OptionalInt.of(leaderEpoch),
+ "metadata",
+ time.milliseconds(),
+ OptionalLong.empty()
+ ),
+ MetadataVersion.latest()
+ ));
+ }
+
private ApiMessage messageOrNull(ApiMessageAndVersion
apiMessageAndVersion) {
if (apiMessageAndVersion == null) {
return null;
@@ -1040,6 +1089,342 @@ public class OffsetMetadataManagerTest {
);
}
+ @Test
+ public void testGenericGroupFetchOffsetsWithDeadGroup() {
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
+
+ // Create a dead group.
+ GenericGroup group =
context.groupMetadataManager.getOrMaybeCreateGenericGroup(
+ "group",
+ true
+ );
+ group.transitionTo(GenericGroupState.DEAD);
+
+ List<OffsetFetchRequestData.OffsetFetchRequestTopics> request =
Arrays.asList(
+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("foo")
+ .setPartitionIndexes(Arrays.asList(0, 1)),
+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("bar")
+ .setPartitionIndexes(Collections.singletonList(0))
+ );
+
+ List<OffsetFetchResponseData.OffsetFetchResponseTopics>
expectedResponse = Arrays.asList(
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("foo")
+ .setPartitions(Arrays.asList(
+ mkInvalidOffsetPartitionResponse(0),
+ mkInvalidOffsetPartitionResponse(1)
+ )),
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("bar")
+ .setPartitions(Collections.singletonList(
+ mkInvalidOffsetPartitionResponse(0)
+ ))
+ );
+
+ assertEquals(expectedResponse, context.fetchOffsets("group", request,
Long.MAX_VALUE));
+ }
+
+ @Test
+ public void testFetchOffsetsWithUnknownGroup() {
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
+
+ List<OffsetFetchRequestData.OffsetFetchRequestTopics> request =
Arrays.asList(
+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("foo")
+ .setPartitionIndexes(Arrays.asList(0, 1)),
+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("bar")
+ .setPartitionIndexes(Collections.singletonList(0))
+ );
+
+ List<OffsetFetchResponseData.OffsetFetchResponseTopics>
expectedResponse = Arrays.asList(
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("foo")
+ .setPartitions(Arrays.asList(
+ mkInvalidOffsetPartitionResponse(0),
+ mkInvalidOffsetPartitionResponse(1)
+ )),
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("bar")
+ .setPartitions(Collections.singletonList(
+ mkInvalidOffsetPartitionResponse(0)
+ ))
+ );
+
+ assertEquals(expectedResponse, context.fetchOffsets("group", request,
Long.MAX_VALUE));
+ }
+
+ @Test
+ public void testFetchOffsetsAtDifferentCommittedOffset() {
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
+
+ context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group",
true);
+
+ assertEquals(0, context.lastWrittenOffset);
+ context.commitOffset("group", "foo", 0, 100L, 1);
+ assertEquals(1, context.lastWrittenOffset);
+ context.commitOffset("group", "foo", 1, 110L, 1);
+ assertEquals(2, context.lastWrittenOffset);
+ context.commitOffset("group", "bar", 0, 200L, 1);
+ assertEquals(3, context.lastWrittenOffset);
+ context.commitOffset("group", "foo", 1, 111L, 2);
+ assertEquals(4, context.lastWrittenOffset);
+ context.commitOffset("group", "bar", 1, 210L, 2);
+ assertEquals(5, context.lastWrittenOffset);
+
+ // Always use the same request.
+ List<OffsetFetchRequestData.OffsetFetchRequestTopics> request =
Arrays.asList(
+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("foo")
+ .setPartitionIndexes(Arrays.asList(0, 1)),
+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("bar")
+ .setPartitionIndexes(Arrays.asList(0, 1))
+ );
+
+ // Fetching with 0 should return all invalid offsets.
+ assertEquals(Arrays.asList(
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("foo")
+ .setPartitions(Arrays.asList(
+ mkInvalidOffsetPartitionResponse(0),
+ mkInvalidOffsetPartitionResponse(1)
+ )),
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("bar")
+ .setPartitions(Arrays.asList(
+ mkInvalidOffsetPartitionResponse(0),
+ mkInvalidOffsetPartitionResponse(1)
+ ))
+ ), context.fetchOffsets("group", request, 0L));
+
+ // Fetching with 1 should return data up to offset 1.
+ assertEquals(Arrays.asList(
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("foo")
+ .setPartitions(Arrays.asList(
+ mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
+ mkInvalidOffsetPartitionResponse(1)
+ )),
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("bar")
+ .setPartitions(Arrays.asList(
+ mkInvalidOffsetPartitionResponse(0),
+ mkInvalidOffsetPartitionResponse(1)
+ ))
+ ), context.fetchOffsets("group", request, 1L));
+
+ // Fetching with 2 should return data up to offset 2.
+ assertEquals(Arrays.asList(
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("foo")
+ .setPartitions(Arrays.asList(
+ mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
+ mkOffsetPartitionResponse(1, 110L, 1, "metadata")
+ )),
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("bar")
+ .setPartitions(Arrays.asList(
+ mkInvalidOffsetPartitionResponse(0),
+ mkInvalidOffsetPartitionResponse(1)
+ ))
+ ), context.fetchOffsets("group", request, 2L));
+
+ // Fetching with 3 should return data up to offset 3.
+ assertEquals(Arrays.asList(
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("foo")
+ .setPartitions(Arrays.asList(
+ mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
+ mkOffsetPartitionResponse(1, 110L, 1, "metadata")
+ )),
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("bar")
+ .setPartitions(Arrays.asList(
+ mkOffsetPartitionResponse(0, 200L, 1, "metadata"),
+ mkInvalidOffsetPartitionResponse(1)
+ ))
+ ), context.fetchOffsets("group", request, 3L));
+
+ // Fetching with 4 should return data up to offset 4.
+ assertEquals(Arrays.asList(
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("foo")
+ .setPartitions(Arrays.asList(
+ mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
+ mkOffsetPartitionResponse(1, 111L, 2, "metadata")
+ )),
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("bar")
+ .setPartitions(Arrays.asList(
+ mkOffsetPartitionResponse(0, 200L, 1, "metadata"),
+ mkInvalidOffsetPartitionResponse(1)
+ ))
+ ), context.fetchOffsets("group", request, 4L));
+
+ // Fetching with 5 should return data up to offset 5.
+ assertEquals(Arrays.asList(
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("foo")
+ .setPartitions(Arrays.asList(
+ mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
+ mkOffsetPartitionResponse(1, 111L, 2, "metadata")
+ )),
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("bar")
+ .setPartitions(Arrays.asList(
+ mkOffsetPartitionResponse(0, 200L, 1, "metadata"),
+ mkOffsetPartitionResponse(1, 210L, 2, "metadata")
+ ))
+ ), context.fetchOffsets("group", request, 5L));
+
+ // Fetching with Long.MAX_VALUE should return all offsets.
+ assertEquals(Arrays.asList(
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("foo")
+ .setPartitions(Arrays.asList(
+ mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
+ mkOffsetPartitionResponse(1, 111L, 2, "metadata")
+ )),
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("bar")
+ .setPartitions(Arrays.asList(
+ mkOffsetPartitionResponse(0, 200L, 1, "metadata"),
+ mkOffsetPartitionResponse(1, 210L, 2, "metadata")
+ ))
+ ), context.fetchOffsets("group", request, Long.MAX_VALUE));
+ }
+
+ @Test
+ public void testGenericGroupFetchAllOffsetsWithDeadGroup() {
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
+
+ // Create a dead group.
+ GenericGroup group =
context.groupMetadataManager.getOrMaybeCreateGenericGroup(
+ "group",
+ true
+ );
+ group.transitionTo(GenericGroupState.DEAD);
+
+ assertEquals(Collections.emptyList(), context.fetchAllOffsets("group",
Long.MAX_VALUE));
+ }
+
+ @Test
+ public void testFetchAllOffsetsWithUnknownGroup() {
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
+ assertEquals(Collections.emptyList(), context.fetchAllOffsets("group",
Long.MAX_VALUE));
+ }
+
+ @Test
+ public void testFetchAllOffsetsAtDifferentCommittedOffset() {
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
+
+ context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group",
true);
+
+ assertEquals(0, context.lastWrittenOffset);
+ context.commitOffset("group", "foo", 0, 100L, 1);
+ assertEquals(1, context.lastWrittenOffset);
+ context.commitOffset("group", "foo", 1, 110L, 1);
+ assertEquals(2, context.lastWrittenOffset);
+ context.commitOffset("group", "bar", 0, 200L, 1);
+ assertEquals(3, context.lastWrittenOffset);
+ context.commitOffset("group", "foo", 1, 111L, 2);
+ assertEquals(4, context.lastWrittenOffset);
+ context.commitOffset("group", "bar", 1, 210L, 2);
+ assertEquals(5, context.lastWrittenOffset);
+
+ // Fetching with 0 should no offsets.
+ assertEquals(Collections.emptyList(), context.fetchAllOffsets("group",
0L));
+
+ // Fetching with 1 should return data up to offset 1.
+ assertEquals(Arrays.asList(
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("foo")
+ .setPartitions(Arrays.asList(
+ mkOffsetPartitionResponse(0, 100L, 1, "metadata")
+ ))
+ ), context.fetchAllOffsets("group", 1L));
+
+ // Fetching with 2 should return data up to offset 2.
+ assertEquals(Arrays.asList(
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("foo")
+ .setPartitions(Arrays.asList(
+ mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
+ mkOffsetPartitionResponse(1, 110L, 1, "metadata")
+ ))
+ ), context.fetchAllOffsets("group", 2L));
+
+ // Fetching with 3 should return data up to offset 3.
+ assertEquals(Arrays.asList(
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("bar")
+ .setPartitions(Arrays.asList(
+ mkOffsetPartitionResponse(0, 200L, 1, "metadata")
+ )),
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("foo")
+ .setPartitions(Arrays.asList(
+ mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
+ mkOffsetPartitionResponse(1, 110L, 1, "metadata")
+ ))
+ ), context.fetchAllOffsets("group", 3L));
+
+ // Fetching with 4 should return data up to offset 4.
+ assertEquals(Arrays.asList(
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("bar")
+ .setPartitions(Arrays.asList(
+ mkOffsetPartitionResponse(0, 200L, 1, "metadata")
+ )),
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("foo")
+ .setPartitions(Arrays.asList(
+ mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
+ mkOffsetPartitionResponse(1, 111L, 2, "metadata")
+ ))
+ ), context.fetchAllOffsets("group", 4L));
+
+ // Fetching with Long.MAX_VALUE should return all offsets.
+ assertEquals(Arrays.asList(
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("bar")
+ .setPartitions(Arrays.asList(
+ mkOffsetPartitionResponse(0, 200L, 1, "metadata"),
+ mkOffsetPartitionResponse(1, 210L, 2, "metadata")
+ )),
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("foo")
+ .setPartitions(Arrays.asList(
+ mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
+ mkOffsetPartitionResponse(1, 111L, 2, "metadata")
+ ))
+ ), context.fetchAllOffsets("group", Long.MAX_VALUE));
+ }
+
+ static private OffsetFetchResponseData.OffsetFetchResponsePartitions
mkOffsetPartitionResponse(
+ int partition,
+ long offset,
+ int leaderEpoch,
+ String metadata
+ ) {
+ return new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(partition)
+ .setCommittedOffset(offset)
+ .setCommittedLeaderEpoch(leaderEpoch)
+ .setMetadata(metadata);
+ }
+
+ static private OffsetFetchResponseData.OffsetFetchResponsePartitions
mkInvalidOffsetPartitionResponse(int partition) {
+ return new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(partition)
+ .setCommittedOffset(INVALID_OFFSET)
+ .setCommittedLeaderEpoch(-1)
+ .setMetadata("");
+ }
+
@Test
public void testReplay() {
OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
@@ -1098,7 +1483,7 @@ public class OffsetMetadataManagerTest {
));
// Verify that the offset is gone.
- assertNull(context.offsetMetadataManager.offset("foo", new
TopicPartition("bar", 0)));
+ assertNull(context.offsetMetadataManager.offset("foo", "bar", 0));
}
private void verifyReplay(
@@ -1118,7 +1503,8 @@ public class OffsetMetadataManagerTest {
assertEquals(offsetAndMetadata, context.offsetMetadataManager.offset(
groupId,
- new TopicPartition(topic, partition)
+ topic,
+ partition
));
}