This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 799183b  KAFKA-9842; Add test case for OffsetsForLeaderEpoch grouping 
in Fetcher (#8457)
799183b is described below

commit 799183b3f8d5e8d4c955e7d3ce654b97f8309aad
Author: Jason Gustafson <[email protected]>
AuthorDate: Mon Apr 13 17:20:01 2020 -0700

    KAFKA-9842; Add test case for OffsetsForLeaderEpoch grouping in Fetcher 
(#8457)
    
    This is a follow-up to #8077. The bug exposed a testing gap in how we group 
partitions. This patch adds a test case which reproduces the reported problem.
    
    Reviewers: David Arthur <[email protected]>
---
 .../kafka/clients/consumer/internals/Fetcher.java  |  3 +-
 .../clients/consumer/internals/FetcherTest.java    | 55 ++++++++++++++++++++++
 2 files changed, 57 insertions(+), 1 deletion(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 9f0ec23..0699684 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -796,7 +796,8 @@ public class Fetcher<K, V> implements Closeable {
 
             subscriptions.setNextAllowedRetry(fetchPostitions.keySet(), 
time.milliseconds() + requestTimeoutMs);
 
-            RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> 
future = offsetsForLeaderEpochClient.sendAsyncRequest(node, fetchPostitions);
+            RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> 
future =
+                offsetsForLeaderEpochClient.sendAsyncRequest(node, 
fetchPostitions);
             future.addListener(new 
RequestFutureListener<OffsetsForLeaderEpochClient.OffsetForEpochResult>() {
                 @Override
                 public void 
onSuccess(OffsetsForLeaderEpochClient.OffsetForEpochResult offsetsResult) {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 0156d30..041b819 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -66,6 +66,7 @@ import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.record.SimpleRecord;
 import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.EpochEndOffset;
 import org.apache.kafka.common.requests.FetchRequest;
@@ -3556,6 +3557,60 @@ public class FetcherTest {
     }
 
     @Test
+    public void testOffsetValidationRequestGrouping() {
+        buildFetcher();
+        assignFromUser(Utils.mkSet(tp0, tp1, tp2, tp3));
+
+        
metadata.updateWithCurrentRequestVersion(TestUtils.metadataUpdateWith("dummy", 
3,
+            Collections.emptyMap(), singletonMap(topicName, 4),
+            tp -> 5), false, 0L);
+
+        for (TopicPartition tp : subscriptions.assignedPartitions()) {
+            Metadata.LeaderAndEpoch leaderAndEpoch = new 
Metadata.LeaderAndEpoch(
+                metadata.currentLeader(tp).leader, Optional.of(4));
+            subscriptions.seekUnvalidated(tp,
+                new SubscriptionState.FetchPosition(0, Optional.of(4), 
leaderAndEpoch));
+        }
+
+        Set<TopicPartition> allRequestedPartitions = new HashSet<>();
+
+        for (Node node : metadata.fetch().nodes()) {
+            apiVersions.update(node.idString(), NodeApiVersions.create());
+
+            Set<TopicPartition> expectedPartitions = 
subscriptions.assignedPartitions().stream()
+                .filter(tp ->
+                    
metadata.currentLeader(tp).leader.equals(Optional.of(node)))
+                .collect(Collectors.toSet());
+
+            
assertTrue(expectedPartitions.stream().noneMatch(allRequestedPartitions::contains));
+            assertTrue(expectedPartitions.size() > 0);
+            allRequestedPartitions.addAll(expectedPartitions);
+
+            Map<TopicPartition, EpochEndOffset> endOffsets = 
expectedPartitions.stream().collect(Collectors.toMap(
+                Function.identity(),
+                tp -> new EpochEndOffset(Errors.NONE, 4, 0)
+            ));
+
+            OffsetsForLeaderEpochResponse response = new 
OffsetsForLeaderEpochResponse(endOffsets);
+            client.prepareResponseFrom(new MockClient.RequestMatcher() {
+                @Override
+                public boolean matches(AbstractRequest body) {
+                    OffsetsForLeaderEpochRequest request = 
(OffsetsForLeaderEpochRequest) body;
+                    return 
expectedPartitions.equals(request.epochsByTopicPartition().keySet());
+                }
+            }, response, node);
+        }
+
+        assertEquals(subscriptions.assignedPartitions(), 
allRequestedPartitions);
+
+        fetcher.validateOffsetsIfNeeded();
+        consumerClient.pollNoWakeup();
+
+        assertTrue(subscriptions.assignedPartitions()
+            .stream().noneMatch(subscriptions::awaitingValidation));
+    }
+
+    @Test
     public void testOffsetValidationAwaitsNodeApiVersion() {
         buildFetcher();
         assignFromUser(singleton(tp0));

Reply via email to