This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7920fadbb58 Revert "KAFKA-17182: Consumer fetch sessions are evicted too quickly with AsyncKafkaConsumer (#17700)"
7920fadbb58 is described below
commit 7920fadbb586a9430ce1a45936d6bbd1555baa2d
Author: Lianet Magrans <[email protected]>
AuthorDate: Fri Jan 31 17:18:35 2025 -0500
Revert "KAFKA-17182: Consumer fetch sessions are evicted too quickly with
AsyncKafkaConsumer (#17700)"
This reverts commit 6cf54c4dab9ff39f21d7f2c9bd241b938b5651b2.
---
.../clients/consumer/internals/AbstractFetch.java | 98 +-----
.../internals/FetchRequestManagerTest.java | 373 ++-------------------
.../clients/consumer/internals/FetcherTest.java | 49 ++-
3 files changed, 73 insertions(+), 447 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
index 651d43564b3..e3d4eb58af4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
@@ -44,7 +44,6 @@ import org.slf4j.helpers.MessageFormatter;
import java.io.Closeable;
import java.time.Duration;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -316,15 +315,17 @@ public abstract class AbstractFetch implements Closeable {
}
/**
- * Return the set of <em>fetchable</em> partitions, which are the set of partitions to which we are subscribed,
+ * Return the list of <em>fetchable</em> partitions, which are the set of partitions to which we are subscribed,
* but <em>excluding</em> any partitions for which we still have buffered data. The idea is that since the user
* has yet to process the data for the partition that has already been fetched, we should not go send for more data
* until the previously-fetched data has been processed.
*
- * @param buffered The set of partitions we have in our buffer
* @return {@link Set} of {@link TopicPartition topic partitions} for which we should fetch data
*/
- private Set<TopicPartition> fetchablePartitions(Set<TopicPartition> buffered) {
+ private Set<TopicPartition> fetchablePartitions() {
+ // This is the set of partitions we have in our buffer
+ Set<TopicPartition> buffered = fetchBuffer.bufferedPartitions();
+
// This is the test that returns true if the partition is *not* buffered
Predicate<TopicPartition> isNotBuffered = tp -> !buffered.contains(tp);
@@ -407,44 +408,22 @@ public abstract class AbstractFetch implements Closeable {
long currentTimeMs = time.milliseconds();
Map<String, Uuid> topicIds = metadata.topicIds();
- // This is the set of partitions that have buffered data
- Set<TopicPartition> buffered = Collections.unmodifiableSet(fetchBuffer.bufferedPartitions());
+ for (TopicPartition partition : fetchablePartitions()) {
+ SubscriptionState.FetchPosition position = subscriptions.position(partition);
- // This is the set of partitions that do not have buffered data
- Set<TopicPartition> unbuffered = fetchablePartitions(buffered);
+ if (position == null)
+ throw new IllegalStateException("Missing position for fetchable partition " + partition);
- if (unbuffered.isEmpty()) {
- // If there are no partitions that don't already have data locally buffered, there's no need to issue
- // any fetch requests at the present time.
- return Collections.emptyMap();
- }
+ Optional<Node> leaderOpt = position.currentLeader.leader;
- Set<Integer> bufferedNodes = new HashSet<>();
-
- for (TopicPartition partition : buffered) {
- // It's possible that at the time of the fetcher creating new fetch requests, a partition with buffered
- // data from a *previous* request is no longer assigned. So before attempting to retrieve the node
- // information, check that the partition is still assigned and fetchable; an unassigned/invalid partition
- // will throw an IllegalStateException in positionForPartition.
- //
- // Note: this check is not needed for the unbuffered partitions as the logic in
- // SubscriptionState.fetchablePartitions() only includes partitions currently assigned.
- if (!subscriptions.hasValidPosition(partition))
- continue;
-
- SubscriptionState.FetchPosition position = positionForPartition(partition);
- Optional<Node> nodeOpt = maybeNodeForPosition(partition, position, currentTimeMs);
- nodeOpt.ifPresent(node -> bufferedNodes.add(node.id()));
- }
-
- for (TopicPartition partition : unbuffered) {
- SubscriptionState.FetchPosition position = positionForPartition(partition);
- Optional<Node> nodeOpt = maybeNodeForPosition(partition, position, currentTimeMs);
-
- if (nodeOpt.isEmpty())
+ if (leaderOpt.isEmpty()) {
+ log.debug("Requesting metadata update for partition {} since the position {} is missing the current leader node", partition, position);
+ metadata.requestUpdate(false);
continue;
+ }
- Node node = nodeOpt.get();
+ // Use the preferred read replica if set, otherwise the partition's leader
+ Node node = selectReadReplica(partition, leaderOpt.get(), currentTimeMs);
if (isUnavailable(node)) {
maybeThrowAuthFailure(node);
@@ -453,14 +432,7 @@ public abstract class AbstractFetch implements Closeable {
// going to be failed anyway before being sent, so skip sending the request for now
log.trace("Skipping fetch for partition {} because node {} is awaiting reconnect backoff", partition, node);
} else if (nodesWithPendingFetchRequests.contains(node.id())) {
- // If there's already an inflight request for this node, don't issue another request.
log.trace("Skipping fetch for partition {} because previous request to {} has not been processed", partition, node);
- } else if (bufferedNodes.contains(node.id())) {
- // While a node has buffered data, don't fetch other partition data from it. Because the buffered
- // partitions are not included in the fetch request, those partitions will be inadvertently dropped
- // from the broker fetch session cache. In some cases, that could lead to the entire fetch session
- // being evicted.
- log.trace("Skipping fetch for partition {} because its leader node {} hosts buffered partitions", partition, node);
} else {
// if there is a leader and no in-flight requests, issue a new fetch
FetchSessionHandler.Builder builder = fetchable.computeIfAbsent(node, k -> {
@@ -484,44 +456,6 @@ public abstract class AbstractFetch implements Closeable {
return fetchable.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().build()));
}
- /**
- * Simple utility method that returns a {@link SubscriptionState.FetchPosition position} for the partition. If
- * no position exists, an {@link IllegalStateException} is thrown.
- */
- private SubscriptionState.FetchPosition positionForPartition(TopicPartition partition) {
- SubscriptionState.FetchPosition position = subscriptions.position(partition);
-
- if (position == null)
- throw new IllegalStateException("Missing position for fetchable partition " + partition);
-
- return position;
- }
-
- /**
- * Retrieves the node from which to fetch the partition data. If the given
- * {@link SubscriptionState.FetchPosition position} does not have a current
- * {@link Metadata.LeaderAndEpoch#leader leader} defined the method will return {@link Optional#empty()}.
- *
- * @return Three options: 1) {@link Optional#empty()} if the position's leader is empty, 2) the
- * {@link #selectReadReplica(TopicPartition, Node, long) read replica, if defined}, or 3) the position's
- * {@link Metadata.LeaderAndEpoch#leader leader}
- */
- private Optional<Node> maybeNodeForPosition(TopicPartition partition,
- SubscriptionState.FetchPosition position,
- long currentTimeMs) {
- Optional<Node> leaderOpt = position.currentLeader.leader;
-
- if (leaderOpt.isEmpty()) {
- log.debug("Requesting metadata update for partition {} since the position {} is missing the current leader node", partition, position);
- metadata.requestUpdate(false);
- return Optional.empty();
- }
-
- // Use the preferred read replica if set, otherwise the partition's leader
- Node node = selectReadReplica(partition, leaderOpt.get(), currentTimeMs);
- return Optional.of(node);
- }
-
// Visible for testing
protected FetchSessionHandler sessionHandler(int node) {
return sessionHandlers.get(node);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
index 0ec0633b78b..3cdc0ac4845 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
@@ -24,7 +24,6 @@ import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NetworkClient;
-import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
@@ -49,6 +48,9 @@ import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
@@ -73,6 +75,7 @@ import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchRequest.PartitionData;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.BytesDeserializer;
@@ -106,7 +109,6 @@ import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -114,13 +116,11 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -211,15 +211,11 @@ public class FetchRequestManagerTest {
}
private void assignFromUser(Set<TopicPartition> partitions) {
- assignFromUser(partitions, 1);
- }
-
- private void assignFromUser(Set<TopicPartition> partitions, int numNodes) {
subscriptions.assignFromUser(partitions);
- client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(numNodes, singletonMap(topicName, 4), topicIds));
+ client.updateMetadata(initialUpdateResponse);
// A dummy metadata update to ensure valid leader epoch.
- metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds("dummy", numNodes,
+ metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds("dummy", 1,
Collections.emptyMap(), singletonMap(topicName, 4),
tp -> validLeaderEpoch, topicIds), false, 0L);
}
@@ -1433,7 +1429,7 @@ public class FetchRequestManagerTest {
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords;
- assignFromUser(Set.of(tp0, tp1), 2); // Use multiple nodes so partitions have different leaders
+ assignFromUser(Set.of(tp0, tp1));
// seek to tp0 and tp1 in two polls to generate 2 complete requests and responses
@@ -1465,7 +1461,7 @@ public class FetchRequestManagerTest {
public void testFetchOnCompletedFetchesForAllPausedPartitions() {
buildFetcher();
- assignFromUser(Set.of(tp0, tp1), 2); // Use multiple nodes so partitions have different leaders
+ assignFromUser(Set.of(tp0, tp1));
// seek to tp0 and tp1 in two polls to generate 2 complete requests and responses
@@ -1841,9 +1837,7 @@ public class FetchRequestManagerTest {
buildFetcher(AutoOffsetResetStrategy.NONE, new ByteArrayDeserializer(),
new ByteArrayDeserializer(), 2, IsolationLevel.READ_UNCOMMITTED);
- // Use multiple nodes so partitions have different leaders. tp0 is added here, but tp1 is also assigned
- // about halfway down.
- assignFromUser(Set.of(tp0), 2);
+ assignFromUser(Set.of(tp0));
subscriptions.seek(tp0, 1);
assertEquals(1, sendFetches());
Map<TopicIdPartition, FetchResponseData.PartitionData> partitions = new HashMap<>();
@@ -3443,338 +3437,21 @@ public class FetchRequestManagerTest {
}
- /**
- * This test makes several calls to {@link #sendFetches()}, and after each, the buffered partitions are
- * modified to either cause (or prevent) a fetch from being requested.
- @Test
- public void testFetchRequestWithBufferedPartitions() {
- buildFetcher();
-
- // The partitions are spread across multiple nodes to ensure the fetcher's logic correctly handles the
- // partition-to-node mapping.
- int numNodes = 2;
- Set<TopicPartition> partitions = Set.of(tp0, tp1, tp2, tp3);
- assignFromUser(partitions, numNodes);
-
- // Seek each partition so that it becomes eligible to fetch.
- partitions.forEach(tp -> subscriptions.seek(tp, 0));
-
- // Get all the nodes serving as the leader for these partitions.
- List<Node> nodes = nodesForPartitionLeaders(partitions);
-
- // Extract the nodes and their respective set of partitions to make things easier to keep track of later.
- assertEquals(2, nodes.size());
- Node node0 = nodes.get(0);
- Node node1 = nodes.get(1);
- List<TopicPartition> node0Partitions = partitionsForNode(node0, partitions);
- List<TopicPartition> node1Partitions = partitionsForNode(node1, partitions);
- assertEquals(2, node0Partitions.size());
- assertEquals(2, node1Partitions.size());
- TopicPartition node0Partition1 = node0Partitions.get(0);
- TopicPartition node0Partition2 = node0Partitions.get(1);
- TopicPartition node1Partition1 = node1Partitions.get(0);
- TopicPartition node1Partition2 = node1Partitions.get(1);
-
- // sendFetches() call #1 should issue requests to node 0 or node 1 since neither has buffered data.
- assertEquals(2, sendFetches());
- prepareFetchResponses(node0, node0Partitions, 0);
- prepareFetchResponses(node1, node1Partitions, 0);
- networkClientDelegate.poll(time.timer(0));
-
- assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size());
- collectSelectedPartition(node0Partition1, partitions);
- node0Partitions.remove(node0Partition1);
- assertEquals(3, fetcher.fetchBuffer.bufferedPartitions().size());
-
- // sendFetches() call #2 shouldn't issue requests to either node 0 or node 1 since they both have buffered data.
- assertEquals(0, sendFetches());
- networkClientDelegate.poll(time.timer(0));
- collectSelectedPartition(node1Partition1, partitions);
- node1Partitions.remove(node1Partition1);
- assertEquals(2, fetcher.fetchBuffer.bufferedPartitions().size());
-
- // sendFetches() call #3 shouldn't issue requests to either node 0 or node 1 since they both have buffered data.
- assertEquals(0, sendFetches());
- networkClientDelegate.poll(time.timer(0));
- collectSelectedPartition(node0Partition2, partitions);
- node0Partitions.remove(node0Partition2);
- assertEquals(1, fetcher.fetchBuffer.bufferedPartitions().size());
-
- // Validate that all of node 0's partitions have all been collected.
- assertTrue(node0Partitions.isEmpty());
-
- // Reset the list of partitions for node 0 so the next fetch pass requests data.
- node0Partitions = partitionsForNode(node0, partitions);
-
- // sendFetches() call #4 should issue a request to node 0 since its buffered data was collected.
- assertEquals(1, sendFetches());
- prepareFetchResponses(node0, node0Partitions, 10);
- networkClientDelegate.poll(time.timer(0));
-
- collectSelectedPartition(node1Partition2, partitions);
- node1Partitions.remove(node1Partition2);
- assertEquals(2, fetcher.fetchBuffer.bufferedPartitions().size());
-
- // Node 1's partitions have likewise all been collected, so validate that.
- assertTrue(node1Partitions.isEmpty());
-
- // Again, reset the list of partitions, this time for node 1, so the next fetch pass requests data.
- node1Partitions = partitionsForNode(node1, partitions);
-
- // sendFetches() call #5 should issue a request to node 1 since its buffered data was collected.
- assertEquals(1, sendFetches());
- prepareFetchResponses(node1, node1Partitions, 10);
- networkClientDelegate.poll(time.timer(0));
- assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size());
-
- // Collect all the records and make sure they include all the partitions, and validate that there is no data
- // remaining in the fetch buffer.
- assertEquals(partitions, fetchRecords().keySet());
- assertEquals(0, fetcher.fetchBuffer.bufferedPartitions().size());
-
- // sendFetches() call #6 should issue a request to nodes 0 and 1 since its buffered data was collected.
- assertEquals(2, sendFetches());
- prepareFetchResponses(node0, node0Partitions, 20);
- prepareFetchResponses(node1, node1Partitions, 20);
- networkClientDelegate.poll(time.timer(0));
- assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size());
-
- // Just for completeness, collect all the records and make sure they include all the partitions, and validate
- // that there is no data remaining in the fetch buffer.
- assertEquals(partitions, fetchRecords().keySet());
- assertEquals(0, fetcher.fetchBuffer.bufferedPartitions().size());
- }
-
- @Test
- public void testFetchRequestWithBufferedPartitionNotAssigned() {
- buildFetcher();
-
- // The partitions are spread across multiple nodes to ensure the fetcher's logic correctly handles the
- // partition-to-node mapping.
- int numNodes = 2;
- Set<TopicPartition> partitions = Set.of(tp0, tp1, tp2, tp3);
- assignFromUser(partitions, numNodes);
-
- // Seek each partition so that it becomes eligible to fetch.
- partitions.forEach(tp -> subscriptions.seek(tp, 0));
-
- // Get all the nodes serving as the leader for these partitions.
- List<Node> nodes = nodesForPartitionLeaders(partitions);
-
- // Extract the nodes and their respective set of partitions to make things easier to keep track of later.
- assertEquals(2, nodes.size());
- Node node0 = nodes.get(0);
- Node node1 = nodes.get(1);
- List<TopicPartition> node0Partitions = partitionsForNode(node0, partitions);
- List<TopicPartition> node1Partitions = partitionsForNode(node1, partitions);
- assertEquals(2, node0Partitions.size());
- assertEquals(2, node1Partitions.size());
- TopicPartition node0Partition1 = node0Partitions.get(0);
- TopicPartition node0Partition2 = node0Partitions.get(1);
- TopicPartition node1Partition1 = node1Partitions.get(0);
- TopicPartition node1Partition2 = node1Partitions.get(1);
-
- // sendFetches() call #1 should issue requests to node 0 or node 1 since neither has buffered data.
- assertEquals(2, sendFetches());
- prepareFetchResponses(node0, node0Partitions, 0);
- prepareFetchResponses(node1, node1Partitions, 0);
- networkClientDelegate.poll(time.timer(0));
-
- // Collect node0Partition1 so that it doesn't have anything in the fetch buffer.
- assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size());
- collectSelectedPartition(node0Partition1, partitions);
- assertEquals(3, fetcher.fetchBuffer.bufferedPartitions().size());
-
- // Exclude node0Partition2 (the remaining buffered partition for node 0) when updating the assigned partitions
- // to cause it to become unassigned.
- subscriptions.assignFromUser(Set.of(
- node0Partition1,
- // node0Partition2, // Intentionally omit this partition so that it is unassigned
- node1Partition1,
- node1Partition2
- ));
-
- // node0Partition1 (the collected partition) should have a retrievable position, but node0Partition2
- // (the unassigned position) should throw an error when attempting to retrieve its position.
- assertDoesNotThrow(() -> subscriptions.position(node0Partition1));
- assertThrows(IllegalStateException.class, () -> subscriptions.position(node0Partition2));
-
- // sendFetches() call #2 should issue a request to node 0 because the first partition in node 0 was collected
- // (and its buffer removed) and the second partition for node 0 was unassigned. As a result, there are now no
- // *assigned* partitions for node 0 that are buffered.
- assertEquals(1, sendFetches());
- }
-
- @Test
- public void testFetchRequestWithBufferedPartitionMissingLeader() {
- buildFetcher();
-
- Set<TopicPartition> partitions = Set.of(tp0, tp1);
- assignFromUser(partitions);
-
- // Seek each partition so that it becomes eligible to fetch.
- partitions.forEach(tp -> subscriptions.seek(tp, 0));
-
- Node leader = metadata.fetch().leaderFor(tp0);
-
- // sendFetches() call #1 should issue a request since there's no buffered data.
- assertEquals(1, sendFetches());
- prepareFetchResponses(leader, Set.of(tp0, tp1), 0);
- networkClientDelegate.poll(time.timer(0));
-
- // Per the fetch response, data for both of the partitions are in the fetch buffer.
- assertTrue(fetcher.fetchBuffer.bufferedPartitions().contains(tp0));
- assertTrue(fetcher.fetchBuffer.bufferedPartitions().contains(tp1));
-
- // Collect the first partition (tp0) which will remove it from the fetch buffer.
- collectSelectedPartition(tp0, partitions);
-
- // Since tp0 was collected, it's not in the fetch buffer, but tp1 remains in the fetch buffer.
- assertFalse(fetcher.fetchBuffer.bufferedPartitions().contains(tp0));
- assertTrue(fetcher.fetchBuffer.bufferedPartitions().contains(tp1));
-
- // Overwrite tp1's position with an empty leader, but verify that it is still buffered. Having a leaderless,
- // buffered partition is key to triggering the test case.
- subscriptions.position(tp1, new SubscriptionState.FetchPosition(
- 0,
- Optional.empty(),
- Metadata.LeaderAndEpoch.noLeaderOrEpoch()
- ));
- assertTrue(fetcher.fetchBuffer.bufferedPartitions().contains(tp1));
-
- // Validate the state of the collected partition (tp0) and leaderless partition (tp1) before sending the
- // fetch request.
- assertTrue(subscriptions.position(tp0).currentLeader.leader.isPresent());
- assertFalse(subscriptions.position(tp1).currentLeader.leader.isPresent());
-
- // sendFetches() call #2 should issue a fetch request because it has no buffered partitions:
- //
- // - tp0 was collected and thus not in the fetch buffer
- // - tp1, while still in the fetch buffer, is leaderless
- //
- // As a result, there are now effectively no buffered partitions for which there is a leader.
- assertEquals(1, sendFetches());
- }
-
- @Test
- public void testFetchRequestWithBufferedPartitionMissingPosition() {
- buildFetcher();
-
- // The partitions are spread across multiple nodes to ensure the fetcher's logic correctly handles the
- // partition-to-node mapping.
- int numNodes = 2;
- Set<TopicPartition> partitions = Set.of(tp0, tp1, tp2, tp3);
- assignFromUser(partitions, numNodes);
-
- // Seek each partition so that it becomes eligible to fetch.
- partitions.forEach(tp -> subscriptions.seek(tp, 0));
-
- // Get all the nodes serving as the leader for these partitions.
- List<Node> nodes = nodesForPartitionLeaders(partitions);
-
- // Extract the nodes and their respective set of partitions to make things easier to keep track of later.
- assertEquals(2, nodes.size());
- Node node0 = nodes.get(0);
- Node node1 = nodes.get(1);
- List<TopicPartition> node0Partitions = partitionsForNode(node0, partitions);
- List<TopicPartition> node1Partitions = partitionsForNode(node1, partitions);
- assertEquals(2, node0Partitions.size());
- assertEquals(2, node1Partitions.size());
- TopicPartition node0Partition1 = node0Partitions.get(0);
- TopicPartition node0Partition2 = node0Partitions.get(1);
-
- // sendFetches() call #1 should issue requests to node 0 or node 1 since neither has buffered data.
- assertEquals(2, sendFetches());
- prepareFetchResponses(node0, node0Partitions, 0);
- prepareFetchResponses(node1, node1Partitions, 0);
- networkClientDelegate.poll(time.timer(0));
-
- // Collect node 0's first partition (node0Partition1) which will remove it from the fetch buffer.
- assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size());
- collectSelectedPartition(node0Partition1, partitions);
- assertEquals(3, fetcher.fetchBuffer.bufferedPartitions().size());
-
- // Overwrite node0Partition2's position with an empty leader to trigger the test case.
- subscriptions.position(node0Partition2, null);
-
- // Confirm that calling SubscriptionState.position() succeeds for a leaderless partition. While it shouldn't
- // throw an exception, it should return a null position.
- SubscriptionState.FetchPosition position = assertDoesNotThrow(() -> subscriptions.position(node0Partition2));
- assertNull(position);
-
- // sendFetches() call #2 will now fail to send any requests as we have an invalid position in the assignment.
- // The Consumer.poll() API will throw an IllegalStateException to the user.
- Future<Void> future = fetcher.createFetchRequests();
- assertEquals(0, sendFetches());
- assertFutureThrows(future, IllegalStateException.class);
- }
-
- /**
- * For each partition given, return the set of nodes that represent the partition's leader using
- * {@link Cluster#leaderFor(TopicPartition)}.
- */
- private List<Node> nodesForPartitionLeaders(Set<TopicPartition> partitions) {
- Cluster cluster = metadata.fetch();
-
- return partitions.stream()
- .map(cluster::leaderFor)
- .filter(Objects::nonNull)
- .distinct()
- .collect(Collectors.toList());
- }
-
- /**
- * For the given set of partitions, filter the partitions to be those where the partition's leader node
- * (via {@link Cluster#leaderFor(TopicPartition)}) matches the given node.
- */
- private List<TopicPartition> partitionsForNode(Node node, Set<TopicPartition> partitions) {
- Cluster cluster = metadata.fetch();
-
- return partitions.stream()
- .filter(tp -> node.equals(cluster.leaderFor(tp)))
- .collect(Collectors.toList());
- }
-
- /**
- * Creates 10 dummy records starting at the given offset for each given partition and directs each response to the
- * given node.
- */
- private void prepareFetchResponses(Node node, Collection<TopicPartition> partitions, int offset) {
- LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> partitionDataMap = new LinkedHashMap<>();
-
- partitions.forEach(tp -> {
- MemoryRecords records = buildRecords(offset, 10, 1);
- FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData()
- .setPartitionIndex(tp.partition())
- .setHighWatermark(100)
- .setRecords(records);
- partitionDataMap.put(new TopicIdPartition(topicId, tp), partitionData);
- });
-
- client.prepareResponseFrom(
- FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, partitionDataMap),
- node
- );
- }
-
- /**
- * Invokes {@link #collectFetch()}, but before doing so it {@link Consumer#pause(Collection) pauses} all the
- * partitions in the given set of partitions <em>except</em> for {@code partition}. This is done so that only
- * that partition will be collected. Once the collection has been performed, the previously-paused partitions
- * are then {@link Consumer#resume(Collection) resumed}.
- */
- private void collectSelectedPartition(TopicPartition partition, Set<TopicPartition> partitions) {
- // Pause any remaining partitions so that when fetchRecords() is called, only the records for the
- // "fetched" partition are collected, leaving the remaining in the fetch buffer.
- Set<TopicPartition> pausedPartitions = partitions.stream()
- .filter(tp -> !tp.equals(partition))
- .collect(Collectors.toSet());
-
- // Fetch the records, which should be just for the expected topic partition since the others were paused.
- pausedPartitions.forEach(tp -> subscriptions.pause(tp));
- fetchRecords();
- pausedPartitions.forEach(tp -> subscriptions.resume(tp));
+ private OffsetsForLeaderEpochResponse prepareOffsetsForLeaderEpochResponse(
+ TopicPartition topicPartition,
+ Errors error,
+ int leaderEpoch,
+ long endOffset
+ ) {
+ OffsetForLeaderEpochResponseData data = new OffsetForLeaderEpochResponseData();
+ data.topics().add(new OffsetForLeaderTopicResult()
+ .setTopic(topicPartition.topic())
+ .setPartitions(Collections.singletonList(new EpochEndOffset()
+ .setPartition(topicPartition.partition())
+ .setErrorCode(error.code())
+ .setLeaderEpoch(leaderEpoch)
+ .setEndOffset(endOffset))));
+ return new OffsetsForLeaderEpochResponse(data);
}
private FetchResponse fetchResponseWithTopLevelError(TopicIdPartition tp, Errors error, int throttleTime) {
@@ -4072,7 +3749,7 @@ public class FetchRequestManagerTest {
private class TestableNetworkClientDelegate extends NetworkClientDelegate {
- private final Logger log = LoggerFactory.getLogger(TestableNetworkClientDelegate.class);
+ private final Logger log = LoggerFactory.getLogger(NetworkClientDelegate.class);
private final ConcurrentLinkedQueue<Node> pendingDisconnects = new ConcurrentLinkedQueue<>();
public TestableNetworkClientDelegate(Time time,
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index b24475300c8..856c2b9478a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -209,15 +209,11 @@ public class FetcherTest {
}
private void assignFromUser(Set<TopicPartition> partitions) {
- assignFromUser(partitions, 1);
- }
-
- private void assignFromUser(Set<TopicPartition> partitions, int numNodes) {
subscriptions.assignFromUser(partitions);
- client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(numNodes, singletonMap(topicName, 4), topicIds));
+ client.updateMetadata(initialUpdateResponse);
// A dummy metadata update to ensure valid leader epoch.
- metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds("dummy", numNodes,
+ metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds("dummy", 1,
Collections.emptyMap(), singletonMap(topicName, 4),
tp -> validLeaderEpoch, topicIds), false, 0L);
}
@@ -1156,17 +1152,16 @@ public class FetcherTest {
Set<TopicPartition> tps = new HashSet<>();
tps.add(tp0);
tps.add(tp1);
- assignFromUser(tps, 2); // Use multiple nodes so partitions have different leaders
+ assignFromUser(tps);
subscriptions.seek(tp0, 1);
subscriptions.seek(tp1, 6);
- client.prepareResponse(fullFetchResponse(tidp0, records, Errors.NONE, 100L, 0));
- client.prepareResponse(fullFetchResponse(tidp1, moreRecords, Errors.NONE, 100L, 0));
+ client.prepareResponse(fetchResponse2(tidp0, records, 100L, tidp1, moreRecords, 100L));
client.prepareResponse(fullFetchResponse(tidp0, emptyRecords, Errors.NONE, 100L, 0));
- // Send two fetch requests (one to each node) because we do not have pending fetch responses to process.
- // The fetch responses will return 3 records for tp0 and 3 more for tp1.
- assertEquals(2, sendFetches());
+ // Send fetch request because we do not have pending fetch responses to process.
+ // The first fetch response will return 3 records for tp0 and 3 more for tp1.
+ assertEquals(1, sendFetches());
// The poll returns 2 records from one of the topic-partitions (non-deterministic).
// This leaves 1 record pending from that topic-partition, and the remaining 3 from the other.
pollAndValidateMaxPollRecordsNotExceeded(maxPollRecords);
@@ -1433,7 +1428,7 @@ public class FetcherTest {
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords;
- assignFromUser(Set.of(tp0, tp1), 2); // Use multiple nodes so partitions have different leaders
+ assignFromUser(Set.of(tp0, tp1));
// seek to tp0 and tp1 in two polls to generate 2 complete requests and responses
@@ -1465,7 +1460,7 @@ public class FetcherTest {
public void testFetchOnCompletedFetchesForAllPausedPartitions() {
buildFetcher();
- assignFromUser(Set.of(tp0, tp1), 2); // Use multiple nodes so partitions have different leaders
+ assignFromUser(Set.of(tp0, tp1));
// seek to tp0 and tp1 in two polls to generate 2 complete requests and responses
@@ -1828,9 +1823,7 @@ public class FetcherTest {
buildFetcher(AutoOffsetResetStrategy.NONE, new ByteArrayDeserializer(),
new ByteArrayDeserializer(), 2, IsolationLevel.READ_UNCOMMITTED);
- // Use multiple nodes so partitions have different leaders. tp0 is added here, but tp1 is also assigned
- // about halfway down.
- assignFromUser(Set.of(tp0), 2);
+ assignFromUser(Set.of(tp0));
subscriptions.seek(tp0, 1);
assertEquals(1, sendFetches());
Map<TopicIdPartition, FetchResponseData.PartitionData> partitions = new HashMap<>();
@@ -3764,6 +3757,28 @@ public class FetcherTest {
return FetchResponse.of(Errors.NONE, throttleTime, INVALID_SESSION_ID, new LinkedHashMap<>(partitions));
}
+ private FetchResponse fetchResponse2(TopicIdPartition tp1, MemoryRecords records1, long hw1,
+ TopicIdPartition tp2, MemoryRecords records2, long hw2) {
+ Map<TopicIdPartition, FetchResponseData.PartitionData> partitions = new HashMap<>();
+ partitions.put(tp1,
+ new FetchResponseData.PartitionData()
+ .setPartitionIndex(tp1.topicPartition().partition())
+ .setErrorCode(Errors.NONE.code())
+ .setHighWatermark(hw1)
+ .setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET)
+ .setLogStartOffset(0)
+ .setRecords(records1));
+ partitions.put(tp2,
+ new FetchResponseData.PartitionData()
+ .setPartitionIndex(tp2.topicPartition().partition())
+ .setErrorCode(Errors.NONE.code())
+ .setHighWatermark(hw2)
+ .setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET)
+ .setLogStartOffset(0)
+ .setRecords(records2));
+ return FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, new LinkedHashMap<>(partitions));
+ }
+
/**
* Assert that the {@link Fetcher#collectFetch() latest fetch} does not contain any
* {@link Fetch#records() user-visible records}, did not