This is an automated email from the ASF dual-hosted git repository. dajac pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new be194f5dbae MINOR: Simplify OffsetFetchRequest (#19572) be194f5dbae is described below commit be194f5dbaeabf7c8992312fd0b8bcf596a1284a Author: David Jacot <david.ja...@gmail.com> AuthorDate: Sun Apr 27 18:58:30 2025 +0200 MINOR: Simplify OffsetFetchRequest (#19572) While working on https://github.com/apache/kafka/pull/19515, I came to the conclusion that the OffsetFetchRequest is quite messy and overall too complicated. This patch rationalizes the constructors. OffsetFetchRequest has a single constructor accepting the OffsetFetchRequestData. This will also simplify adding the topic ids. All the changes are mechanical, replacing data structures with others. Reviewers: PoAn Yang <pay...@apache.org>, TengYao Chi <frankvi...@apache.org>, Lianet Magran <lmagr...@confluent.io>, Chia-Ping Tsai <chia7...@gmail.com> --- .../internals/ListConsumerGroupOffsetsHandler.java | 36 +- .../consumer/internals/CommitRequestManager.java | 39 +- .../consumer/internals/ConsumerCoordinator.java | 23 +- .../kafka/common/requests/OffsetFetchRequest.java | 215 +++-------- .../kafka/clients/admin/KafkaAdminClientTest.java | 6 +- .../common/requests/OffsetFetchRequestTest.java | 395 +++++++++++---------- .../kafka/common/requests/RequestResponseTest.java | 167 ++++----- .../kafka/api/AuthorizerIntegrationTest.scala | 46 ++- .../server/GroupCoordinatorBaseRequestTest.scala | 42 ++- .../scala/unit/kafka/server/KafkaApisTest.scala | 154 +++++--- .../scala/unit/kafka/server/RequestQuotaTest.scala | 14 +- 11 files changed, 607 insertions(+), 530 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java index 4c0e3db9254..36ff25ebb79 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java +++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java @@ -20,6 +20,7 @@ import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.OffsetFetchRequestData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.AbstractResponse; import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType; @@ -86,15 +87,32 @@ public class ListConsumerGroupOffsetsHandler implements AdminApiHandler<Coordina } public OffsetFetchRequest.Builder buildBatchedRequest(Set<CoordinatorKey> groupIds) { - // Create a map that only contains the consumer groups owned by the coordinator. - Map<String, List<TopicPartition>> coordinatorGroupIdToTopicPartitions = new HashMap<>(groupIds.size()); - groupIds.forEach(g -> { - ListConsumerGroupOffsetsSpec spec = groupSpecs.get(g.idValue); - List<TopicPartition> partitions = spec.topicPartitions() != null ? new ArrayList<>(spec.topicPartitions()) : null; - coordinatorGroupIdToTopicPartitions.put(g.idValue, partitions); - }); - - return new OffsetFetchRequest.Builder(coordinatorGroupIdToTopicPartitions, requireStable, false); + // Create a request that only contains the consumer groups owned by the coordinator. 
+ return new OffsetFetchRequest.Builder( + new OffsetFetchRequestData() + .setRequireStable(requireStable) + .setGroups(groupIds.stream().map(groupId -> { + ListConsumerGroupOffsetsSpec spec = groupSpecs.get(groupId.idValue); + + List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics = null; + if (spec.topicPartitions() != null) { + topics = spec.topicPartitions().stream() + .collect(Collectors.groupingBy(TopicPartition::topic)) + .entrySet() + .stream() + .map(entry -> new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName(entry.getKey()) + .setPartitionIndexes(entry.getValue().stream() + .map(TopicPartition::partition) + .collect(Collectors.toList()))) + .collect(Collectors.toList()); + } + return new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId(groupId.idValue) + .setTopics(topics); + }).collect(Collectors.toList())), + false + ); } @Override diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java index 62d1fe3a866..4b22a5711b3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java @@ -34,6 +34,7 @@ import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.errors.UnstableOffsetCommitException; import org.apache.kafka.common.message.OffsetCommitRequestData; import org.apache.kafka.common.message.OffsetCommitResponseData; +import org.apache.kafka.common.message.OffsetFetchRequestData; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.RecordBatch; @@ -970,21 +971,37 @@ public class CommitRequestManager implements RequestManager, MemberStateListener } public NetworkClientDelegate.UnsentRequest toUnsentRequest() { + 
List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics = requestedPartitions.stream() + .collect(Collectors.groupingBy(TopicPartition::topic)) + .entrySet() + .stream() + .map(entry -> new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName(entry.getKey()) + .setPartitionIndexes(entry.getValue().stream() + .map(TopicPartition::partition) + .collect(Collectors.toList()))) + .collect(Collectors.toList()); - OffsetFetchRequest.Builder builder = memberInfo.memberEpoch. - map(epoch -> new OffsetFetchRequest.Builder( - groupId, - memberInfo.memberId, - epoch, - true, - new ArrayList<>(this.requestedPartitions), - throwOnFetchStableOffsetUnsupported)) + OffsetFetchRequest.Builder builder = memberInfo.memberEpoch + .map(epoch -> new OffsetFetchRequest.Builder( + new OffsetFetchRequestData() + .setRequireStable(true) + .setGroups(List.of( + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId(groupId) + .setMemberId(memberInfo.memberId) + .setMemberEpoch(epoch) + .setTopics(topics))), + throwOnFetchStableOffsetUnsupported)) // Building request without passing member ID/epoch to leave the logic to choose // default values when not present on the request builder. 
.orElseGet(() -> new OffsetFetchRequest.Builder( - groupId, - true, - new ArrayList<>(this.requestedPartitions), + new OffsetFetchRequestData() + .setRequireStable(true) + .setGroups(List.of( + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId(groupId) + .setTopics(topics))), throwOnFetchStableOffsetUnsupported)); return buildRequestWithResponseHandling(builder); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 1cba10ef15d..8829654c7a8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -49,6 +49,7 @@ import org.apache.kafka.common.message.JoinGroupRequestData; import org.apache.kafka.common.message.JoinGroupResponseData; import org.apache.kafka.common.message.OffsetCommitRequestData; import org.apache.kafka.common.message.OffsetCommitResponseData; +import org.apache.kafka.common.message.OffsetFetchRequestData; import org.apache.kafka.common.metrics.Measurable; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; @@ -1477,9 +1478,27 @@ public final class ConsumerCoordinator extends AbstractCoordinator { return RequestFuture.coordinatorNotAvailable(); log.debug("Fetching committed offsets for partitions: {}", partitions); + // construct the request - OffsetFetchRequest.Builder requestBuilder = - new OffsetFetchRequest.Builder(this.rebalanceConfig.groupId, true, new ArrayList<>(partitions), throwOnFetchStableOffsetsUnsupported); + List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics = partitions.stream() + .collect(Collectors.groupingBy(TopicPartition::topic)) + .entrySet() + .stream() + .map(entry -> new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName(entry.getKey()) + 
.setPartitionIndexes(entry.getValue().stream() + .map(TopicPartition::partition) + .collect(Collectors.toList()))) + .collect(Collectors.toList()); + + OffsetFetchRequest.Builder requestBuilder = new OffsetFetchRequest.Builder( + new OffsetFetchRequestData() + .setRequireStable(true) + .setGroups(List.of( + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId(this.rebalanceConfig.groupId) + .setTopics(topics))), + throwOnFetchStableOffsetsUnsupported); // send the request with a callback return client.send(coordinator, requestBuilder) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java index 907cba953fb..2a26e7940d3 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java @@ -22,9 +22,11 @@ import org.apache.kafka.common.message.OffsetFetchRequestData; import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestGroup; import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestTopic; import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestTopics; +import org.apache.kafka.common.message.OffsetFetchResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Readable; +import org.apache.kafka.common.record.RecordBatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,113 +36,35 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Map.Entry; -import java.util.Optional; import java.util.stream.Collectors; public class OffsetFetchRequest extends AbstractRequest { private static final Logger log = LoggerFactory.getLogger(OffsetFetchRequest.class); + private static final short 
TOP_LEVEL_ERROR_AND_NULL_TOPICS_MIN_VERSION = 2; + private static final short REQUIRE_STABLE_OFFSET_MIN_VERSION = 7; + private static final short BATCH_MIN_VERSION = 8; - private static final List<OffsetFetchRequestTopic> ALL_TOPIC_PARTITIONS = null; - private static final List<OffsetFetchRequestTopics> ALL_TOPIC_PARTITIONS_BATCH = null; private final OffsetFetchRequestData data; public static class Builder extends AbstractRequest.Builder<OffsetFetchRequest> { - public final OffsetFetchRequestData data; + private final OffsetFetchRequestData data; private final boolean throwOnFetchStableOffsetsUnsupported; - public Builder(String groupId, - boolean requireStable, - List<TopicPartition> partitions, - boolean throwOnFetchStableOffsetsUnsupported) { - this( - groupId, - null, - -1, - requireStable, - partitions, - throwOnFetchStableOffsetsUnsupported - ); - } - - public Builder(String groupId, - String memberId, - int memberEpoch, - boolean requireStable, - List<TopicPartition> partitions, - boolean throwOnFetchStableOffsetsUnsupported) { + public Builder(OffsetFetchRequestData data, boolean throwOnFetchStableOffsetsUnsupported) { super(ApiKeys.OFFSET_FETCH); - - OffsetFetchRequestData.OffsetFetchRequestGroup group = - new OffsetFetchRequestData.OffsetFetchRequestGroup() - .setGroupId(groupId) - .setMemberId(memberId) - .setMemberEpoch(memberEpoch); - - if (partitions != null) { - Map<String, OffsetFetchRequestTopics> offsetFetchRequestTopicMap = new HashMap<>(); - for (TopicPartition topicPartition : partitions) { - String topicName = topicPartition.topic(); - OffsetFetchRequestTopics topic = offsetFetchRequestTopicMap.getOrDefault( - topicName, new OffsetFetchRequestTopics().setName(topicName)); - topic.partitionIndexes().add(topicPartition.partition()); - offsetFetchRequestTopicMap.put(topicName, topic); - } - group.setTopics(new ArrayList<>(offsetFetchRequestTopicMap.values())); - } else { - // If passed in partition list is null, it is requesting offsets for all 
topic partitions. - group.setTopics(ALL_TOPIC_PARTITIONS_BATCH); - } - - this.data = new OffsetFetchRequestData() - .setRequireStable(requireStable) - .setGroups(Collections.singletonList(group)); - this.throwOnFetchStableOffsetsUnsupported = throwOnFetchStableOffsetsUnsupported; - } - - public Builder(Map<String, List<TopicPartition>> groupIdToTopicPartitionMap, - boolean requireStable, - boolean throwOnFetchStableOffsetsUnsupported) { - super(ApiKeys.OFFSET_FETCH); - - List<OffsetFetchRequestGroup> groups = new ArrayList<>(); - for (Entry<String, List<TopicPartition>> entry : groupIdToTopicPartitionMap.entrySet()) { - String groupName = entry.getKey(); - List<TopicPartition> tpList = entry.getValue(); - final List<OffsetFetchRequestTopics> topics; - if (tpList != null) { - Map<String, OffsetFetchRequestTopics> offsetFetchRequestTopicMap = - new HashMap<>(); - for (TopicPartition topicPartition : tpList) { - String topicName = topicPartition.topic(); - OffsetFetchRequestTopics topic = offsetFetchRequestTopicMap.getOrDefault( - topicName, new OffsetFetchRequestTopics().setName(topicName)); - topic.partitionIndexes().add(topicPartition.partition()); - offsetFetchRequestTopicMap.put(topicName, topic); - } - topics = new ArrayList<>(offsetFetchRequestTopicMap.values()); - } else { - topics = ALL_TOPIC_PARTITIONS_BATCH; - } - groups.add(new OffsetFetchRequestGroup() - .setGroupId(groupName) - .setTopics(topics)); - } - this.data = new OffsetFetchRequestData() - .setGroups(groups) - .setRequireStable(requireStable); + this.data = data; this.throwOnFetchStableOffsetsUnsupported = throwOnFetchStableOffsetsUnsupported; } @Override public OffsetFetchRequest build(short version) { - if (data.groups().size() > 1 && version < 8) { + if (data.groups().size() > 1 && version < BATCH_MIN_VERSION) { throw new NoBatchedOffsetFetchRequestException("Broker does not support" + " batching groups for fetch offset request on version " + version); } - if (data.requireStable() && version < 
7) { + if (data.requireStable() && version < REQUIRE_STABLE_OFFSET_MIN_VERSION) { if (throwOnFetchStableOffsetsUnsupported) { throw new UnsupportedVersionException("Broker unexpectedly " + "doesn't support requireStable flag on version " + version); @@ -152,7 +76,7 @@ public class OffsetFetchRequest extends AbstractRequest { } } // convert data to use the appropriate version since version 8 uses different format - if (version < 8) { + if (version < BATCH_MIN_VERSION) { OffsetFetchRequestData normalizedData; if (!data.groups().isEmpty()) { OffsetFetchRequestGroup group = data.groups().get(0); @@ -175,7 +99,7 @@ public class OffsetFetchRequest extends AbstractRequest { } else { normalizedData = data; } - if (normalizedData.topics() == null && version < 2) { + if (normalizedData.topics() == null && version < TOP_LEVEL_ERROR_AND_NULL_TOPICS_MIN_VERSION) { throw new UnsupportedVersionException("The broker only supports OffsetFetchRequest " + "v" + version + ", but we need v2 or newer to request all topic partitions."); } @@ -202,19 +126,6 @@ public class OffsetFetchRequest extends AbstractRequest { } } - public List<TopicPartition> partitions() { - if (isAllPartitions()) { - return null; - } - List<TopicPartition> partitions = new ArrayList<>(); - for (OffsetFetchRequestTopic topic : data.topics()) { - for (Integer partitionIndex : topic.partitionIndexes()) { - partitions.add(new TopicPartition(topic.name(), partitionIndex)); - } - } - return partitions; - } - public String groupId() { return data.groupId(); } @@ -224,7 +135,7 @@ public class OffsetFetchRequest extends AbstractRequest { } public List<OffsetFetchRequestData.OffsetFetchRequestGroup> groups() { - if (version() >= 8) { + if (version() >= BATCH_MIN_VERSION) { return data.groups(); } else { OffsetFetchRequestData.OffsetFetchRequestGroup group = @@ -253,7 +164,7 @@ public class OffsetFetchRequest extends AbstractRequest { Map<String, List<TopicPartition>> groupIdsToPartitions = new HashMap<>(); for 
(OffsetFetchRequestGroup group : data.groups()) { List<TopicPartition> tpList = null; - if (group.topics() != ALL_TOPIC_PARTITIONS_BATCH) { + if (group.topics() != null) { tpList = new ArrayList<>(); for (OffsetFetchRequestTopics topic : group.topics()) { for (Integer partitionIndex : topic.partitionIndexes()) { @@ -285,67 +196,59 @@ public class OffsetFetchRequest extends AbstractRequest { this.data = data; } - public OffsetFetchResponse getErrorResponse(Errors error) { - return getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, error); - } - - public OffsetFetchResponse getErrorResponse(int throttleTimeMs, Errors error) { - Map<TopicPartition, OffsetFetchResponse.PartitionData> responsePartitions = new HashMap<>(); - if (version() < 2) { - OffsetFetchResponse.PartitionData partitionError = new OffsetFetchResponse.PartitionData( - OffsetFetchResponse.INVALID_OFFSET, - Optional.empty(), - OffsetFetchResponse.NO_METADATA, - error); - - for (OffsetFetchRequestTopic topic : this.data.topics()) { - for (int partitionIndex : topic.partitionIndexes()) { - responsePartitions.put( - new TopicPartition(topic.name(), partitionIndex), partitionError); - } - } - return new OffsetFetchResponse(error, responsePartitions); - } - if (version() == 2) { - return new OffsetFetchResponse(error, responsePartitions); - } - if (version() >= 3 && version() < 8) { - return new OffsetFetchResponse(throttleTimeMs, error, responsePartitions); - } - List<String> groupIds = groupIds(); - Map<String, Errors> errorsMap = new HashMap<>(groupIds.size()); - Map<String, Map<TopicPartition, OffsetFetchResponse.PartitionData>> partitionMap = - new HashMap<>(groupIds.size()); - for (String g : groupIds) { - errorsMap.put(g, error); - partitionMap.put(g, responsePartitions); - } - return new OffsetFetchResponse(throttleTimeMs, errorsMap, partitionMap); - } - @Override public OffsetFetchResponse getErrorResponse(int throttleTimeMs, Throwable e) { - return getErrorResponse(throttleTimeMs, 
Errors.forException(e)); + Errors error = Errors.forException(e); + + if (version() < TOP_LEVEL_ERROR_AND_NULL_TOPICS_MIN_VERSION) { + // The response does not support top level error so we return each + // partition with the error. + return new OffsetFetchResponse( + new OffsetFetchResponseData() + .setThrottleTimeMs(throttleTimeMs) + .setTopics(data.topics().stream().map(topic -> + new OffsetFetchResponseData.OffsetFetchResponseTopic() + .setName(topic.name()) + .setPartitions(topic.partitionIndexes().stream().map(partition -> + new OffsetFetchResponseData.OffsetFetchResponsePartition() + .setPartitionIndex(partition) + .setErrorCode(error.code()) + .setCommittedOffset(OffsetFetchResponse.INVALID_OFFSET) + .setMetadata(OffsetFetchResponse.NO_METADATA) + .setCommittedLeaderEpoch(RecordBatch.NO_PARTITION_LEADER_EPOCH) + ).collect(Collectors.toList())) + ).collect(Collectors.toList())), + version() + ); + } else if (version() < BATCH_MIN_VERSION) { + // The response does not support multiple groups but it does support + // top level error. + return new OffsetFetchResponse( + new OffsetFetchResponseData() + .setThrottleTimeMs(throttleTimeMs) + .setErrorCode(error.code()), + version() + ); + } else { + // The response does support multiple groups so we provide a top level + // error per group. 
+ return new OffsetFetchResponse( + new OffsetFetchResponseData() + .setThrottleTimeMs(throttleTimeMs) + .setGroups(data.groups().stream().map(group -> + new OffsetFetchResponseData.OffsetFetchResponseGroup() + .setGroupId(group.groupId()) + .setErrorCode(error.code()) + ).collect(Collectors.toList())), + version() + ); + } } public static OffsetFetchRequest parse(Readable readable, short version) { return new OffsetFetchRequest(new OffsetFetchRequestData(readable, version), version); } - public boolean isAllPartitions() { - return data.topics() == ALL_TOPIC_PARTITIONS; - } - - public boolean isAllPartitionsForGroup(String groupId) { - OffsetFetchRequestGroup group = data - .groups() - .stream() - .filter(g -> g.groupId().equals(groupId)) - .collect(Collectors.toList()) - .get(0); - return group.topics() == ALL_TOPIC_PARTITIONS_BATCH; - } - @Override public OffsetFetchRequestData data() { return data; diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 0fa00dbea20..319a91d7407 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -4396,7 +4396,7 @@ public class KafkaAdminClientTest { ClientRequest clientRequest = mockClient.requests().peek(); assertNotNull(clientRequest); assertEquals(300, clientRequest.requestTimeoutMs()); - OffsetFetchRequestData data = ((OffsetFetchRequest.Builder) clientRequest.requestBuilder()).data; + OffsetFetchRequestData data = ((OffsetFetchRequest.Builder) clientRequest.requestBuilder()).build().data(); assertTrue(data.requireStable()); assertEquals(Collections.singletonList(GROUP_ID), data.groups().stream().map(OffsetFetchRequestGroup::groupId).collect(Collectors.toList())); @@ -4791,7 +4791,7 @@ public class KafkaAdminClientTest { waitForRequest(mockClient, ApiKeys.OFFSET_FETCH); 
ClientRequest clientRequest = mockClient.requests().peek(); - OffsetFetchRequestData data = ((OffsetFetchRequest.Builder) clientRequest.requestBuilder()).data; + OffsetFetchRequestData data = ((OffsetFetchRequest.Builder) clientRequest.requestBuilder()).build().data(); Map<String, Map<TopicPartition, PartitionData>> results = new HashMap<>(); Map<String, Errors> errors = new HashMap<>(); data.groups().forEach(group -> { @@ -4813,7 +4813,7 @@ public class KafkaAdminClientTest { waitForRequest(mockClient, ApiKeys.OFFSET_FETCH); ClientRequest clientRequest = mockClient.requests().peek(); - OffsetFetchRequestData data = ((OffsetFetchRequest.Builder) clientRequest.requestBuilder()).data; + OffsetFetchRequestData data = ((OffsetFetchRequest.Builder) clientRequest.requestBuilder()).build().data(); Map<String, Map<TopicPartition, PartitionData>> results = new HashMap<>(); Map<String, Errors> errors = new HashMap<>(); data.groups().forEach(group -> { diff --git a/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchRequestTest.java index 1098925e42a..c4671ac1588 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchRequestTest.java @@ -16,220 +16,239 @@ */ package org.apache.kafka.common.requests; -import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.UnsupportedVersionException; -import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestTopics; +import org.apache.kafka.common.message.OffsetFetchRequestData; +import org.apache.kafka.common.message.OffsetFetchResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.requests.OffsetFetchRequest.Builder; -import 
org.apache.kafka.common.requests.OffsetFetchRequest.NoBatchedOffsetFetchRequestException; -import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.Optional; -import static org.apache.kafka.common.requests.AbstractResponse.DEFAULT_THROTTLE_TIME; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; public class OffsetFetchRequestTest { - - private final String topicOne = "topic1"; - private final int partitionOne = 1; - private final String topicTwo = "topic2"; - private final int partitionTwo = 2; - private final String topicThree = "topic3"; - private final String group1 = "group1"; - private final String group2 = "group2"; - private final String group3 = "group3"; - private final String group4 = "group4"; - private final String group5 = "group5"; - private final List<String> groups = Arrays.asList(group1, group2, group3, group4, group5); - - private final List<Integer> listOfVersionsNonBatchOffsetFetch = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7); - - - private OffsetFetchRequest.Builder builder; - - @Test - public void testConstructor() { - List<TopicPartition> partitions = Arrays.asList( - new TopicPartition(topicOne, partitionOne), - new TopicPartition(topicTwo, partitionTwo)); - int throttleTimeMs = 10; - - Map<TopicPartition, PartitionData> expectedData = new HashMap<>(); - for (TopicPartition partition : partitions) { - expectedData.put(partition, 
new PartitionData( - OffsetFetchResponse.INVALID_OFFSET, - Optional.empty(), - OffsetFetchResponse.NO_METADATA, - Errors.NONE + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH) + public void testWithMultipleGroups(short version) { + var data = new OffsetFetchRequestData() + .setGroups(List.of( + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("grp1") + .setTopics(List.of( + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName("foo") + .setPartitionIndexes(List.of(0, 1, 2)) + )), + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("grp2") + .setTopics(List.of( + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName("bar") + .setPartitionIndexes(List.of(0, 1, 2)) + )) )); + var builder = new OffsetFetchRequest.Builder(data, false); + + if (version < 8) { + assertThrows(OffsetFetchRequest.NoBatchedOffsetFetchRequestException.class, () -> builder.build(version)); + } else { + assertEquals(data, builder.build(version).data()); + } + } + + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH) + public void testThrowOnFetchStableOffsetsUnsupported(short version) { + var builder = new OffsetFetchRequest.Builder( + new OffsetFetchRequestData() + .setRequireStable(true) + .setGroups(List.of( + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("grp1") + .setTopics(List.of( + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName("foo") + .setPartitionIndexes(List.of(0, 1, 2)) + )) + )), + true + ); + + if (version < 7) { + assertThrows(UnsupportedVersionException.class, () -> builder.build(version)); + } else { + builder.build(version); } + } - for (short version : ApiKeys.OFFSET_FETCH.allVersions()) { - if (version < 8) { - builder = new OffsetFetchRequest.Builder( - group1, - false, - partitions, - false); - OffsetFetchRequest request = builder.build(version); - assertFalse(request.isAllPartitions()); - assertEquals(group1, request.groupId()); - 
assertEquals(partitions, request.partitions()); - - OffsetFetchResponse response = request.getErrorResponse(throttleTimeMs, Errors.NONE); - assertEquals(Errors.NONE, response.error()); - assertFalse(response.hasError()); - assertEquals(Collections.singletonMap(Errors.NONE, version <= (short) 1 ? 3 : 1), response.errorCounts(), - "Incorrect error count for version " + version); - - if (version <= 1) { - assertEquals(expectedData, response.responseDataV0ToV7()); - } - - if (version >= 3) { - assertEquals(throttleTimeMs, response.throttleTimeMs()); - } else { - assertEquals(DEFAULT_THROTTLE_TIME, response.throttleTimeMs()); - } - } else { - builder = new Builder(Collections.singletonMap(group1, partitions), false, false); - OffsetFetchRequest request = builder.build(version); - Map<String, List<TopicPartition>> groupToPartitionMap = - request.groupIdsToPartitions(); - Map<String, List<OffsetFetchRequestTopics>> groupToTopicMap = - request.groupIdsToTopics(); - assertFalse(request.isAllPartitionsForGroup(group1)); - assertTrue(groupToPartitionMap.containsKey(group1) && groupToTopicMap.containsKey( - group1)); - assertEquals(partitions, groupToPartitionMap.get(group1)); - OffsetFetchResponse response = request.getErrorResponse(throttleTimeMs, Errors.NONE); - assertEquals(Errors.NONE, response.groupLevelError(group1)); - assertFalse(response.groupHasError(group1)); - assertEquals(Collections.singletonMap(Errors.NONE, 1), response.errorCounts(), - "Incorrect error count for version " + version); - assertEquals(throttleTimeMs, response.throttleTimeMs()); - } + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH) + public void testSingleGroup(short version) { + var data = new OffsetFetchRequestData() + .setGroups(List.of( + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("grp1") + .setTopics(List.of( + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName("foo") + .setPartitionIndexes(List.of(0, 1, 2)) + )) + )); + var 
builder = new OffsetFetchRequest.Builder(data, false); + + if (version < 8) { + var expectedRequest = new OffsetFetchRequestData() + .setGroupId("grp1") + .setTopics(List.of( + new OffsetFetchRequestData.OffsetFetchRequestTopic() + .setName("foo") + .setPartitionIndexes(List.of(0, 1, 2)) + )); + assertEquals(expectedRequest, builder.build(version).data()); + } else { + assertEquals(data, builder.build(version).data()); } } - @Test - public void testConstructorWithMultipleGroups() { - List<TopicPartition> topic1Partitions = Arrays.asList( - new TopicPartition(topicOne, partitionOne), - new TopicPartition(topicOne, partitionTwo)); - List<TopicPartition> topic2Partitions = Arrays.asList( - new TopicPartition(topicTwo, partitionOne), - new TopicPartition(topicTwo, partitionTwo)); - List<TopicPartition> topic3Partitions = Arrays.asList( - new TopicPartition(topicThree, partitionOne), - new TopicPartition(topicThree, partitionTwo)); - Map<String, List<TopicPartition>> groupToTp = new HashMap<>(); - groupToTp.put(group1, topic1Partitions); - groupToTp.put(group2, topic2Partitions); - groupToTp.put(group3, topic3Partitions); - groupToTp.put(group4, null); - groupToTp.put(group5, null); - int throttleTimeMs = 10; - - for (short version : ApiKeys.OFFSET_FETCH.allVersions()) { - if (version >= 8) { - builder = new Builder(groupToTp, false, false); - OffsetFetchRequest request = builder.build(version); - Map<String, List<TopicPartition>> groupToPartitionMap = - request.groupIdsToPartitions(); - Map<String, List<OffsetFetchRequestTopics>> groupToTopicMap = - request.groupIdsToTopics(); - assertEquals(groupToTp.keySet(), groupToTopicMap.keySet()); - assertEquals(groupToTp.keySet(), groupToPartitionMap.keySet()); - assertFalse(request.isAllPartitionsForGroup(group1)); - assertFalse(request.isAllPartitionsForGroup(group2)); - assertFalse(request.isAllPartitionsForGroup(group3)); - assertTrue(request.isAllPartitionsForGroup(group4)); - 
assertTrue(request.isAllPartitionsForGroup(group5)); - OffsetFetchResponse response = request.getErrorResponse(throttleTimeMs, Errors.NONE); - for (String group : groups) { - assertEquals(Errors.NONE, response.groupLevelError(group)); - assertFalse(response.groupHasError(group)); - } - assertEquals(Collections.singletonMap(Errors.NONE, 5), response.errorCounts(), - "Incorrect error count for version " + version); - assertEquals(throttleTimeMs, response.throttleTimeMs()); - } + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH) + public void testSingleGroupWithAllTopics(short version) { + var data = new OffsetFetchRequestData() + .setGroups(List.of( + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("grp1") + .setTopics(null) + )); + var builder = new OffsetFetchRequest.Builder(data, false); + + if (version < 2) { + assertThrows(UnsupportedVersionException.class, () -> builder.build(version)); + } else if (version < 8) { + var expectedRequest = new OffsetFetchRequestData() + .setGroupId("grp1") + .setTopics(null); + assertEquals(expectedRequest, builder.build(version).data()); + } else { + assertEquals(data, builder.build(version).data()); } } - @Test - public void testBuildThrowForUnsupportedBatchRequest() { - for (int version : listOfVersionsNonBatchOffsetFetch) { - Map<String, List<TopicPartition>> groupPartitionMap = new HashMap<>(); - groupPartitionMap.put(group1, null); - groupPartitionMap.put(group2, null); - builder = new Builder(groupPartitionMap, true, false); - final short finalVersion = (short) version; - assertThrows(NoBatchedOffsetFetchRequestException.class, () -> builder.build(finalVersion)); + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH) + public void testGetErrorResponse(short version) { + var request = new OffsetFetchRequest.Builder( + new OffsetFetchRequestData() + .setGroups(List.of( + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("grp1") + 
.setTopics(List.of( + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName("foo") + .setPartitionIndexes(List.of(0, 1)) + )) + )), + false + ).build(version); + + if (version < 2) { + var expectedResponse = new OffsetFetchResponseData() + .setThrottleTimeMs(1000) + .setTopics(List.of( + new OffsetFetchResponseData.OffsetFetchResponseTopic() + .setName("foo") + .setPartitions(List.of( + new OffsetFetchResponseData.OffsetFetchResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.INVALID_GROUP_ID.code()) + .setCommittedOffset(OffsetFetchResponse.INVALID_OFFSET) + .setMetadata(OffsetFetchResponse.NO_METADATA) + .setCommittedLeaderEpoch(RecordBatch.NO_PARTITION_LEADER_EPOCH), + new OffsetFetchResponseData.OffsetFetchResponsePartition() + .setPartitionIndex(1) + .setErrorCode(Errors.INVALID_GROUP_ID.code()) + .setCommittedOffset(OffsetFetchResponse.INVALID_OFFSET) + .setMetadata(OffsetFetchResponse.NO_METADATA) + .setCommittedLeaderEpoch(RecordBatch.NO_PARTITION_LEADER_EPOCH) + )) + )); + assertEquals(expectedResponse, request.getErrorResponse(1000, Errors.INVALID_GROUP_ID.exception()).data()); + } else if (version < 8) { + var expectedResponse = new OffsetFetchResponseData() + .setThrottleTimeMs(1000) + .setErrorCode(Errors.INVALID_GROUP_ID.code()); + assertEquals(expectedResponse, request.getErrorResponse(1000, Errors.INVALID_GROUP_ID.exception()).data()); + } else { + var expectedResponse = new OffsetFetchResponseData() + .setThrottleTimeMs(1000) + .setGroups(List.of( + new OffsetFetchResponseData.OffsetFetchResponseGroup() + .setGroupId("grp1") + .setErrorCode(Errors.INVALID_GROUP_ID.code()) + )); + assertEquals(expectedResponse, request.getErrorResponse(1000, Errors.INVALID_GROUP_ID.exception()).data()); } } - @Test - public void testConstructorFailForUnsupportedRequireStable() { - for (short version : ApiKeys.OFFSET_FETCH.allVersions()) { - if (version < 8) { - // The builder needs to be initialized every cycle as the internal data 
`requireStable` flag is flipped. - builder = new OffsetFetchRequest.Builder(group1, true, null, false); - final short finalVersion = version; - if (version < 2) { - assertThrows(UnsupportedVersionException.class, () -> builder.build(finalVersion)); - } else { - OffsetFetchRequest request = builder.build(finalVersion); - assertEquals(group1, request.groupId()); - assertNull(request.partitions()); - assertTrue(request.isAllPartitions()); - if (version < 7) { - assertFalse(request.requireStable()); - } else { - assertTrue(request.requireStable()); - } - } - } else { - builder = new Builder(Collections.singletonMap(group1, null), true, false); - OffsetFetchRequest request = builder.build(version); - Map<String, List<TopicPartition>> groupToPartitionMap = - request.groupIdsToPartitions(); - Map<String, List<OffsetFetchRequestTopics>> groupToTopicMap = - request.groupIdsToTopics(); - assertTrue(groupToPartitionMap.containsKey(group1) && groupToTopicMap.containsKey( - group1)); - assertNull(groupToPartitionMap.get(group1)); - assertTrue(request.isAllPartitionsForGroup(group1)); - assertTrue(request.requireStable()); - } + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH) + public void testGroups(short version) { + var request = new OffsetFetchRequest.Builder( + new OffsetFetchRequestData() + .setGroups(List.of( + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("grp1") + .setTopics(List.of( + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName("foo") + .setPartitionIndexes(List.of(0, 1, 2)) + )) + )), + false + ).build(version); + + if (version < 8) { + var expectedGroups = List.of( + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("grp1") + .setTopics(List.of( + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName("foo") + .setPartitionIndexes(List.of(0, 1, 2)) + )) + ); + assertEquals(expectedGroups, request.groups()); + } else { + assertEquals(request.data().groups(), 
request.groups()); } } - @Test - public void testBuildThrowForUnsupportedRequireStable() { - for (int version : listOfVersionsNonBatchOffsetFetch) { - builder = new OffsetFetchRequest.Builder(group1, true, null, true); - if (version < 7) { - final short finalVersion = (short) version; - assertThrows(UnsupportedVersionException.class, () -> builder.build(finalVersion)); - } else { - OffsetFetchRequest request = builder.build((short) version); - assertTrue(request.requireStable()); - } + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH, fromVersion = 2) + public void testGroupsWithAllTopics(short version) { + var request = new OffsetFetchRequest.Builder( + new OffsetFetchRequestData() + .setGroups(List.of( + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("grp1") + .setTopics(null) + )), + false + ).build(version); + + if (version < 8) { + var expectedGroups = List.of( + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("grp1") + .setTopics(null) + ); + assertEquals(expectedGroups, request.groups()); + } else { + assertEquals(request.data().groups(), request.groups()); } } } diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index ce379f512f0..4a1b2d7f9b9 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -199,6 +199,7 @@ import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResp import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection; import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopic; import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection; +import org.apache.kafka.common.message.OffsetFetchRequestData; 
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition; import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic; import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopicCollection; @@ -276,8 +277,6 @@ import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; @@ -772,55 +771,6 @@ public class RequestResponseTest { assertThrows(IllegalArgumentException.class, () -> createTopicsRequest.serializeWithHeader(requestHeader)); } - @Test - public void testOffsetFetchRequestBuilderToStringV0ToV7() { - List<Boolean> stableFlags = asList(true, false); - for (Boolean requireStable : stableFlags) { - String allTopicPartitionsString = new OffsetFetchRequest.Builder( - "someGroup", - requireStable, - null, - false - ).toString(); - - assertTrue(allTopicPartitionsString.contains("groupId='', topics=[]," - + " groups=[OffsetFetchRequestGroup(groupId='someGroup', memberId=null, memberEpoch=-1, topics=null)], requireStable=" + requireStable)); - String string = new OffsetFetchRequest.Builder( - "group1", - requireStable, - singletonList( - new TopicPartition("test11", 1)), - false - ).toString(); - assertTrue(string.contains("test11")); - assertTrue(string.contains("group1")); - assertTrue(string.contains("requireStable=" + requireStable)); - } - } - - @ParameterizedTest - @ValueSource(booleans = {false, true}) - public void testOffsetFetchRequestBuilderToStringV8AndAbove(boolean requireStable) { - String allTopicPartitionsString = new OffsetFetchRequest.Builder( - Collections.singletonMap("someGroup", null), - requireStable, - false - ).toString(); - assertTrue(allTopicPartitionsString.contains("groups=[OffsetFetchRequestGroup" - + 
"(groupId='someGroup', memberId=null, memberEpoch=-1, topics=null)], requireStable=" + requireStable)); - - String subsetTopicPartitionsString = new OffsetFetchRequest.Builder( - Collections.singletonMap( - "group1", - singletonList(new TopicPartition("test11", 1))), - requireStable, - false - ).toString(); - assertTrue(subsetTopicPartitionsString.contains("test11")); - assertTrue(subsetTopicPartitionsString.contains("group1")); - assertTrue(subsetTopicPartitionsString.contains("requireStable=" + requireStable)); - } - @Test public void testApiVersionsRequestBeforeV3Validation() { for (short version = 0; version < 3; version++) { @@ -2446,66 +2396,83 @@ public class RequestResponseTest { } private OffsetFetchRequest createOffsetFetchRequest(short version, boolean requireStable) { - if (version < 8) { - return new OffsetFetchRequest.Builder( - "group1", - requireStable, - singletonList(new TopicPartition("test11", 1)), - false) - .build(version); - } return new OffsetFetchRequest.Builder( - Collections.singletonMap( - "group1", - singletonList(new TopicPartition("test11", 1))), - requireStable, - false) - .build(version); + new OffsetFetchRequestData() + .setRequireStable(requireStable) + .setGroups(List.of( + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("group1") + .setMemberId(version >= 9 ? "memberid" : null) + .setMemberEpoch(version >= 9 ? 
10 : -1) + .setTopics(List.of( + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName("test11") + .setPartitionIndexes(List.of(1)) + )) + )), + false + ).build(version); } private OffsetFetchRequest createOffsetFetchRequestWithMultipleGroups(short version, boolean requireStable) { - Map<String, List<TopicPartition>> groupToPartitionMap = new HashMap<>(); - List<TopicPartition> topic1 = singletonList( - new TopicPartition("topic1", 0)); - List<TopicPartition> topic2 = asList( - new TopicPartition("topic1", 0), - new TopicPartition("topic2", 0), - new TopicPartition("topic2", 1)); - List<TopicPartition> topic3 = asList( - new TopicPartition("topic1", 0), - new TopicPartition("topic2", 0), - new TopicPartition("topic2", 1), - new TopicPartition("topic3", 0), - new TopicPartition("topic3", 1), - new TopicPartition("topic3", 2)); - groupToPartitionMap.put("group1", topic1); - groupToPartitionMap.put("group2", topic2); - groupToPartitionMap.put("group3", topic3); - groupToPartitionMap.put("group4", null); - groupToPartitionMap.put("group5", null); - return new OffsetFetchRequest.Builder( - groupToPartitionMap, - requireStable, + new OffsetFetchRequestData() + .setRequireStable(requireStable) + .setGroups(List.of( + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("group1") + .setTopics(List.of( + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName("topic1") + .setPartitionIndexes(List.of(0)) + )), + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("group2") + .setTopics(List.of( + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName("topic1") + .setPartitionIndexes(List.of(0)), + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName("topic2") + .setPartitionIndexes(List.of(0, 1)) + )), + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("group3") + .setTopics(List.of( + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName("topic1") + 
.setPartitionIndexes(List.of(0)), + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName("topic2") + .setPartitionIndexes(List.of(0, 1)), + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName("topic3") + .setPartitionIndexes(List.of(0, 1, 2)) + )), + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("group4") + .setTopics(null), + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("group5") + .setTopics(null) + )), false ).build(version); } private OffsetFetchRequest createOffsetFetchRequestForAllPartition(short version, boolean requireStable) { - if (version < 8) { - return new OffsetFetchRequest.Builder( - "group1", - requireStable, - null, - false) - .build(version); - } return new OffsetFetchRequest.Builder( - Collections.singletonMap( - "group1", null), - requireStable, - false) - .build(version); + new OffsetFetchRequestData() + .setRequireStable(requireStable) + .setGroups(List.of( + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("group1") + .setMemberId(version >= 9 ? "memberid" : null) + .setMemberEpoch(version >= 9 ? 
10 : -1) + .setTopics(null) + )), + false + ).build(version); } private OffsetFetchResponse createOffsetFetchResponse(short version) { diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index e9e476fcd12..5286e719784 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -37,7 +37,7 @@ import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProt import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic} import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderPartition, OffsetForLeaderTopic, OffsetForLeaderTopicCollection} -import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, ConsumerGroupDescribeRequestData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteTopicsRequestData, DescribeClusterRequestData, DescribeConfigsRequestData, DescribeGroupsR [...] +import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, ConsumerGroupDescribeRequestData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteTopicsRequestData, DescribeClusterRequestData, DescribeConfigsRequestData, DescribeGroupsR [...] 
import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, SimpleRecord} @@ -318,15 +318,53 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { } private def createOffsetFetchRequest: OffsetFetchRequest = { - new requests.OffsetFetchRequest.Builder(group, false, List(tp).asJava, false).build() + new OffsetFetchRequest.Builder( + new OffsetFetchRequestData() + .setRequireStable(false) + .setGroups(List( + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId(group) + .setTopics(List( + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName(tp.topic) + .setPartitionIndexes(List[Integer](tp.partition).asJava) + ).asJava) + ).asJava), + false + ).build() } private def createOffsetFetchRequestAllPartitions: OffsetFetchRequest = { - new requests.OffsetFetchRequest.Builder(group, false, null, false).build() + new OffsetFetchRequest.Builder( + new OffsetFetchRequestData() + .setRequireStable(false) + .setGroups(List( + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId(group) + .setTopics(null) + ).asJava), + false + ).build() } private def createOffsetFetchRequest(groupToPartitionMap: util.Map[String, util.List[TopicPartition]]): OffsetFetchRequest = { - new requests.OffsetFetchRequest.Builder(groupToPartitionMap, false, false).build() + new OffsetFetchRequest.Builder( + new OffsetFetchRequestData() + .setGroups(groupToPartitionMap.asScala.map { case (groupId, partitions) => + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId(groupId) + .setTopics( + if (partitions == null) + null + else + partitions.asScala.groupBy(_.topic).map { case (topic, partitions) => + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName(topic) + .setPartitionIndexes(partitions.map(_.partition).map(Int.box).asJava) + }.toList.asJava) + }.toList.asJava), + false + ).build() } private def 
createFindCoordinatorRequest = { diff --git a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala index be96826858a..df939c29ffb 100644 --- a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala @@ -24,7 +24,7 @@ import org.apache.kafka.common.message.DeleteGroupsResponseData.{DeletableGroupR import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse import org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment -import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AddOffsetsToTxnResponseData, ConsumerGroupDescribeRequestData, ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsRequestData, DeleteGroupsResponseData, DescribeGroupsRequestData, DescribeGroupsResponseData, EndTxnRequestData, HeartbeatRequestData, HeartbeatResponseData, InitProducerIdRequestData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRes [...] +import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AddOffsetsToTxnResponseData, ConsumerGroupDescribeRequestData, ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsRequestData, DeleteGroupsResponseData, DescribeGroupsRequestData, DescribeGroupsResponseData, EndTxnRequestData, HeartbeatRequestData, HeartbeatResponseData, InitProducerIdRequestData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRes [...] 
import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, AddOffsetsToTxnRequest, AddOffsetsToTxnResponse, ConsumerGroupDescribeRequest, ConsumerGroupDescribeResponse, ConsumerGroupHeartbeatRequest, ConsumerGroupHeartbeatResponse, DeleteGroupsRequest, DeleteGroupsResponse, DescribeGroupsRequest, DescribeGroupsResponse, EndTxnRequest, EndTxnResponse, HeartbeatRequest, HeartbeatResponse, InitProducerIdRequest, InitProducerIdResponse, JoinGroupRequest, JoinGroupResponse, L [...] import org.apache.kafka.common.serialization.StringSerializer @@ -333,11 +333,23 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) { version: Short ): OffsetFetchResponseData.OffsetFetchResponseGroup = { val request = new OffsetFetchRequest.Builder( - groupId, - memberId, - memberEpoch, - requireStable, - if (partitions == null) null else partitions.asJava, + new OffsetFetchRequestData() + .setRequireStable(requireStable) + .setGroups(List( + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(memberEpoch) + .setTopics( + if (partitions == null) + null + else + partitions.groupBy(_.topic).map { case (topic, partitions) => + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName(topic) + .setPartitionIndexes(partitions.map(_.partition).map(Int.box).asJava) + }.toList.asJava) + ).asJava), false ).build(version) @@ -383,8 +395,22 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) { } val request = new OffsetFetchRequest.Builder( - groups.map { case (k, v) => (k, v.asJava) }.asJava, - requireStable, + new OffsetFetchRequestData() + .setRequireStable(requireStable) + .setGroups(groups.map { case (groupId, partitions) => + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId(groupId) + .setTopics( + if (partitions == null) + null + else + partitions.groupBy(_.topic).map { case (topic, partitions) => + 
new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName(topic) + .setPartitionIndexes(partitions.map(_.partition).map(Int.box).asJava) + }.toList.asJava + ) + }.toList.asJava), false ).build(version) diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 0b51fd886d8..e3ffce710a6 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -8242,16 +8242,30 @@ class KafkaApisTest extends Logging { @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH) def testHandleOffsetFetchWithMultipleGroups(version: Short): Unit = { def makeRequest(version: Short): RequestChannel.Request = { - val groups = Map( - "group-1" -> List( - new TopicPartition("foo", 0), - new TopicPartition("foo", 1) - ).asJava, - "group-2" -> null, - "group-3" -> null, - "group-4" -> null, - ).asJava - buildRequest(new OffsetFetchRequest.Builder(groups, false, false).build(version)) + buildRequest( + new OffsetFetchRequest.Builder( + new OffsetFetchRequestData() + .setGroups(List( + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("group-1") + .setTopics(List( + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName("foo") + .setPartitionIndexes(List[Integer](0, 1).asJava) + ).asJava), + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("group-2") + .setTopics(null), + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("group-3") + .setTopics(null), + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("group-4") + .setTopics(null), + ).asJava), + false + ).build(version) + ) } if (version < 8) { @@ -8364,12 +8378,17 @@ class KafkaApisTest extends Logging { def testHandleOffsetFetchWithSingleGroup(version: Short): Unit = { def makeRequest(version: Short): RequestChannel.Request = { buildRequest(new OffsetFetchRequest.Builder( - "group-1", - false, - List( - new 
TopicPartition("foo", 0), - new TopicPartition("foo", 1) - ).asJava, + new OffsetFetchRequestData() + .setRequireStable(false) + .setGroups(List( + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("group-1") + .setTopics(List( + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName("foo") + .setPartitionIndexes(List[Integer](0, 1).asJava) + ).asJava) + ).asJava), false ).build(version)) } @@ -8434,17 +8453,18 @@ class KafkaApisTest extends Logging { } @ParameterizedTest - @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH) + // Version 1 does not support fetching offsets for all topics. + @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH, fromVersion = 2) def testHandleOffsetFetchAllOffsetsWithSingleGroup(version: Short): Unit = { - // Version 0 gets offsets from Zookeeper. Version 1 does not support fetching all - // offsets request. We are not interested in testing these here. - if (version < 2) return - def makeRequest(version: Short): RequestChannel.Request = { buildRequest(new OffsetFetchRequest.Builder( - "group-1", - false, - null, // all offsets. + new OffsetFetchRequestData() + .setRequireStable(false) + .setGroups(List( + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("group-1") + .setTopics(null) // all offsets. 
+ ).asJava), false ).build(version)) } @@ -8509,19 +8529,40 @@ class KafkaApisTest extends Logging { @Test def testHandleOffsetFetchAuthorization(): Unit = { def makeRequest(version: Short): RequestChannel.Request = { - val groups = Map( - "group-1" -> List( - new TopicPartition("foo", 0), - new TopicPartition("bar", 0) - ).asJava, - "group-2" -> List( - new TopicPartition("foo", 0), - new TopicPartition("bar", 0) - ).asJava, - "group-3" -> null, - "group-4" -> null, - ).asJava - buildRequest(new OffsetFetchRequest.Builder(groups, false, false).build(version)) + buildRequest( + new OffsetFetchRequest.Builder( + new OffsetFetchRequestData() + .setGroups(List( + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("group-1") + .setTopics(List( + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName("foo") + .setPartitionIndexes(List[Integer](0).asJava), + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName("bar") + .setPartitionIndexes(List[Integer](0).asJava) + ).asJava), + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("group-2") + .setTopics(List( + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName("foo") + .setPartitionIndexes(List[Integer](0).asJava), + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName("bar") + .setPartitionIndexes(List[Integer](0).asJava) + ).asJava), + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("group-3") + .setTopics(null), + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("group-4") + .setTopics(null), + ).asJava), + false + ).build(version) + ) } val requestChannelRequest = makeRequest(ApiKeys.OFFSET_FETCH.latestVersion) @@ -8662,17 +8703,34 @@ class KafkaApisTest extends Logging { @Test def testHandleOffsetFetchWithUnauthorizedTopicAndTopLevelError(): Unit = { def makeRequest(version: Short): RequestChannel.Request = { - val groups = Map( - "group-1" -> List( - new TopicPartition("foo", 0), - new 
TopicPartition("bar", 0) - ).asJava, - "group-2" -> List( - new TopicPartition("foo", 0), - new TopicPartition("bar", 0) - ).asJava - ).asJava - buildRequest(new OffsetFetchRequest.Builder(groups, false, false).build(version)) + buildRequest( + new OffsetFetchRequest.Builder( + new OffsetFetchRequestData() + .setGroups(List( + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("group-1") + .setTopics(List( + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName("foo") + .setPartitionIndexes(List[Integer](0).asJava), + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName("bar") + .setPartitionIndexes(List[Integer](0).asJava) + ).asJava), + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("group-2") + .setTopics(List( + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName("foo") + .setPartitionIndexes(List[Integer](0).asJava), + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName("bar") + .setPartitionIndexes(List[Integer](0).asJava) + ).asJava) + ).asJava), + false + ).build(version) + ) } val requestChannelRequest = makeRequest(ApiKeys.OFFSET_FETCH.latestVersion) diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index 7610e466207..2f001d9baf8 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -292,7 +292,19 @@ class RequestQuotaTest extends BaseRequestTest { ) ) case ApiKeys.OFFSET_FETCH => - new OffsetFetchRequest.Builder(Map("test-group"-> List(tp).asJava).asJava, false, false) + new OffsetFetchRequest.Builder( + new OffsetFetchRequestData() + .setGroups(List( + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("test-group") + .setTopics(List( + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName(tp.topic) + .setPartitionIndexes(List[Integer](tp.partition).asJava) + ).asJava) 
+ ).asJava), + false + ) case ApiKeys.FIND_COORDINATOR => new FindCoordinatorRequest.Builder(