This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ce996b34e9e KAFKA-19356: Prevent new consumer from fetching assigned partitions not in explicit subscription (#19983)
ce996b34e9e is described below

commit ce996b34e9eb90c4541a8cb096b0509e8d39c1bd
Author: Lianet Magrans <98415067+lian...@users.noreply.github.com>
AuthorDate: Wed Jun 18 18:31:46 2025 -0400

    KAFKA-19356: Prevent new consumer from fetching assigned partitions not in explicit subscription (#19983)
    
    Fix to ensure that assigned partitions whose topics are not in the consumer's
    explicit subscription are considered not fetchable (so that no records
    are returned on poll for them).
    
    This scenario can happen in the new async consumer (using the Consumer
    rebalance protocol) when the subscription changes, because the consumer
    keeps its current assignment until the coordinator sends a new one (the
    broker drives assignments).
    
    This does not happen in the classic consumer because the assignment
    logic lives on the client side, so the consumer proactively updates
    its assignment as needed.
    
    This PR validates the assignment against the subscription on fetch for
    explicit topic subscriptions only. Regular-expression and shared
    subscriptions remain unchanged (the regex case is still under discussion
    and will be handled separately if needed).
    
    Reviewers: Andrew Schofield <aschofi...@confluent.io>, TengYao Chi
     <frankvi...@apache.org>, Kirk True <kt...@confluent.io>, Jhen-Yung Hsu
     <jhenyung...@gmail.com>
---
 .../clients/consumer/internals/FetchCollector.java |  5 +-
 .../consumer/internals/SubscriptionState.java      | 19 ++++-
 .../consumer/internals/SubscriptionStateTest.java  | 80 ++++++++++++++++++++++
 3 files changed, 100 insertions(+), 4 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java
index 2aa8aeaaffb..bbe216c2fc8 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java
@@ -155,7 +155,10 @@ public class FetchCollector<K, V> {
             log.debug("Not returning fetched records for partition {} since it 
is no longer assigned", tp);
         } else if (!subscriptions.isFetchable(tp)) {
             // this can happen when a partition is paused before fetched 
records are returned to the consumer's
-            // poll call or if the offset is being reset
+            // poll call or if the offset is being reset.
+            // It can also happen under the Consumer rebalance protocol, when 
the consumer changes its subscription.
+            // Until the consumer receives an updated assignment from the 
coordinator, it can hold assigned partitions
+            // that are not in the subscription anymore, so we make them not 
fetchable.
             log.debug("Not returning fetched records for assigned partition {} 
since it is no longer fetchable", tp);
         } else {
             SubscriptionState.FetchPosition position = 
subscriptions.position(tp);
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index e048ab90b1c..4659f4cf0cd 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -487,7 +487,7 @@ public class SubscriptionState {
         List<TopicPartition> result = new ArrayList<>();
         assignment.forEach((topicPartition, topicPartitionState) -> {
             // Cheap check is first to avoid evaluating the predicate if 
possible
-            if ((subscriptionType.equals(SubscriptionType.AUTO_TOPICS_SHARE) 
|| topicPartitionState.isFetchable())
+            if ((subscriptionType.equals(SubscriptionType.AUTO_TOPICS_SHARE) 
|| isFetchableAndSubscribed(topicPartition, topicPartitionState))
                     && isAvailable.test(topicPartition)) {
                 result.add(topicPartition);
             }
@@ -495,6 +495,19 @@ public class SubscriptionState {
         return result;
     }
 
+    /**
+     * Check if the partition is fetchable.
+     * If the consumer has explicitly subscribed to a list of topic names,
+     * this will also check that the partition is contained in the 
subscription.
+     */
+    private synchronized boolean isFetchableAndSubscribed(TopicPartition 
topicPartition, TopicPartitionState topicPartitionState) {
+        if (subscriptionType.equals(SubscriptionType.AUTO_TOPICS) && 
!subscription.contains(topicPartition.topic())) {
+            log.trace("Assigned partition {} is not in the subscription {} so 
will be considered not fetchable.", topicPartition, subscription);
+            return false;
+        }
+        return topicPartitionState.isFetchable();
+    }
+
     public synchronized boolean hasAutoAssignedPartitions() {
         return this.subscriptionType == SubscriptionType.AUTO_TOPICS || 
this.subscriptionType == SubscriptionType.AUTO_PATTERN
                 || this.subscriptionType == SubscriptionType.AUTO_TOPICS_SHARE 
|| this.subscriptionType == SubscriptionType.AUTO_PATTERN_RE2J;
@@ -879,8 +892,8 @@ public class SubscriptionState {
     }
 
     synchronized boolean isFetchable(TopicPartition tp) {
-        TopicPartitionState assignedOrNull = assignedStateOrNull(tp);
-        return assignedOrNull != null && assignedOrNull.isFetchable();
+        TopicPartitionState tps = assignedStateOrNull(tp);
+        return tps != null && isFetchableAndSubscribed(tp, tps);
     }
 
     public synchronized boolean hasValidPosition(TopicPartition tp) {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index f0b8518d993..d77f6d808a7 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -36,9 +36,12 @@ import org.junit.jupiter.api.Test;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.LongSupplier;
+import java.util.function.Predicate;
 import java.util.regex.Pattern;
 
 import static java.util.Collections.singleton;
@@ -113,6 +116,54 @@ public class SubscriptionStateTest {
         assertEquals(0, state.numAssignedPartitions());
     }
 
+    @Test
+    public void testIsFetchableOnManualAssignment() {
+        state.assignFromUser(Set.of(tp0, tp1));
+        assertAssignedPartitionIsFetchable();
+    }
+
+    @Test
+    public void testIsFetchableOnAutoAssignment() {
+        state.subscribe(Set.of(topic), Optional.of(rebalanceListener));
+        state.assignFromSubscribed(Set.of(tp0, tp1));
+        assertAssignedPartitionIsFetchable();
+    }
+
+    private void assertAssignedPartitionIsFetchable() {
+        assertEquals(2, state.assignedPartitions().size());
+        assertTrue(state.assignedPartitions().contains(tp0));
+        assertTrue(state.assignedPartitions().contains(tp1));
+
+        assertFalse(state.isFetchable(tp0), "Should not be fetchable without a 
valid position");
+        assertFalse(state.isFetchable(tp1), "Should not be fetchable without a 
valid position");
+
+        state.seek(tp0, 1);
+        state.seek(tp1, 1);
+
+        assertTrue(state.isFetchable(tp0));
+        assertTrue(state.isFetchable(tp1));
+    }
+
+    @Test
+    public void testIsFetchableConsidersExplicitTopicSubscription() {
+        state.subscribe(Set.of(topic1), Optional.of(rebalanceListener));
+        state.assignFromSubscribed(Set.of(t1p0));
+        state.seek(t1p0, 1);
+
+        assertEquals(Set.of(t1p0), state.assignedPartitions());
+        assertTrue(state.isFetchable(t1p0));
+
+        // Change subscription. Assigned partitions should remain unchanged 
but not fetchable.
+        state.subscribe(Set.of(topic), Optional.of(rebalanceListener));
+        assertEquals(Set.of(t1p0), state.assignedPartitions());
+        assertFalse(state.isFetchable(t1p0), "Assigned partitions not in the 
subscription should not be fetchable");
+
+        // Unsubscribe. Assigned partitions should be cleared and not 
fetchable.
+        state.unsubscribe();
+        assertTrue(state.assignedPartitions().isEmpty());
+        assertFalse(state.isFetchable(t1p0));
+    }
+
     @Test
     public void testGroupSubscribe() {
         state.subscribe(singleton(topic1), Optional.of(rebalanceListener));
@@ -1071,4 +1122,33 @@ public class SubscriptionStateTest {
 
         assertThrows(IllegalStateException.class, () -> 
state.isOffsetResetNeeded(unassignedPartition));
     }
+
+    // This test ensures the "fetchablePartitions" does not run the custom 
predicate if the partition is not fetchable
+    // This func is used in the hot path for fetching, to find fetchable 
partitions that are not in the buffer,
+    // so it should avoid evaluating the predicate if not needed.
+    @Test
+    public void testFetchablePartitionsPerformsCheapChecksFirst() {
+        // Setup fetchable partition and pause it
+        state.assignFromUser(Set.of(tp0));
+        state.seek(tp0, 100);
+        assertTrue(state.isFetchable(tp0));
+        state.pause(tp0);
+
+        // Retrieve fetchable partitions with custom predicate.
+        AtomicBoolean predicateEvaluated = new AtomicBoolean(false);
+        Predicate<TopicPartition> isBuffered = tp -> {
+            predicateEvaluated.set(true);
+            return true;
+        };
+        List<TopicPartition> fetchablePartitions = 
state.fetchablePartitions(isBuffered);
+        assertTrue(fetchablePartitions.isEmpty());
+        assertFalse(predicateEvaluated.get(), "Custom predicate should not be 
evaluated when partitions are not fetchable");
+
+        // Resume partition and retrieve fetchable again
+        state.resume(tp0);
+        predicateEvaluated.set(false);
+        fetchablePartitions = state.fetchablePartitions(isBuffered);
+        assertTrue(predicateEvaluated.get());
+        assertEquals(tp0, fetchablePartitions.get(0));
+    }
 }

Reply via email to