This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 9c02072c4fd KAFKA-17182: Consumer fetch sessions are evicted too 
quickly with AsyncKafkaConsumer (#17700)
9c02072c4fd is described below

commit 9c02072c4fd8159087b7960872d6ee8271a308ab
Author: Kirk True <[email protected]>
AuthorDate: Thu Jan 30 13:12:11 2025 -0800

    KAFKA-17182: Consumer fetch sessions are evicted too quickly with 
AsyncKafkaConsumer (#17700)
    
    This change reduces fetch session cache evictions on the broker for 
AsyncKafkaConsumer by altering its logic to determine which partitions it 
includes in fetch requests.
    
    Background
    Consumer implementations fetch data from the cluster and temporarily buffer 
it in memory until the user next calls Consumer.poll(). When a fetch request is 
being generated, partitions that already have buffered data are not included in 
the fetch request.
    
    The ClassicKafkaConsumer performs much of its fetch logic and network I/O 
in the application thread. On poll(), if there is any locally-buffered data, 
the ClassicKafkaConsumer does not fetch any new data and simply returns the 
buffered data to the user from poll().
    
    On the other hand, the AsyncKafkaConsumer splits its logic and 
network I/O between two threads, which results in a potential race condition 
during fetch. The AsyncKafkaConsumer also checks for buffered data on its 
application thread. If it finds there is none, it signals the background thread 
to create a fetch request. However, it's possible for the background thread to 
receive data from a previous fetch and buffer it before the fetch request logic 
starts. When that occurs, a [...]
    
    This issue is technically possible in the ClassicKafkaConsumer too, since 
the heartbeat thread performs network I/O in addition to the application 
thread. However, because of the frequency at which the AsyncKafkaConsumer's 
background thread runs, it is ~100x more likely to happen.
    
    Options
    The core decision is: what should the background thread do if it is asked 
to create a fetch request and it discovers there's buffered data? There were 
multiple proposals to address this issue in the AsyncKafkaConsumer. Among them 
are:
    
    1. The background thread should omit buffered partitions from the fetch 
request as before (this is the existing behavior)
    2. The background thread should skip the fetch request generation entirely 
if there are any buffered partitions
    3. The background thread should include buffered partitions in the fetch 
request, but use a small “max bytes” value
    4. The background thread should skip fetching from the nodes that have 
buffered partitions
    Option 4 won out. The change is localized to AbstractFetch where the basic 
idea is to skip fetch requests to a given node if that node is the leader for 
buffered data. By preventing a fetch request from being sent to that node, it 
won't have any "holes" where the buffered partitions should be.
    
    Reviewers: Lianet Magrans <[email protected]>, Jeff Kim 
<[email protected]>, Jun Rao <[email protected]>
---
 .../clients/consumer/internals/AbstractFetch.java  |  98 +++++-
 .../internals/FetchRequestManagerTest.java         | 373 +++++++++++++++++++--
 .../clients/consumer/internals/FetcherTest.java    |  49 +--
 3 files changed, 447 insertions(+), 73 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
index e3d4eb58af4..651d43564b3 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
@@ -44,6 +44,7 @@ import org.slf4j.helpers.MessageFormatter;
 
 import java.io.Closeable;
 import java.time.Duration;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -315,17 +316,15 @@ public abstract class AbstractFetch implements Closeable {
     }
 
     /**
-     * Return the list of <em>fetchable</em> partitions, which are the set of 
partitions to which we are subscribed,
+     * Return the set of <em>fetchable</em> partitions, which are the set of 
partitions to which we are subscribed,
      * but <em>excluding</em> any partitions for which we still have buffered 
data. The idea is that since the user
      * has yet to process the data for the partition that has already been 
fetched, we should not go send for more data
      * until the previously-fetched data has been processed.
      *
+     * @param buffered The set of partitions we have in our buffer
      * @return {@link Set} of {@link TopicPartition topic partitions} for 
which we should fetch data
      */
-    private Set<TopicPartition> fetchablePartitions() {
-        // This is the set of partitions we have in our buffer
-        Set<TopicPartition> buffered = fetchBuffer.bufferedPartitions();
-
+    private Set<TopicPartition> fetchablePartitions(Set<TopicPartition> 
buffered) {
         // This is the test that returns true if the partition is *not* 
buffered
         Predicate<TopicPartition> isNotBuffered = tp -> !buffered.contains(tp);
 
@@ -408,22 +407,44 @@ public abstract class AbstractFetch implements Closeable {
         long currentTimeMs = time.milliseconds();
         Map<String, Uuid> topicIds = metadata.topicIds();
 
-        for (TopicPartition partition : fetchablePartitions()) {
-            SubscriptionState.FetchPosition position = 
subscriptions.position(partition);
+        // This is the set of partitions that have buffered data
+        Set<TopicPartition> buffered = 
Collections.unmodifiableSet(fetchBuffer.bufferedPartitions());
 
-            if (position == null)
-                throw new IllegalStateException("Missing position for 
fetchable partition " + partition);
+        // This is the set of partitions that do not have buffered data
+        Set<TopicPartition> unbuffered = fetchablePartitions(buffered);
 
-            Optional<Node> leaderOpt = position.currentLeader.leader;
+        if (unbuffered.isEmpty()) {
+            // If there are no partitions that don't already have data locally 
buffered, there's no need to issue
+            // any fetch requests at the present time.
+            return Collections.emptyMap();
+        }
 
-            if (leaderOpt.isEmpty()) {
-                log.debug("Requesting metadata update for partition {} since 
the position {} is missing the current leader node", partition, position);
-                metadata.requestUpdate(false);
+        Set<Integer> bufferedNodes = new HashSet<>();
+
+        for (TopicPartition partition : buffered) {
+            // It's possible that at the time of the fetcher creating new 
fetch requests, a partition with buffered
+            // data from a *previous* request is no longer assigned. So before 
attempting to retrieve the node
+            // information, check that the partition is still assigned and 
fetchable; an unassigned/invalid partition
+            // will throw an IllegalStateException in positionForPartition.
+            //
+            // Note: this check is not needed for the unbuffered partitions as 
the logic in
+            // SubscriptionState.fetchablePartitions() only includes 
partitions currently assigned.
+            if (!subscriptions.hasValidPosition(partition))
                 continue;
-            }
 
-            // Use the preferred read replica if set, otherwise the 
partition's leader
-            Node node = selectReadReplica(partition, leaderOpt.get(), 
currentTimeMs);
+            SubscriptionState.FetchPosition position = 
positionForPartition(partition);
+            Optional<Node> nodeOpt = maybeNodeForPosition(partition, position, 
currentTimeMs);
+            nodeOpt.ifPresent(node -> bufferedNodes.add(node.id()));
+        }
+
+        for (TopicPartition partition : unbuffered) {
+            SubscriptionState.FetchPosition position = 
positionForPartition(partition);
+            Optional<Node> nodeOpt = maybeNodeForPosition(partition, position, 
currentTimeMs);
+
+            if (nodeOpt.isEmpty())
+                continue;
+
+            Node node = nodeOpt.get();
 
             if (isUnavailable(node)) {
                 maybeThrowAuthFailure(node);
@@ -432,7 +453,14 @@ public abstract class AbstractFetch implements Closeable {
                 // going to be failed anyway before being sent, so skip 
sending the request for now
                 log.trace("Skipping fetch for partition {} because node {} is 
awaiting reconnect backoff", partition, node);
             } else if (nodesWithPendingFetchRequests.contains(node.id())) {
+                // If there's already an inflight request for this node, don't 
issue another request.
                 log.trace("Skipping fetch for partition {} because previous 
request to {} has not been processed", partition, node);
+            } else if (bufferedNodes.contains(node.id())) {
+                // While a node has buffered data, don't fetch other partition 
data from it. Because the buffered
+                // partitions are not included in the fetch request, those 
partitions will be inadvertently dropped
+                // from the broker fetch session cache. In some cases, that 
could lead to the entire fetch session
+                // being evicted.
+                log.trace("Skipping fetch for partition {} because its leader 
node {} hosts buffered partitions", partition, node);
             } else {
                 // if there is a leader and no in-flight requests, issue a new 
fetch
                 FetchSessionHandler.Builder builder = 
fetchable.computeIfAbsent(node, k -> {
@@ -456,6 +484,44 @@ public abstract class AbstractFetch implements Closeable {
         return 
fetchable.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> 
e.getValue().build()));
     }
 
+    /**
+     * Simple utility method that returns a {@link 
SubscriptionState.FetchPosition position} for the partition. If
+     * no position exists, an {@link IllegalStateException} is thrown.
+     */
+    private SubscriptionState.FetchPosition 
positionForPartition(TopicPartition partition) {
+        SubscriptionState.FetchPosition position = 
subscriptions.position(partition);
+
+        if (position == null)
+            throw new IllegalStateException("Missing position for fetchable 
partition " + partition);
+
+        return position;
+    }
+
+    /**
+     * Retrieves the node from which to fetch the partition data. If the given
+     * {@link SubscriptionState.FetchPosition position} does not have a current
+     * {@link Metadata.LeaderAndEpoch#leader leader} defined the method will 
return {@link Optional#empty()}.
+     *
+     * @return Three options: 1) {@link Optional#empty()} if the position's 
leader is empty, 2) the
+     * {@link #selectReadReplica(TopicPartition, Node, long) read replica, if 
defined}, or 3) the position's
+     * {@link Metadata.LeaderAndEpoch#leader leader}
+     */
+    private Optional<Node> maybeNodeForPosition(TopicPartition partition,
+                                                
SubscriptionState.FetchPosition position,
+                                                long currentTimeMs) {
+        Optional<Node> leaderOpt = position.currentLeader.leader;
+
+        if (leaderOpt.isEmpty()) {
+            log.debug("Requesting metadata update for partition {} since the 
position {} is missing the current leader node", partition, position);
+            metadata.requestUpdate(false);
+            return Optional.empty();
+        }
+
+        // Use the preferred read replica if set, otherwise the partition's 
leader
+        Node node = selectReadReplica(partition, leaderOpt.get(), 
currentTimeMs);
+        return Optional.of(node);
+    }
+
     // Visible for testing
     protected FetchSessionHandler sessionHandler(int node) {
         return sessionHandlers.get(node);
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
index 6505a167a33..d6f19f4bf58 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MetadataRecoveryStrategy;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
@@ -48,9 +49,6 @@ import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.message.ApiMessageType;
 import org.apache.kafka.common.message.FetchResponseData;
-import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
-import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
-import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -75,7 +73,6 @@ import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.requests.FetchRequest.PartitionData;
 import org.apache.kafka.common.requests.FetchResponse;
 import org.apache.kafka.common.requests.MetadataResponse;
-import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
 import org.apache.kafka.common.requests.RequestTestUtils;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.BytesDeserializer;
@@ -109,6 +106,7 @@ import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -116,11 +114,13 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -211,11 +211,15 @@ public class FetchRequestManagerTest {
     }
 
     private void assignFromUser(Set<TopicPartition> partitions) {
+        assignFromUser(partitions, 1);
+    }
+
+    private void assignFromUser(Set<TopicPartition> partitions, int numNodes) {
         subscriptions.assignFromUser(partitions);
-        client.updateMetadata(initialUpdateResponse);
+        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(numNodes, 
singletonMap(topicName, 4), topicIds));
 
         // A dummy metadata update to ensure valid leader epoch.
-        
metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds("dummy",
 1,
+        
metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds("dummy",
 numNodes,
                 Collections.emptyMap(), singletonMap(topicName, 4),
                 tp -> validLeaderEpoch, topicIds), false, 0L);
     }
@@ -1429,7 +1433,7 @@ public class FetchRequestManagerTest {
 
         Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> 
fetchedRecords;
 
-        assignFromUser(Set.of(tp0, tp1));
+        assignFromUser(Set.of(tp0, tp1), 2);    // Use multiple nodes so 
partitions have different leaders
 
         // seek to tp0 and tp1 in two polls to generate 2 complete requests 
and responses
 
@@ -1461,7 +1465,7 @@ public class FetchRequestManagerTest {
     public void testFetchOnCompletedFetchesForAllPausedPartitions() {
         buildFetcher();
 
-        assignFromUser(Set.of(tp0, tp1));
+        assignFromUser(Set.of(tp0, tp1), 2);    // Use multiple nodes so 
partitions have different leaders
 
         // seek to tp0 and tp1 in two polls to generate 2 complete requests 
and responses
 
@@ -1837,7 +1841,9 @@ public class FetchRequestManagerTest {
         buildFetcher(AutoOffsetResetStrategy.NONE, new ByteArrayDeserializer(),
                 new ByteArrayDeserializer(), 2, 
IsolationLevel.READ_UNCOMMITTED);
 
-        assignFromUser(Set.of(tp0));
+        // Use multiple nodes so partitions have different leaders. tp0 is 
added here, but tp1 is also assigned
+        // about halfway down.
+        assignFromUser(Set.of(tp0), 2);
         subscriptions.seek(tp0, 1);
         assertEquals(1, sendFetches());
         Map<TopicIdPartition, FetchResponseData.PartitionData> partitions = 
new HashMap<>();
@@ -3437,21 +3443,338 @@ public class FetchRequestManagerTest {
 
     }
 
-    private OffsetsForLeaderEpochResponse prepareOffsetsForLeaderEpochResponse(
-            TopicPartition topicPartition,
-            Errors error,
-            int leaderEpoch,
-            long endOffset
-    ) {
-        OffsetForLeaderEpochResponseData data = new 
OffsetForLeaderEpochResponseData();
-        data.topics().add(new OffsetForLeaderTopicResult()
-                .setTopic(topicPartition.topic())
-                .setPartitions(Collections.singletonList(new EpochEndOffset()
-                        .setPartition(topicPartition.partition())
-                        .setErrorCode(error.code())
-                        .setLeaderEpoch(leaderEpoch)
-                        .setEndOffset(endOffset))));
-        return new OffsetsForLeaderEpochResponse(data);
+    /**
+     * This test makes several calls to {@link #sendFetches()}, and after 
each, the buffered partitions are
+     * modified to either cause (or prevent) a fetch from being requested.
+     */
+    @Test
+    public void testFetchRequestWithBufferedPartitions() {
+        buildFetcher();
+
+        // The partitions are spread across multiple nodes to ensure the 
fetcher's logic correctly handles the
+        // partition-to-node mapping.
+        int numNodes = 2;
+        Set<TopicPartition> partitions = Set.of(tp0, tp1, tp2, tp3);
+        assignFromUser(partitions, numNodes);
+
+        // Seek each partition so that it becomes eligible to fetch.
+        partitions.forEach(tp -> subscriptions.seek(tp, 0));
+
+        // Get all the nodes serving as the leader for these partitions.
+        List<Node> nodes = nodesForPartitionLeaders(partitions);
+
+        // Extract the nodes and their respective set of partitions to make 
things easier to keep track of later.
+        assertEquals(2, nodes.size());
+        Node node0 = nodes.get(0);
+        Node node1 = nodes.get(1);
+        List<TopicPartition> node0Partitions = partitionsForNode(node0, 
partitions);
+        List<TopicPartition> node1Partitions = partitionsForNode(node1, 
partitions);
+        assertEquals(2, node0Partitions.size());
+        assertEquals(2, node1Partitions.size());
+        TopicPartition node0Partition1 = node0Partitions.get(0);
+        TopicPartition node0Partition2 = node0Partitions.get(1);
+        TopicPartition node1Partition1 = node1Partitions.get(0);
+        TopicPartition node1Partition2 = node1Partitions.get(1);
+
+        // sendFetches() call #1 should issue requests to node 0 or node 1 
since neither has buffered data.
+        assertEquals(2, sendFetches());
+        prepareFetchResponses(node0, node0Partitions, 0);
+        prepareFetchResponses(node1, node1Partitions, 0);
+        networkClientDelegate.poll(time.timer(0));
+
+        assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size());
+        collectSelectedPartition(node0Partition1, partitions);
+        node0Partitions.remove(node0Partition1);
+        assertEquals(3, fetcher.fetchBuffer.bufferedPartitions().size());
+
+        // sendFetches() call #2 shouldn't issue requests to either node 0 or 
node 1 since they both have buffered data.
+        assertEquals(0, sendFetches());
+        networkClientDelegate.poll(time.timer(0));
+        collectSelectedPartition(node1Partition1, partitions);
+        node1Partitions.remove(node1Partition1);
+        assertEquals(2, fetcher.fetchBuffer.bufferedPartitions().size());
+
+        // sendFetches() call #3 shouldn't issue requests to either node 0 or 
node 1 since they both have buffered data.
+        assertEquals(0, sendFetches());
+        networkClientDelegate.poll(time.timer(0));
+        collectSelectedPartition(node0Partition2, partitions);
+        node0Partitions.remove(node0Partition2);
+        assertEquals(1, fetcher.fetchBuffer.bufferedPartitions().size());
+
+        // Validate that all of node 0's partitions have all been collected.
+        assertTrue(node0Partitions.isEmpty());
+
+        // Reset the list of partitions for node 0 so the next fetch pass 
requests data.
+        node0Partitions = partitionsForNode(node0, partitions);
+
+        // sendFetches() call #4 should issue a request to node 0 since its 
buffered data was collected.
+        assertEquals(1, sendFetches());
+        prepareFetchResponses(node0, node0Partitions, 10);
+        networkClientDelegate.poll(time.timer(0));
+
+        collectSelectedPartition(node1Partition2, partitions);
+        node1Partitions.remove(node1Partition2);
+        assertEquals(2, fetcher.fetchBuffer.bufferedPartitions().size());
+
+        // Node 1's partitions have likewise all been collected, so validate 
that.
+        assertTrue(node1Partitions.isEmpty());
+
+        // Again, reset the list of partitions, this time for node 1, so the 
next fetch pass requests data.
+        node1Partitions = partitionsForNode(node1, partitions);
+
+        // sendFetches() call #5 should issue a request to node 1 since its 
buffered data was collected.
+        assertEquals(1, sendFetches());
+        prepareFetchResponses(node1, node1Partitions, 10);
+        networkClientDelegate.poll(time.timer(0));
+        assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size());
+
+        // Collect all the records and make sure they include all the 
partitions, and validate that there is no data
+        // remaining in the fetch buffer.
+        assertEquals(partitions, fetchRecords().keySet());
+        assertEquals(0, fetcher.fetchBuffer.bufferedPartitions().size());
+
+        // sendFetches() call #6 should issue a request to nodes 0 and 1 since 
its buffered data was collected.
+        assertEquals(2, sendFetches());
+        prepareFetchResponses(node0, node0Partitions, 20);
+        prepareFetchResponses(node1, node1Partitions, 20);
+        networkClientDelegate.poll(time.timer(0));
+        assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size());
+
+        // Just for completeness, collect all the records and make sure they 
include all the partitions, and validate
+        // that there is no data remaining in the fetch buffer.
+        assertEquals(partitions, fetchRecords().keySet());
+        assertEquals(0, fetcher.fetchBuffer.bufferedPartitions().size());
+    }
+
+    @Test
+    public void testFetchRequestWithBufferedPartitionNotAssigned() {
+        buildFetcher();
+
+        // The partitions are spread across multiple nodes to ensure the 
fetcher's logic correctly handles the
+        // partition-to-node mapping.
+        int numNodes = 2;
+        Set<TopicPartition> partitions = Set.of(tp0, tp1, tp2, tp3);
+        assignFromUser(partitions, numNodes);
+
+        // Seek each partition so that it becomes eligible to fetch.
+        partitions.forEach(tp -> subscriptions.seek(tp, 0));
+
+        // Get all the nodes serving as the leader for these partitions.
+        List<Node> nodes = nodesForPartitionLeaders(partitions);
+
+        // Extract the nodes and their respective set of partitions to make 
things easier to keep track of later.
+        assertEquals(2, nodes.size());
+        Node node0 = nodes.get(0);
+        Node node1 = nodes.get(1);
+        List<TopicPartition> node0Partitions = partitionsForNode(node0, 
partitions);
+        List<TopicPartition> node1Partitions = partitionsForNode(node1, 
partitions);
+        assertEquals(2, node0Partitions.size());
+        assertEquals(2, node1Partitions.size());
+        TopicPartition node0Partition1 = node0Partitions.get(0);
+        TopicPartition node0Partition2 = node0Partitions.get(1);
+        TopicPartition node1Partition1 = node1Partitions.get(0);
+        TopicPartition node1Partition2 = node1Partitions.get(1);
+
+        // sendFetches() call #1 should issue requests to node 0 or node 1 
since neither has buffered data.
+        assertEquals(2, sendFetches());
+        prepareFetchResponses(node0, node0Partitions, 0);
+        prepareFetchResponses(node1, node1Partitions, 0);
+        networkClientDelegate.poll(time.timer(0));
+
+        // Collect node0Partition1 so that it doesn't have anything in the 
fetch buffer.
+        assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size());
+        collectSelectedPartition(node0Partition1, partitions);
+        assertEquals(3, fetcher.fetchBuffer.bufferedPartitions().size());
+
+        // Exclude node0Partition2 (the remaining buffered partition for node 
0) when updating the assigned partitions
+        // to cause it to become unassigned.
+        subscriptions.assignFromUser(Set.of(
+            node0Partition1,
+            // node0Partition2,         // Intentionally omit this partition 
so that it is unassigned
+            node1Partition1,
+            node1Partition2
+        ));
+
+        // node0Partition1 (the collected partition) should have a retrievable 
position, but node0Partition2
+        // (the unassigned position) should throw an error when attempting to 
retrieve its position.
+        assertDoesNotThrow(() -> subscriptions.position(node0Partition1));
+        assertThrows(IllegalStateException.class, () -> 
subscriptions.position(node0Partition2));
+
+        // sendFetches() call #2 should issue a request to node 0 because the 
first partition in node 0 was collected
+        // (and its buffer removed) and the second partition for node 0 was 
unassigned. As a result, there are now no
+        // *assigned* partitions for node 0 that are buffered.
+        assertEquals(1, sendFetches());
+    }
+
+    @Test
+    public void testFetchRequestWithBufferedPartitionMissingLeader() {
+        buildFetcher();
+
+        Set<TopicPartition> partitions = Set.of(tp0, tp1);
+        assignFromUser(partitions);
+
+        // Seek each partition so that it becomes eligible to fetch.
+        partitions.forEach(tp -> subscriptions.seek(tp, 0));
+
+        Node leader = metadata.fetch().leaderFor(tp0);
+
+        // sendFetches() call #1 should issue a request since there's no 
buffered data.
+        assertEquals(1, sendFetches());
+        prepareFetchResponses(leader, Set.of(tp0, tp1), 0);
+        networkClientDelegate.poll(time.timer(0));
+
+        // Per the fetch response, data for both of the partitions are in the 
fetch buffer.
+        assertTrue(fetcher.fetchBuffer.bufferedPartitions().contains(tp0));
+        assertTrue(fetcher.fetchBuffer.bufferedPartitions().contains(tp1));
+
+        // Collect the first partition (tp0) which will remove it from the 
fetch buffer.
+        collectSelectedPartition(tp0, partitions);
+
+        // Since tp0 was collected, it's not in the fetch buffer, but tp1 
remains in the fetch buffer.
+        assertFalse(fetcher.fetchBuffer.bufferedPartitions().contains(tp0));
+        assertTrue(fetcher.fetchBuffer.bufferedPartitions().contains(tp1));
+
+        // Overwrite tp1's position with an empty leader, but verify that it 
is still buffered. Having a leaderless,
+        // buffered partition is key to triggering the test case.
+        subscriptions.position(tp1, new SubscriptionState.FetchPosition(
+            0,
+            Optional.empty(),
+            Metadata.LeaderAndEpoch.noLeaderOrEpoch()
+        ));
+        assertTrue(fetcher.fetchBuffer.bufferedPartitions().contains(tp1));
+
+        // Validate the state of the collected partition (tp0) and leaderless 
partition (tp1) before sending the
+        // fetch request.
+        
assertTrue(subscriptions.position(tp0).currentLeader.leader.isPresent());
+        
assertFalse(subscriptions.position(tp1).currentLeader.leader.isPresent());
+
+        // sendFetches() call #2 should issue a fetch request because it has 
no buffered partitions:
+        //
+        // - tp0 was collected and thus not in the fetch buffer
+        // - tp1, while still in the fetch buffer, is leaderless
+        //
+        // As a result, there are now effectively no buffered partitions for 
which there is a leader.
+        assertEquals(1, sendFetches());
+    }
+
+    @Test
+    public void testFetchRequestWithBufferedPartitionMissingPosition() {
+        buildFetcher();
+
+        // The partitions are spread across multiple nodes to ensure the 
fetcher's logic correctly handles the
+        // partition-to-node mapping.
+        int numNodes = 2;
+        Set<TopicPartition> partitions = Set.of(tp0, tp1, tp2, tp3);
+        assignFromUser(partitions, numNodes);
+
+        // Seek each partition so that it becomes eligible to fetch.
+        partitions.forEach(tp -> subscriptions.seek(tp, 0));
+
+        // Get all the nodes serving as the leader for these partitions.
+        List<Node> nodes = nodesForPartitionLeaders(partitions);
+
+        // Extract the nodes and their respective set of partitions to make 
things easier to keep track of later.
+        assertEquals(2, nodes.size());
+        Node node0 = nodes.get(0);
+        Node node1 = nodes.get(1);
+        List<TopicPartition> node0Partitions = partitionsForNode(node0, 
partitions);
+        List<TopicPartition> node1Partitions = partitionsForNode(node1, 
partitions);
+        assertEquals(2, node0Partitions.size());
+        assertEquals(2, node1Partitions.size());
+        TopicPartition node0Partition1 = node0Partitions.get(0);
+        TopicPartition node0Partition2 = node0Partitions.get(1);
+
+        // sendFetches() call #1 should issue requests to node 0 or node 1 
since neither has buffered data.
+        assertEquals(2, sendFetches());
+        prepareFetchResponses(node0, node0Partitions, 0);
+        prepareFetchResponses(node1, node1Partitions, 0);
+        networkClientDelegate.poll(time.timer(0));
+
+        // Collect node 0's first partition (node0Partition1) which will 
remove it from the fetch buffer.
+        assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size());
+        collectSelectedPartition(node0Partition1, partitions);
+        assertEquals(3, fetcher.fetchBuffer.bufferedPartitions().size());
+
+        // Overwrite node0Partition2's position with an empty leader to 
trigger the test case.
+        subscriptions.position(node0Partition2, null);
+
+        // Confirm that calling SubscriptionState.position() succeeds for a 
leaderless partition. While it shouldn't
+        // throw an exception, it should return a null position.
+        SubscriptionState.FetchPosition position = assertDoesNotThrow(() -> 
subscriptions.position(node0Partition2));
+        assertNull(position);
+
+        // sendFetches() call #2 will now fail to send any requests as we have 
an invalid position in the assignment.
+        // The Consumer.poll() API will throw an IllegalStateException to the 
user.
+        Future<Void> future = fetcher.createFetchRequests();
+        assertEquals(0, sendFetches());
+        assertFutureThrows(future, IllegalStateException.class);
+    }
+
+    /**
+     * For each partition given, return the set of nodes that represent the 
partition's leader using
+     * {@link Cluster#leaderFor(TopicPartition)}.
+     */
+    private List<Node> nodesForPartitionLeaders(Set<TopicPartition> 
partitions) {
+        Cluster cluster = metadata.fetch();
+
+        return partitions.stream()
+            .map(cluster::leaderFor)
+            .filter(Objects::nonNull)
+            .distinct()
+            .collect(Collectors.toList());
+    }
+
+    /**
+     * For the given set of partitions, filter the partitions to be those 
where the partition's leader node
+     * (via {@link Cluster#leaderFor(TopicPartition)}) matches the given node.
+     */
+    private List<TopicPartition> partitionsForNode(Node node, 
Set<TopicPartition> partitions) {
+        Cluster cluster = metadata.fetch();
+
+        return partitions.stream()
+            .filter(tp -> node.equals(cluster.leaderFor(tp)))
+            .collect(Collectors.toList());
+    }
+
+    /**
+     * Creates 10 dummy records starting at the given offset for each given 
partition and directs each response to the
+     * given node.
+     */
+    private void prepareFetchResponses(Node node, Collection<TopicPartition> 
partitions, int offset) {
+        LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> 
partitionDataMap = new LinkedHashMap<>();
+
+        partitions.forEach(tp -> {
+            MemoryRecords records = buildRecords(offset, 10, 1);
+            FetchResponseData.PartitionData partitionData = new 
FetchResponseData.PartitionData()
+                .setPartitionIndex(tp.partition())
+                .setHighWatermark(100)
+                .setRecords(records);
+            partitionDataMap.put(new TopicIdPartition(topicId, tp), 
partitionData);
+        });
+
+        client.prepareResponseFrom(
+            FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, 
partitionDataMap),
+            node
+        );
+    }
+
+    /**
+     * Invokes {@link #collectFetch()}, but before doing so it {@link 
Consumer#pause(Collection) pauses} all the
+     * partitions in the given set of partitions <em>except</em> for {@code 
partition}. This is done so that only
+     * that partition will be collected. Once the collection has been 
performed, the previously-paused partitions
+     * are then {@link Consumer#resume(Collection) resumed}.
+     */
+    private void collectSelectedPartition(TopicPartition partition, 
Set<TopicPartition> partitions) {
+        // Pause any remaining partitions so that when fetchRecords() is 
called, only the records for the
+        // "fetched" partition are collected, leaving the remaining in the 
fetch buffer.
+        Set<TopicPartition> pausedPartitions = partitions.stream()
+            .filter(tp -> !tp.equals(partition))
+            .collect(Collectors.toSet());
+
+        // Fetch the records, which should be just for the expected topic 
partition since the others were paused.
+        pausedPartitions.forEach(tp -> subscriptions.pause(tp));
+        fetchRecords();
+        pausedPartitions.forEach(tp -> subscriptions.resume(tp));
     }
 
     private FetchResponse fetchResponseWithTopLevelError(TopicIdPartition tp, 
Errors error, int throttleTime) {
@@ -3749,7 +4072,7 @@ public class FetchRequestManagerTest {
 
     private class TestableNetworkClientDelegate extends NetworkClientDelegate {
 
-        private final Logger log = 
LoggerFactory.getLogger(NetworkClientDelegate.class);
+        private final Logger log = 
LoggerFactory.getLogger(TestableNetworkClientDelegate.class);
         private final ConcurrentLinkedQueue<Node> pendingDisconnects = new 
ConcurrentLinkedQueue<>();
 
         public TestableNetworkClientDelegate(Time time,
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 729668f8076..bc82aeae9fa 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -209,11 +209,15 @@ public class FetcherTest {
     }
 
    /**
     * Assigns the given partitions from the user and performs the associated metadata updates,
     * assuming a single-node cluster. Delegates to the two-arg overload with {@code numNodes = 1}.
     *
     * @param partitions the partitions to assign
     */
    private void assignFromUser(Set<TopicPartition> partitions) {
        // Single node: all assigned partitions share the same leader.
        assignFromUser(partitions, 1);
    }
+
+    private void assignFromUser(Set<TopicPartition> partitions, int numNodes) {
         subscriptions.assignFromUser(partitions);
-        client.updateMetadata(initialUpdateResponse);
+        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(numNodes, 
singletonMap(topicName, 4), topicIds));
 
         // A dummy metadata update to ensure valid leader epoch.
-        
metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds("dummy",
 1,
+        
metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds("dummy",
 numNodes,
             Collections.emptyMap(), singletonMap(topicName, 4),
             tp -> validLeaderEpoch, topicIds), false, 0L);
     }
@@ -1152,16 +1156,17 @@ public class FetcherTest {
         Set<TopicPartition> tps = new HashSet<>();
         tps.add(tp0);
         tps.add(tp1);
-        assignFromUser(tps);
+        assignFromUser(tps, 2);                 // Use multiple nodes so 
partitions have different leaders
         subscriptions.seek(tp0, 1);
         subscriptions.seek(tp1, 6);
 
-        client.prepareResponse(fetchResponse2(tidp0, records, 100L, tidp1, 
moreRecords, 100L));
+        client.prepareResponse(fullFetchResponse(tidp0, records, Errors.NONE, 
100L, 0));
+        client.prepareResponse(fullFetchResponse(tidp1, moreRecords, 
Errors.NONE, 100L, 0));
         client.prepareResponse(fullFetchResponse(tidp0, emptyRecords, 
Errors.NONE, 100L, 0));
 
-        // Send fetch request because we do not have pending fetch responses 
to process.
-        // The first fetch response will return 3 records for tp0 and 3 more 
for tp1.
-        assertEquals(1, sendFetches());
+        // Send two fetch requests (one to each node) because we do not have 
pending fetch responses to process.
+        // The fetch responses will return 3 records for tp0 and 3 more for 
tp1.
+        assertEquals(2, sendFetches());
         // The poll returns 2 records from one of the topic-partitions 
(non-deterministic).
         // This leaves 1 record pending from that topic-partition, and the 
remaining 3 from the other.
         pollAndValidateMaxPollRecordsNotExceeded(maxPollRecords);
@@ -1428,7 +1433,7 @@ public class FetcherTest {
 
         Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> 
fetchedRecords;
 
-        assignFromUser(Set.of(tp0, tp1));
+        assignFromUser(Set.of(tp0, tp1), 2);    // Use multiple nodes so 
partitions have different leaders
 
         // seek to tp0 and tp1 in two polls to generate 2 complete requests 
and responses
 
@@ -1460,7 +1465,7 @@ public class FetcherTest {
     public void testFetchOnCompletedFetchesForAllPausedPartitions() {
         buildFetcher();
 
-        assignFromUser(Set.of(tp0, tp1));
+        assignFromUser(Set.of(tp0, tp1), 2);    // Use multiple nodes so 
partitions have different leaders
 
         // seek to tp0 and tp1 in two polls to generate 2 complete requests 
and responses
 
@@ -1823,7 +1828,9 @@ public class FetcherTest {
         buildFetcher(AutoOffsetResetStrategy.NONE, new ByteArrayDeserializer(),
                 new ByteArrayDeserializer(), 2, 
IsolationLevel.READ_UNCOMMITTED);
 
-        assignFromUser(Set.of(tp0));
+        // Use multiple nodes so partitions have different leaders. tp0 is 
added here, but tp1 is also assigned
+        // about halfway down.
+        assignFromUser(Set.of(tp0), 2);
         subscriptions.seek(tp0, 1);
         assertEquals(1, sendFetches());
         Map<TopicIdPartition, FetchResponseData.PartitionData> partitions = 
new HashMap<>();
@@ -3757,28 +3764,6 @@ public class FetcherTest {
         return FetchResponse.of(Errors.NONE, throttleTime, INVALID_SESSION_ID, 
new LinkedHashMap<>(partitions));
     }
 
-    private FetchResponse fetchResponse2(TopicIdPartition tp1, MemoryRecords 
records1, long hw1,
-                                         TopicIdPartition tp2, MemoryRecords 
records2, long hw2) {
-        Map<TopicIdPartition, FetchResponseData.PartitionData> partitions = 
new HashMap<>();
-        partitions.put(tp1,
-                new FetchResponseData.PartitionData()
-                        .setPartitionIndex(tp1.topicPartition().partition())
-                        .setErrorCode(Errors.NONE.code())
-                        .setHighWatermark(hw1)
-                        
.setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET)
-                        .setLogStartOffset(0)
-                        .setRecords(records1));
-        partitions.put(tp2,
-                new FetchResponseData.PartitionData()
-                        .setPartitionIndex(tp2.topicPartition().partition())
-                        .setErrorCode(Errors.NONE.code())
-                        .setHighWatermark(hw2)
-                        
.setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET)
-                        .setLogStartOffset(0)
-                        .setRecords(records2));
-        return FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, new 
LinkedHashMap<>(partitions));
-    }
-
     /**
      * Assert that the {@link Fetcher#collectFetch() latest fetch} does not 
contain any
      * {@link Fetch#records() user-visible records}, did not


Reply via email to