This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 9c02072c4fd KAFKA-17182: Consumer fetch sessions are evicted too quickly with AsyncKafkaConsumer (#17700)
9c02072c4fd is described below
commit 9c02072c4fd8159087b7960872d6ee8271a308ab
Author: Kirk True <[email protected]>
AuthorDate: Thu Jan 30 13:12:11 2025 -0800
KAFKA-17182: Consumer fetch sessions are evicted too quickly with AsyncKafkaConsumer (#17700)
This change reduces fetch session cache evictions on the broker for the
AsyncKafkaConsumer by altering the logic that determines which partitions it
includes in fetch requests.
Background
Consumer implementations fetch data from the cluster and temporarily buffer
it in memory until the user next calls Consumer.poll(). When a fetch request is
generated, partitions that already have buffered data are excluded from the
request.
The ClassicKafkaConsumer performs much of its fetch logic and network I/O
in the application thread. On poll(), if there is any locally-buffered data,
the ClassicKafkaConsumer does not fetch any new data; it simply returns the
buffered data to the user.
The AsyncKafkaConsumer, on the other hand, splits its logic and network I/O
between two threads, which creates a potential race condition during fetch.
Like the classic consumer, the AsyncKafkaConsumer checks for buffered data on
its application thread. If it finds none, it signals the background thread to
create a fetch request. However, it is possible for the background thread to
receive data from a previous fetch and buffer it before the fetch request logic
starts. When that occurs, a [...]
This issue is technically possible in the ClassicKafkaConsumer too, since
the heartbeat thread performs network I/O in addition to the application
thread. However, because of the frequency at which the AsyncKafkaConsumer's
background thread runs, it is ~100x more likely to happen.
Options
The core decision was: what should the background thread do when it is asked
to create a fetch request but discovers there is already buffered data? There
were multiple proposals to address this issue in the AsyncKafkaConsumer. Among
them were:
1. The background thread should omit buffered partitions from the fetch
   request as before (this is the existing behavior)
2. The background thread should skip the fetch request generation entirely if
   there are any buffered partitions
3. The background thread should include buffered partitions in the fetch
   request, but use a small "max bytes" value
4. The background thread should skip fetching from the nodes that have
   buffered partitions
Option 4 won out. The change is localized to AbstractFetch, where the basic
idea is to skip sending fetch requests to a given node if that node is the
leader for any buffered data. Because no fetch request is sent to that node,
the fetch session won't have any "holes" where the buffered partitions should
be.
Reviewers: Lianet Magrans <[email protected]>, Jeff Kim <[email protected]>, Jun Rao <[email protected]>
---
.../clients/consumer/internals/AbstractFetch.java | 98 +++++-
.../internals/FetchRequestManagerTest.java | 373 +++++++++++++++++++--
.../clients/consumer/internals/FetcherTest.java | 49 +--
3 files changed, 447 insertions(+), 73 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
index e3d4eb58af4..651d43564b3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
@@ -44,6 +44,7 @@ import org.slf4j.helpers.MessageFormatter;
import java.io.Closeable;
import java.time.Duration;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -315,17 +316,15 @@ public abstract class AbstractFetch implements Closeable {
     }
 
     /**
-     * Return the list of <em>fetchable</em> partitions, which are the set of partitions to which we are subscribed,
+     * Return the set of <em>fetchable</em> partitions, which are the set of partitions to which we are subscribed,
      * but <em>excluding</em> any partitions for which we still have buffered data. The idea is that since the user
      * has yet to process the data for the partition that has already been fetched, we should not go send for more data
      * until the previously-fetched data has been processed.
      *
+     * @param buffered The set of partitions we have in our buffer
      * @return {@link Set} of {@link TopicPartition topic partitions} for which we should fetch data
      */
-    private Set<TopicPartition> fetchablePartitions() {
-        // This is the set of partitions we have in our buffer
-        Set<TopicPartition> buffered = fetchBuffer.bufferedPartitions();
-
+    private Set<TopicPartition> fetchablePartitions(Set<TopicPartition> buffered) {
         // This is the test that returns true if the partition is *not* buffered
         Predicate<TopicPartition> isNotBuffered = tp -> !buffered.contains(tp);
@@ -408,22 +407,44 @@ public abstract class AbstractFetch implements Closeable {
         long currentTimeMs = time.milliseconds();
         Map<String, Uuid> topicIds = metadata.topicIds();
 
-        for (TopicPartition partition : fetchablePartitions()) {
-            SubscriptionState.FetchPosition position = subscriptions.position(partition);
+        // This is the set of partitions that have buffered data
+        Set<TopicPartition> buffered = Collections.unmodifiableSet(fetchBuffer.bufferedPartitions());
 
-            if (position == null)
-                throw new IllegalStateException("Missing position for fetchable partition " + partition);
+        // This is the set of partitions that do not have buffered data
+        Set<TopicPartition> unbuffered = fetchablePartitions(buffered);
 
-            Optional<Node> leaderOpt = position.currentLeader.leader;
+        if (unbuffered.isEmpty()) {
+            // If there are no partitions that don't already have data locally buffered, there's no need to issue
+            // any fetch requests at the present time.
+            return Collections.emptyMap();
+        }
 
-            if (leaderOpt.isEmpty()) {
-                log.debug("Requesting metadata update for partition {} since the position {} is missing the current leader node", partition, position);
-                metadata.requestUpdate(false);
+        Set<Integer> bufferedNodes = new HashSet<>();
+
+        for (TopicPartition partition : buffered) {
+            // It's possible that at the time of the fetcher creating new fetch requests, a partition with buffered
+            // data from a *previous* request is no longer assigned. So before attempting to retrieve the node
+            // information, check that the partition is still assigned and fetchable; an unassigned/invalid partition
+            // will throw an IllegalStateException in positionForPartition.
+            //
+            // Note: this check is not needed for the unbuffered partitions as the logic in
+            // SubscriptionState.fetchablePartitions() only includes partitions currently assigned.
+            if (!subscriptions.hasValidPosition(partition))
                 continue;
-            }
 
-            // Use the preferred read replica if set, otherwise the partition's leader
-            Node node = selectReadReplica(partition, leaderOpt.get(), currentTimeMs);
+            SubscriptionState.FetchPosition position = positionForPartition(partition);
+            Optional<Node> nodeOpt = maybeNodeForPosition(partition, position, currentTimeMs);
+            nodeOpt.ifPresent(node -> bufferedNodes.add(node.id()));
+        }
+
+        for (TopicPartition partition : unbuffered) {
+            SubscriptionState.FetchPosition position = positionForPartition(partition);
+            Optional<Node> nodeOpt = maybeNodeForPosition(partition, position, currentTimeMs);
+
+            if (nodeOpt.isEmpty())
+                continue;
+
+            Node node = nodeOpt.get();
 
             if (isUnavailable(node)) {
                 maybeThrowAuthFailure(node);
@@ -432,7 +453,14 @@ public abstract class AbstractFetch implements Closeable {
                 // going to be failed anyway before being sent, so skip sending the request for now
                 log.trace("Skipping fetch for partition {} because node {} is awaiting reconnect backoff", partition, node);
             } else if (nodesWithPendingFetchRequests.contains(node.id())) {
+                // If there's already an inflight request for this node, don't issue another request.
                 log.trace("Skipping fetch for partition {} because previous request to {} has not been processed", partition, node);
+            } else if (bufferedNodes.contains(node.id())) {
+                // While a node has buffered data, don't fetch other partition data from it. Because the buffered
+                // partitions are not included in the fetch request, those partitions will be inadvertently dropped
+                // from the broker fetch session cache. In some cases, that could lead to the entire fetch session
+                // being evicted.
+                log.trace("Skipping fetch for partition {} because its leader node {} hosts buffered partitions", partition, node);
             } else {
                 // if there is a leader and no in-flight requests, issue a new fetch
                 FetchSessionHandler.Builder builder = fetchable.computeIfAbsent(node, k -> {
@@ -456,6 +484,44 @@ public abstract class AbstractFetch implements Closeable {
         return fetchable.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().build()));
     }
 
+    /**
+     * Simple utility method that returns a {@link SubscriptionState.FetchPosition position} for the partition. If
+     * no position exists, an {@link IllegalStateException} is thrown.
+     */
+    private SubscriptionState.FetchPosition positionForPartition(TopicPartition partition) {
+        SubscriptionState.FetchPosition position = subscriptions.position(partition);
+
+        if (position == null)
+            throw new IllegalStateException("Missing position for fetchable partition " + partition);
+
+        return position;
+    }
+
+    /**
+     * Retrieves the node from which to fetch the partition data. If the given
+     * {@link SubscriptionState.FetchPosition position} does not have a current
+     * {@link Metadata.LeaderAndEpoch#leader leader} defined the method will return {@link Optional#empty()}.
+     *
+     * @return Three options: 1) {@link Optional#empty()} if the position's leader is empty, 2) the
+     * {@link #selectReadReplica(TopicPartition, Node, long) read replica, if defined}, or 3) the position's
+     * {@link Metadata.LeaderAndEpoch#leader leader}
+     */
+    private Optional<Node> maybeNodeForPosition(TopicPartition partition,
+                                                SubscriptionState.FetchPosition position,
+                                                long currentTimeMs) {
+        Optional<Node> leaderOpt = position.currentLeader.leader;
+
+        if (leaderOpt.isEmpty()) {
+            log.debug("Requesting metadata update for partition {} since the position {} is missing the current leader node", partition, position);
+            metadata.requestUpdate(false);
+            return Optional.empty();
+        }
+
+        // Use the preferred read replica if set, otherwise the partition's leader
+        Node node = selectReadReplica(partition, leaderOpt.get(), currentTimeMs);
+        return Optional.of(node);
+    }
+
     // Visible for testing
     protected FetchSessionHandler sessionHandler(int node) {
         return sessionHandlers.get(node);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
index 6505a167a33..d6f19f4bf58 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
@@ -48,9 +49,6 @@ import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.message.FetchResponseData;
-import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
-import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
-import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
@@ -75,7 +73,6 @@ import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchRequest.PartitionData;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.MetadataResponse;
-import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.BytesDeserializer;
@@ -109,6 +106,7 @@ import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -116,11 +114,13 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -211,11 +211,15 @@ public class FetchRequestManagerTest {
}
private void assignFromUser(Set<TopicPartition> partitions) {
+ assignFromUser(partitions, 1);
+ }
+
+ private void assignFromUser(Set<TopicPartition> partitions, int numNodes) {
subscriptions.assignFromUser(partitions);
-        client.updateMetadata(initialUpdateResponse);
+        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(numNodes, singletonMap(topicName, 4), topicIds));
 
         // A dummy metadata update to ensure valid leader epoch.
-        metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds("dummy", 1,
+        metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds("dummy", numNodes,
             Collections.emptyMap(), singletonMap(topicName, 4),
             tp -> validLeaderEpoch, topicIds), false, 0L);
}
@@ -1429,7 +1433,7 @@ public class FetchRequestManagerTest {
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>
fetchedRecords;
- assignFromUser(Set.of(tp0, tp1));
+        assignFromUser(Set.of(tp0, tp1), 2); // Use multiple nodes so partitions have different leaders
// seek to tp0 and tp1 in two polls to generate 2 complete requests
and responses
@@ -1461,7 +1465,7 @@ public class FetchRequestManagerTest {
public void testFetchOnCompletedFetchesForAllPausedPartitions() {
buildFetcher();
- assignFromUser(Set.of(tp0, tp1));
+        assignFromUser(Set.of(tp0, tp1), 2); // Use multiple nodes so partitions have different leaders
// seek to tp0 and tp1 in two polls to generate 2 complete requests
and responses
@@ -1837,7 +1841,9 @@ public class FetchRequestManagerTest {
buildFetcher(AutoOffsetResetStrategy.NONE, new ByteArrayDeserializer(),
new ByteArrayDeserializer(), 2,
IsolationLevel.READ_UNCOMMITTED);
- assignFromUser(Set.of(tp0));
+        // Use multiple nodes so partitions have different leaders. tp0 is added here, but tp1 is also assigned
+        // about halfway down.
+        assignFromUser(Set.of(tp0), 2);
subscriptions.seek(tp0, 1);
assertEquals(1, sendFetches());
Map<TopicIdPartition, FetchResponseData.PartitionData> partitions =
new HashMap<>();
@@ -3437,21 +3443,338 @@ public class FetchRequestManagerTest {
}
- private OffsetsForLeaderEpochResponse prepareOffsetsForLeaderEpochResponse(
- TopicPartition topicPartition,
- Errors error,
- int leaderEpoch,
- long endOffset
- ) {
- OffsetForLeaderEpochResponseData data = new
OffsetForLeaderEpochResponseData();
- data.topics().add(new OffsetForLeaderTopicResult()
- .setTopic(topicPartition.topic())
- .setPartitions(Collections.singletonList(new EpochEndOffset()
- .setPartition(topicPartition.partition())
- .setErrorCode(error.code())
- .setLeaderEpoch(leaderEpoch)
- .setEndOffset(endOffset))));
- return new OffsetsForLeaderEpochResponse(data);
+ /**
+ * This test makes several calls to {@link #sendFetches()}, and after
each, the buffered partitions are
+ * modified to either cause (or prevent) a fetch from being requested.
+ */
+ @Test
+ public void testFetchRequestWithBufferedPartitions() {
+ buildFetcher();
+
+ // The partitions are spread across multiple nodes to ensure the
fetcher's logic correctly handles the
+ // partition-to-node mapping.
+ int numNodes = 2;
+ Set<TopicPartition> partitions = Set.of(tp0, tp1, tp2, tp3);
+ assignFromUser(partitions, numNodes);
+
+ // Seek each partition so that it becomes eligible to fetch.
+ partitions.forEach(tp -> subscriptions.seek(tp, 0));
+
+ // Get all the nodes serving as the leader for these partitions.
+ List<Node> nodes = nodesForPartitionLeaders(partitions);
+
+ // Extract the nodes and their respective set of partitions to make
things easier to keep track of later.
+ assertEquals(2, nodes.size());
+ Node node0 = nodes.get(0);
+ Node node1 = nodes.get(1);
+ List<TopicPartition> node0Partitions = partitionsForNode(node0,
partitions);
+ List<TopicPartition> node1Partitions = partitionsForNode(node1,
partitions);
+ assertEquals(2, node0Partitions.size());
+ assertEquals(2, node1Partitions.size());
+ TopicPartition node0Partition1 = node0Partitions.get(0);
+ TopicPartition node0Partition2 = node0Partitions.get(1);
+ TopicPartition node1Partition1 = node1Partitions.get(0);
+ TopicPartition node1Partition2 = node1Partitions.get(1);
+
+        // sendFetches() call #1 should issue requests to both node 0 and node 1 since neither has buffered data.
+ assertEquals(2, sendFetches());
+ prepareFetchResponses(node0, node0Partitions, 0);
+ prepareFetchResponses(node1, node1Partitions, 0);
+ networkClientDelegate.poll(time.timer(0));
+
+ assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size());
+ collectSelectedPartition(node0Partition1, partitions);
+ node0Partitions.remove(node0Partition1);
+ assertEquals(3, fetcher.fetchBuffer.bufferedPartitions().size());
+
+ // sendFetches() call #2 shouldn't issue requests to either node 0 or
node 1 since they both have buffered data.
+ assertEquals(0, sendFetches());
+ networkClientDelegate.poll(time.timer(0));
+ collectSelectedPartition(node1Partition1, partitions);
+ node1Partitions.remove(node1Partition1);
+ assertEquals(2, fetcher.fetchBuffer.bufferedPartitions().size());
+
+ // sendFetches() call #3 shouldn't issue requests to either node 0 or
node 1 since they both have buffered data.
+ assertEquals(0, sendFetches());
+ networkClientDelegate.poll(time.timer(0));
+ collectSelectedPartition(node0Partition2, partitions);
+ node0Partitions.remove(node0Partition2);
+ assertEquals(1, fetcher.fetchBuffer.bufferedPartitions().size());
+
+ // Validate that all of node 0's partitions have all been collected.
+ assertTrue(node0Partitions.isEmpty());
+
+ // Reset the list of partitions for node 0 so the next fetch pass
requests data.
+ node0Partitions = partitionsForNode(node0, partitions);
+
+ // sendFetches() call #4 should issue a request to node 0 since its
buffered data was collected.
+ assertEquals(1, sendFetches());
+ prepareFetchResponses(node0, node0Partitions, 10);
+ networkClientDelegate.poll(time.timer(0));
+
+ collectSelectedPartition(node1Partition2, partitions);
+ node1Partitions.remove(node1Partition2);
+ assertEquals(2, fetcher.fetchBuffer.bufferedPartitions().size());
+
+ // Node 1's partitions have likewise all been collected, so validate
that.
+ assertTrue(node1Partitions.isEmpty());
+
+ // Again, reset the list of partitions, this time for node 1, so the
next fetch pass requests data.
+ node1Partitions = partitionsForNode(node1, partitions);
+
+ // sendFetches() call #5 should issue a request to node 1 since its
buffered data was collected.
+ assertEquals(1, sendFetches());
+ prepareFetchResponses(node1, node1Partitions, 10);
+ networkClientDelegate.poll(time.timer(0));
+ assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size());
+
+ // Collect all the records and make sure they include all the
partitions, and validate that there is no data
+ // remaining in the fetch buffer.
+ assertEquals(partitions, fetchRecords().keySet());
+ assertEquals(0, fetcher.fetchBuffer.bufferedPartitions().size());
+
+        // sendFetches() call #6 should issue requests to both node 0 and node 1 since their buffered data was collected.
+ assertEquals(2, sendFetches());
+ prepareFetchResponses(node0, node0Partitions, 20);
+ prepareFetchResponses(node1, node1Partitions, 20);
+ networkClientDelegate.poll(time.timer(0));
+ assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size());
+
+ // Just for completeness, collect all the records and make sure they
include all the partitions, and validate
+ // that there is no data remaining in the fetch buffer.
+ assertEquals(partitions, fetchRecords().keySet());
+ assertEquals(0, fetcher.fetchBuffer.bufferedPartitions().size());
+ }
+
+ @Test
+ public void testFetchRequestWithBufferedPartitionNotAssigned() {
+ buildFetcher();
+
+ // The partitions are spread across multiple nodes to ensure the
fetcher's logic correctly handles the
+ // partition-to-node mapping.
+ int numNodes = 2;
+ Set<TopicPartition> partitions = Set.of(tp0, tp1, tp2, tp3);
+ assignFromUser(partitions, numNodes);
+
+ // Seek each partition so that it becomes eligible to fetch.
+ partitions.forEach(tp -> subscriptions.seek(tp, 0));
+
+ // Get all the nodes serving as the leader for these partitions.
+ List<Node> nodes = nodesForPartitionLeaders(partitions);
+
+ // Extract the nodes and their respective set of partitions to make
things easier to keep track of later.
+ assertEquals(2, nodes.size());
+ Node node0 = nodes.get(0);
+ Node node1 = nodes.get(1);
+ List<TopicPartition> node0Partitions = partitionsForNode(node0,
partitions);
+ List<TopicPartition> node1Partitions = partitionsForNode(node1,
partitions);
+ assertEquals(2, node0Partitions.size());
+ assertEquals(2, node1Partitions.size());
+ TopicPartition node0Partition1 = node0Partitions.get(0);
+ TopicPartition node0Partition2 = node0Partitions.get(1);
+ TopicPartition node1Partition1 = node1Partitions.get(0);
+ TopicPartition node1Partition2 = node1Partitions.get(1);
+
+        // sendFetches() call #1 should issue requests to both node 0 and node 1 since neither has buffered data.
+ assertEquals(2, sendFetches());
+ prepareFetchResponses(node0, node0Partitions, 0);
+ prepareFetchResponses(node1, node1Partitions, 0);
+ networkClientDelegate.poll(time.timer(0));
+
+ // Collect node0Partition1 so that it doesn't have anything in the
fetch buffer.
+ assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size());
+ collectSelectedPartition(node0Partition1, partitions);
+ assertEquals(3, fetcher.fetchBuffer.bufferedPartitions().size());
+
+ // Exclude node0Partition2 (the remaining buffered partition for node
0) when updating the assigned partitions
+ // to cause it to become unassigned.
+ subscriptions.assignFromUser(Set.of(
+ node0Partition1,
+ // node0Partition2, // Intentionally omit this partition
so that it is unassigned
+ node1Partition1,
+ node1Partition2
+ ));
+
+ // node0Partition1 (the collected partition) should have a retrievable
position, but node0Partition2
+ // (the unassigned position) should throw an error when attempting to
retrieve its position.
+ assertDoesNotThrow(() -> subscriptions.position(node0Partition1));
+ assertThrows(IllegalStateException.class, () ->
subscriptions.position(node0Partition2));
+
+ // sendFetches() call #2 should issue a request to node 0 because the
first partition in node 0 was collected
+ // (and its buffer removed) and the second partition for node 0 was
unassigned. As a result, there are now no
+ // *assigned* partitions for node 0 that are buffered.
+ assertEquals(1, sendFetches());
+ }
+
+ @Test
+ public void testFetchRequestWithBufferedPartitionMissingLeader() {
+ buildFetcher();
+
+ Set<TopicPartition> partitions = Set.of(tp0, tp1);
+ assignFromUser(partitions);
+
+ // Seek each partition so that it becomes eligible to fetch.
+ partitions.forEach(tp -> subscriptions.seek(tp, 0));
+
+ Node leader = metadata.fetch().leaderFor(tp0);
+
+ // sendFetches() call #1 should issue a request since there's no
buffered data.
+ assertEquals(1, sendFetches());
+ prepareFetchResponses(leader, Set.of(tp0, tp1), 0);
+ networkClientDelegate.poll(time.timer(0));
+
+        // Per the fetch response, data for both of the partitions is in the fetch buffer.
+ assertTrue(fetcher.fetchBuffer.bufferedPartitions().contains(tp0));
+ assertTrue(fetcher.fetchBuffer.bufferedPartitions().contains(tp1));
+
+ // Collect the first partition (tp0) which will remove it from the
fetch buffer.
+ collectSelectedPartition(tp0, partitions);
+
+ // Since tp0 was collected, it's not in the fetch buffer, but tp1
remains in the fetch buffer.
+ assertFalse(fetcher.fetchBuffer.bufferedPartitions().contains(tp0));
+ assertTrue(fetcher.fetchBuffer.bufferedPartitions().contains(tp1));
+
+ // Overwrite tp1's position with an empty leader, but verify that it
is still buffered. Having a leaderless,
+ // buffered partition is key to triggering the test case.
+ subscriptions.position(tp1, new SubscriptionState.FetchPosition(
+ 0,
+ Optional.empty(),
+ Metadata.LeaderAndEpoch.noLeaderOrEpoch()
+ ));
+ assertTrue(fetcher.fetchBuffer.bufferedPartitions().contains(tp1));
+
+ // Validate the state of the collected partition (tp0) and leaderless
partition (tp1) before sending the
+ // fetch request.
+
assertTrue(subscriptions.position(tp0).currentLeader.leader.isPresent());
+
assertFalse(subscriptions.position(tp1).currentLeader.leader.isPresent());
+
+ // sendFetches() call #2 should issue a fetch request because it has
no buffered partitions:
+ //
+ // - tp0 was collected and thus not in the fetch buffer
+ // - tp1, while still in the fetch buffer, is leaderless
+ //
+ // As a result, there are now effectively no buffered partitions for
which there is a leader.
+ assertEquals(1, sendFetches());
+ }
+
+ @Test
+ public void testFetchRequestWithBufferedPartitionMissingPosition() {
+ buildFetcher();
+
+ // The partitions are spread across multiple nodes to ensure the
fetcher's logic correctly handles the
+ // partition-to-node mapping.
+ int numNodes = 2;
+ Set<TopicPartition> partitions = Set.of(tp0, tp1, tp2, tp3);
+ assignFromUser(partitions, numNodes);
+
+ // Seek each partition so that it becomes eligible to fetch.
+ partitions.forEach(tp -> subscriptions.seek(tp, 0));
+
+ // Get all the nodes serving as the leader for these partitions.
+ List<Node> nodes = nodesForPartitionLeaders(partitions);
+
+ // Extract the nodes and their respective set of partitions to make
things easier to keep track of later.
+ assertEquals(2, nodes.size());
+ Node node0 = nodes.get(0);
+ Node node1 = nodes.get(1);
+ List<TopicPartition> node0Partitions = partitionsForNode(node0,
partitions);
+ List<TopicPartition> node1Partitions = partitionsForNode(node1,
partitions);
+ assertEquals(2, node0Partitions.size());
+ assertEquals(2, node1Partitions.size());
+ TopicPartition node0Partition1 = node0Partitions.get(0);
+ TopicPartition node0Partition2 = node0Partitions.get(1);
+
+        // sendFetches() call #1 should issue requests to both node 0 and node 1 since neither has buffered data.
+ assertEquals(2, sendFetches());
+ prepareFetchResponses(node0, node0Partitions, 0);
+ prepareFetchResponses(node1, node1Partitions, 0);
+ networkClientDelegate.poll(time.timer(0));
+
+ // Collect node 0's first partition (node0Partition1) which will
remove it from the fetch buffer.
+ assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size());
+ collectSelectedPartition(node0Partition1, partitions);
+ assertEquals(3, fetcher.fetchBuffer.bufferedPartitions().size());
+
+        // Overwrite node0Partition2's position with null to trigger the test case.
+        subscriptions.position(node0Partition2, null);
+
+        // Confirm that calling SubscriptionState.position() succeeds for a partition with a missing position. While
+        // it shouldn't throw an exception, it should return a null position.
+ SubscriptionState.FetchPosition position = assertDoesNotThrow(() ->
subscriptions.position(node0Partition2));
+ assertNull(position);
+
+ // sendFetches() call #2 will now fail to send any requests as we have
an invalid position in the assignment.
+ // The Consumer.poll() API will throw an IllegalStateException to the
user.
+ Future<Void> future = fetcher.createFetchRequests();
+ assertEquals(0, sendFetches());
+ assertFutureThrows(future, IllegalStateException.class);
+ }
+
+ /**
+ * For each partition given, return the set of nodes that represent the
partition's leader using
+ * {@link Cluster#leaderFor(TopicPartition)}.
+ */
+ private List<Node> nodesForPartitionLeaders(Set<TopicPartition>
partitions) {
+ Cluster cluster = metadata.fetch();
+
+ return partitions.stream()
+ .map(cluster::leaderFor)
+ .filter(Objects::nonNull)
+ .distinct()
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * For the given set of partitions, filter the partitions to be those
where the partition's leader node
+ * (via {@link Cluster#leaderFor(TopicPartition)}) matches the given node.
+ */
+ private List<TopicPartition> partitionsForNode(Node node,
Set<TopicPartition> partitions) {
+ Cluster cluster = metadata.fetch();
+
+ return partitions.stream()
+ .filter(tp -> node.equals(cluster.leaderFor(tp)))
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Creates 10 dummy records starting at the given offset for each given
partition and directs each response to the
+ * given node.
+ */
+ private void prepareFetchResponses(Node node, Collection<TopicPartition>
partitions, int offset) {
+ LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData>
partitionDataMap = new LinkedHashMap<>();
+
+ partitions.forEach(tp -> {
+ MemoryRecords records = buildRecords(offset, 10, 1);
+ FetchResponseData.PartitionData partitionData = new
FetchResponseData.PartitionData()
+ .setPartitionIndex(tp.partition())
+ .setHighWatermark(100)
+ .setRecords(records);
+ partitionDataMap.put(new TopicIdPartition(topicId, tp),
partitionData);
+ });
+
+ client.prepareResponseFrom(
+ FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID,
partitionDataMap),
+ node
+ );
+ }
+
+ /**
+ * Invokes {@link #collectFetch()}, but before doing so it {@link
Consumer#pause(Collection) pauses} all the
+ * partitions in the given set of partitions <em>except</em> for {@code
partition}. This is done so that only
+ * that partition will be collected. Once the collection has been
performed, the previously-paused partitions
+ * are then {@link Consumer#resume(Collection) resumed}.
+ */
+ private void collectSelectedPartition(TopicPartition partition,
Set<TopicPartition> partitions) {
+ // Pause any remaining partitions so that when fetchRecords() is
called, only the records for the
+ // "fetched" partition are collected, leaving the remaining in the
fetch buffer.
+ Set<TopicPartition> pausedPartitions = partitions.stream()
+ .filter(tp -> !tp.equals(partition))
+ .collect(Collectors.toSet());
+
+ // Fetch the records, which should be just for the expected topic
partition since the others were paused.
+ pausedPartitions.forEach(tp -> subscriptions.pause(tp));
+ fetchRecords();
+ pausedPartitions.forEach(tp -> subscriptions.resume(tp));
}
private FetchResponse fetchResponseWithTopLevelError(TopicIdPartition tp,
Errors error, int throttleTime) {
@@ -3749,7 +4072,7 @@ public class FetchRequestManagerTest {
private class TestableNetworkClientDelegate extends NetworkClientDelegate {
-        private final Logger log = LoggerFactory.getLogger(NetworkClientDelegate.class);
+        private final Logger log = LoggerFactory.getLogger(TestableNetworkClientDelegate.class);
         private final ConcurrentLinkedQueue<Node> pendingDisconnects = new ConcurrentLinkedQueue<>();
public TestableNetworkClientDelegate(Time time,
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 729668f8076..bc82aeae9fa 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -209,11 +209,15 @@ public class FetcherTest {
}
private void assignFromUser(Set<TopicPartition> partitions) {
+ assignFromUser(partitions, 1);
+ }
+
+ private void assignFromUser(Set<TopicPartition> partitions, int numNodes) {
subscriptions.assignFromUser(partitions);
-        client.updateMetadata(initialUpdateResponse);
+        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(numNodes, singletonMap(topicName, 4), topicIds));
 
         // A dummy metadata update to ensure valid leader epoch.
-        metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds("dummy", 1,
+        metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds("dummy", numNodes,
             Collections.emptyMap(), singletonMap(topicName, 4),
             tp -> validLeaderEpoch, topicIds), false, 0L);
}
@@ -1152,16 +1156,17 @@ public class FetcherTest {
Set<TopicPartition> tps = new HashSet<>();
tps.add(tp0);
tps.add(tp1);
-        assignFromUser(tps);
+        assignFromUser(tps, 2); // Use multiple nodes so partitions have different leaders
         subscriptions.seek(tp0, 1);
         subscriptions.seek(tp1, 6);
 
-        client.prepareResponse(fetchResponse2(tidp0, records, 100L, tidp1, moreRecords, 100L));
+        client.prepareResponse(fullFetchResponse(tidp0, records, Errors.NONE, 100L, 0));
+        client.prepareResponse(fullFetchResponse(tidp1, moreRecords, Errors.NONE, 100L, 0));
         client.prepareResponse(fullFetchResponse(tidp0, emptyRecords, Errors.NONE, 100L, 0));
 
-        // Send fetch request because we do not have pending fetch responses to process.
-        // The first fetch response will return 3 records for tp0 and 3 more for tp1.
-        assertEquals(1, sendFetches());
+        // Send two fetch requests (one to each node) because we do not have pending fetch responses to process.
+        // The fetch responses will return 3 records for tp0 and 3 more for tp1.
+        assertEquals(2, sendFetches());
         // The poll returns 2 records from one of the topic-partitions (non-deterministic).
         // This leaves 1 record pending from that topic-partition, and the remaining 3 from the other.
         pollAndValidateMaxPollRecordsNotExceeded(maxPollRecords);
@@ -1428,7 +1433,7 @@ public class FetcherTest {
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>
fetchedRecords;
- assignFromUser(Set.of(tp0, tp1));
+        assignFromUser(Set.of(tp0, tp1), 2); // Use multiple nodes so partitions have different leaders
// seek to tp0 and tp1 in two polls to generate 2 complete requests
and responses
@@ -1460,7 +1465,7 @@ public class FetcherTest {
public void testFetchOnCompletedFetchesForAllPausedPartitions() {
buildFetcher();
- assignFromUser(Set.of(tp0, tp1));
+        assignFromUser(Set.of(tp0, tp1), 2); // Use multiple nodes so partitions have different leaders
// seek to tp0 and tp1 in two polls to generate 2 complete requests
and responses
@@ -1823,7 +1828,9 @@ public class FetcherTest {
buildFetcher(AutoOffsetResetStrategy.NONE, new ByteArrayDeserializer(),
new ByteArrayDeserializer(), 2,
IsolationLevel.READ_UNCOMMITTED);
- assignFromUser(Set.of(tp0));
+        // Use multiple nodes so partitions have different leaders. tp0 is added here, but tp1 is also assigned
+        // about halfway down.
+        assignFromUser(Set.of(tp0), 2);
subscriptions.seek(tp0, 1);
assertEquals(1, sendFetches());
Map<TopicIdPartition, FetchResponseData.PartitionData> partitions =
new HashMap<>();
@@ -3757,28 +3764,6 @@ public class FetcherTest {
return FetchResponse.of(Errors.NONE, throttleTime, INVALID_SESSION_ID,
new LinkedHashMap<>(partitions));
}
- private FetchResponse fetchResponse2(TopicIdPartition tp1, MemoryRecords
records1, long hw1,
- TopicIdPartition tp2, MemoryRecords
records2, long hw2) {
- Map<TopicIdPartition, FetchResponseData.PartitionData> partitions =
new HashMap<>();
- partitions.put(tp1,
- new FetchResponseData.PartitionData()
- .setPartitionIndex(tp1.topicPartition().partition())
- .setErrorCode(Errors.NONE.code())
- .setHighWatermark(hw1)
-
.setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET)
- .setLogStartOffset(0)
- .setRecords(records1));
- partitions.put(tp2,
- new FetchResponseData.PartitionData()
- .setPartitionIndex(tp2.topicPartition().partition())
- .setErrorCode(Errors.NONE.code())
- .setHighWatermark(hw2)
-
.setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET)
- .setLogStartOffset(0)
- .setRecords(records2));
- return FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, new
LinkedHashMap<>(partitions));
- }
-
/**
* Assert that the {@link Fetcher#collectFetch() latest fetch} does not
contain any
* {@link Fetch#records() user-visible records}, did not