This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 799183b KAFKA-9842; Add test case for OffsetsForLeaderEpoch grouping
in Fetcher (#8457)
799183b is described below
commit 799183b3f8d5e8d4c955e7d3ce654b97f8309aad
Author: Jason Gustafson <[email protected]>
AuthorDate: Mon Apr 13 17:20:01 2020 -0700
KAFKA-9842; Add test case for OffsetsForLeaderEpoch grouping in Fetcher
(#8457)
This is a follow-up to #8077. The bug exposed a testing gap in how the Fetcher
groups partitions when sending OffsetsForLeaderEpoch requests. This patch adds
a test case which reproduces the reported problem.
Reviewers: David Arthur <[email protected]>
---
.../kafka/clients/consumer/internals/Fetcher.java | 3 +-
.../clients/consumer/internals/FetcherTest.java | 55 ++++++++++++++++++++++
2 files changed, 57 insertions(+), 1 deletion(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 9f0ec23..0699684 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -796,7 +796,8 @@ public class Fetcher<K, V> implements Closeable {
subscriptions.setNextAllowedRetry(fetchPostitions.keySet(),
time.milliseconds() + requestTimeoutMs);
- RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult>
future = offsetsForLeaderEpochClient.sendAsyncRequest(node, fetchPostitions);
+ RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult>
future =
+ offsetsForLeaderEpochClient.sendAsyncRequest(node,
fetchPostitions);
future.addListener(new
RequestFutureListener<OffsetsForLeaderEpochClient.OffsetForEpochResult>() {
@Override
public void
onSuccess(OffsetsForLeaderEpochClient.OffsetForEpochResult offsetsResult) {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 0156d30..041b819 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -66,6 +66,7 @@ import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.EpochEndOffset;
import org.apache.kafka.common.requests.FetchRequest;
@@ -3556,6 +3557,60 @@ public class FetcherTest {
}
@Test
+ public void testOffsetValidationRequestGrouping() {
+ buildFetcher();
+ assignFromUser(Utils.mkSet(tp0, tp1, tp2, tp3));
+
+
metadata.updateWithCurrentRequestVersion(TestUtils.metadataUpdateWith("dummy",
3,
+ Collections.emptyMap(), singletonMap(topicName, 4),
+ tp -> 5), false, 0L);
+
+ for (TopicPartition tp : subscriptions.assignedPartitions()) {
+ Metadata.LeaderAndEpoch leaderAndEpoch = new
Metadata.LeaderAndEpoch(
+ metadata.currentLeader(tp).leader, Optional.of(4));
+ subscriptions.seekUnvalidated(tp,
+ new SubscriptionState.FetchPosition(0, Optional.of(4),
leaderAndEpoch));
+ }
+
+ Set<TopicPartition> allRequestedPartitions = new HashSet<>();
+
+ for (Node node : metadata.fetch().nodes()) {
+ apiVersions.update(node.idString(), NodeApiVersions.create());
+
+ Set<TopicPartition> expectedPartitions =
subscriptions.assignedPartitions().stream()
+ .filter(tp ->
+
metadata.currentLeader(tp).leader.equals(Optional.of(node)))
+ .collect(Collectors.toSet());
+
+
assertTrue(expectedPartitions.stream().noneMatch(allRequestedPartitions::contains));
+ assertTrue(expectedPartitions.size() > 0);
+ allRequestedPartitions.addAll(expectedPartitions);
+
+ Map<TopicPartition, EpochEndOffset> endOffsets =
expectedPartitions.stream().collect(Collectors.toMap(
+ Function.identity(),
+ tp -> new EpochEndOffset(Errors.NONE, 4, 0)
+ ));
+
+ OffsetsForLeaderEpochResponse response = new
OffsetsForLeaderEpochResponse(endOffsets);
+ client.prepareResponseFrom(new MockClient.RequestMatcher() {
+ @Override
+ public boolean matches(AbstractRequest body) {
+ OffsetsForLeaderEpochRequest request =
(OffsetsForLeaderEpochRequest) body;
+ return
expectedPartitions.equals(request.epochsByTopicPartition().keySet());
+ }
+ }, response, node);
+ }
+
+ assertEquals(subscriptions.assignedPartitions(),
allRequestedPartitions);
+
+ fetcher.validateOffsetsIfNeeded();
+ consumerClient.pollNoWakeup();
+
+ assertTrue(subscriptions.assignedPartitions()
+ .stream().noneMatch(subscriptions::awaitingValidation));
+ }
+
+ @Test
public void testOffsetValidationAwaitsNodeApiVersion() {
buildFetcher();
assignFromUser(singleton(tp0));