This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch 2.5 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 264e2af69b69f60a5efcbd6147a98b0f60f42ffe Author: Jason Gustafson <[email protected]> AuthorDate: Mon Apr 13 17:20:01 2020 -0700 KAFKA-9842; Add test case for OffsetsForLeaderEpoch grouping in Fetcher (#8457) This is a follow-up to #8077. The bug exposed a testing gap in how we group partitions. This patch adds a test case which reproduces the reported problem. Reviewers: David Arthur <[email protected]> --- .../kafka/clients/consumer/internals/Fetcher.java | 3 +- .../clients/consumer/internals/FetcherTest.java | 55 ++++++++++++++++++++++ 2 files changed, 57 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index b359a24..bcfedcd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -796,7 +796,8 @@ public class Fetcher<K, V> implements Closeable { subscriptions.setNextAllowedRetry(fetchPostitions.keySet(), time.milliseconds() + requestTimeoutMs); - RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> future = offsetsForLeaderEpochClient.sendAsyncRequest(node, fetchPostitions); + RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> future = + offsetsForLeaderEpochClient.sendAsyncRequest(node, fetchPostitions); future.addListener(new RequestFutureListener<OffsetsForLeaderEpochClient.OffsetForEpochResult>() { @Override public void onSuccess(OffsetsForLeaderEpochClient.OffsetForEpochResult offsetsResult) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 72e1370..5a88750 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -66,6 +66,7 @@ import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.record.Records; import org.apache.kafka.common.record.SimpleRecord; import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.ApiVersionsResponse; import org.apache.kafka.common.requests.EpochEndOffset; import org.apache.kafka.common.requests.FetchRequest; @@ -3557,6 +3558,60 @@ public class FetcherTest { } @Test + public void testOffsetValidationRequestGrouping() { + buildFetcher(); + assignFromUser(Utils.mkSet(tp0, tp1, tp2, tp3)); + + metadata.updateWithCurrentRequestVersion(TestUtils.metadataUpdateWith("dummy", 3, + Collections.emptyMap(), singletonMap(topicName, 4), + tp -> 5), false, 0L); + + for (TopicPartition tp : subscriptions.assignedPartitions()) { + Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch( + metadata.currentLeader(tp).leader, Optional.of(4)); + subscriptions.seekUnvalidated(tp, + new SubscriptionState.FetchPosition(0, Optional.of(4), leaderAndEpoch)); + } + + Set<TopicPartition> allRequestedPartitions = new HashSet<>(); + + for (Node node : metadata.fetch().nodes()) { + apiVersions.update(node.idString(), NodeApiVersions.create()); + + Set<TopicPartition> expectedPartitions = subscriptions.assignedPartitions().stream() + .filter(tp -> + metadata.currentLeader(tp).leader.equals(Optional.of(node))) + .collect(Collectors.toSet()); + + assertTrue(expectedPartitions.stream().noneMatch(allRequestedPartitions::contains)); + assertTrue(expectedPartitions.size() > 0); + allRequestedPartitions.addAll(expectedPartitions); + + Map<TopicPartition, EpochEndOffset> endOffsets = expectedPartitions.stream().collect(Collectors.toMap( + Function.identity(), + tp -> new EpochEndOffset(Errors.NONE, 4, 0) + )); + + OffsetsForLeaderEpochResponse response = new OffsetsForLeaderEpochResponse(endOffsets); + client.prepareResponseFrom(new MockClient.RequestMatcher() { + @Override + public boolean matches(AbstractRequest body) { + OffsetsForLeaderEpochRequest request = (OffsetsForLeaderEpochRequest) body; + return expectedPartitions.equals(request.epochsByTopicPartition().keySet()); + } + }, response, node); + } + + assertEquals(subscriptions.assignedPartitions(), allRequestedPartitions); + + fetcher.validateOffsetsIfNeeded(); + consumerClient.pollNoWakeup(); + + assertTrue(subscriptions.assignedPartitions() + .stream().noneMatch(subscriptions::awaitingValidation)); + } + + @Test public void testOffsetValidationAwaitsNodeApiVersion() { buildFetcher(); assignFromUser(singleton(tp0));
