jeffkbkim commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1067611094
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java:
##########
@@ -164,5 +166,20 @@
CompletableFuture<List<OffsetFetchResponseData.OffsetFetchResponseTopics>> fetch
String groupId,
boolean requireStable
);
+
+ /**
+ * Commit offsets for a given Group.
Review Comment:
are the descriptions for `fetchOffsets(), fetchAllOffsets(),
commitOffsets()` "Group" instead of "Generic Group" since they can apply to
both Generic and Consumer groups? just noticed the difference in
join/sync/leave group.
##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java:
##########
@@ -116,4 +118,80 @@ public void maybeSetThrottleTimeMs(int throttleTimeMs) {
public boolean shouldClientThrottle(short version) {
return version >= 4;
}
+
+ public static class Builder {
+ OffsetCommitResponseData data = new OffsetCommitResponseData();
+ HashMap<String, OffsetCommitResponseTopic> byTopicName = new
HashMap<>();
+
+ private OffsetCommitResponseTopic getOrCreateTopic(
+ String topicName
+ ) {
+ OffsetCommitResponseTopic topic = byTopicName.get(topicName);
+ if (topic == null) {
+ topic = new OffsetCommitResponseTopic().setName(topicName);
+ data.topics().add(topic);
+ byTopicName.put(topicName, topic);
+ }
+ return topic;
+ }
+
+ public Builder addPartition(
+ String topicName,
+ int partitionIndex,
+ Errors error
+ ) {
+ final OffsetCommitResponseTopic topicResponse =
getOrCreateTopic(topicName);
+
+ topicResponse.partitions().add(new OffsetCommitResponsePartition()
+ .setPartitionIndex(partitionIndex)
+ .setErrorCode(error.code()));
+
+ return this;
+ }
+
+ public <P> Builder addPartitions(
+ String topicName,
+ List<P> partitions,
+ Function<P, Integer> partitionIndex,
+ Errors error
+ ) {
+ final OffsetCommitResponseTopic topicResponse =
getOrCreateTopic(topicName);
+
+ partitions.forEach(partition -> {
+ topicResponse.partitions().add(new
OffsetCommitResponsePartition()
+ .setPartitionIndex(partitionIndex.apply(partition))
+ .setErrorCode(error.code()));
+ });
+
+ return this;
+ }
+
+ public Builder merge(
+ OffsetCommitResponseData newData
+ ) {
+ if (data.topics().isEmpty()) {
+ // If the current data is empty, we can discard it and use the
new data.
+ data = newData;
+ } else {
+ // Otherwise, we have to merge them together.
+ newData.topics().forEach(newTopic -> {
+ OffsetCommitResponseTopic existingTopic =
byTopicName.get(newTopic.name());
+ if (existingTopic == null) {
+ // If no topic exists, we can directly copy the new
topic data.
+ data.topics().add(newTopic);
+ byTopicName.put(newTopic.name(), newTopic);
+ } else {
+ // Otherwise, we add the partitions to the existing
one.
+
existingTopic.partitions().addAll(newTopic.partitions());
Review Comment:
that makes sense. @jolshan that seems to be the case, though since a failed
partition is added first the non-error state may overwrite the error when the
consumer parses the response.
also a +1 on leaving a small note that the code assumes no overlap.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]