This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4a8ad8ec639 KAFKA-19781: Fix to not update positions for partitions 
being revoked (#20914)
4a8ad8ec639 is described below

commit 4a8ad8ec63953e56b4de2502b74ffa0b8c7f4bab
Author: Lianet Magrans <[email protected]>
AuthorDate: Wed Nov 19 08:53:22 2025 -0500

    KAFKA-19781: Fix to not update positions for partitions being revoked 
(#20914)
    
    Fix to avoid initializing positions for partitions being revoked, as it
    is unneeded (we do not allow fetching from partitions being revoked),
    and could lead to NoOffsetForPartitionException on a partition that is
    already being revoked (this is confusing for applications).
    
    This is the behaviour we already had for fetch, just applying it to
    update positions to align.
    
    This gap was surfaced on edge cases of partitions being assigned and
    revoked right away.
    
    Reviewers: Andrew Schofield <[email protected]>, Lucas Brutschy
     <[email protected]>
---
 .../kafka/clients/consumer/internals/SubscriptionState.java   |  8 +++++---
 .../clients/consumer/internals/SubscriptionStateTest.java     | 11 ++++++++++-
 2 files changed, 15 insertions(+), 4 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index c2d65a8e045..4629dd0934a 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -1202,14 +1202,16 @@ public class SubscriptionState {
         }
 
         /**
-         * True if the partition is in {@link FetchStates#INITIALIZING} state. 
While in this
-         * state, a position for the partition can be retrieved (based on 
committed offsets or
+         * Check if we need to retrieve a fetch position for the given 
partition.
+         * True if the partition state is {@link FetchStates#INITIALIZING}, 
and the partition is not being revoked.
+         * <p/>
+         * While in this state, a position for the partition will be retrieved 
(based on committed offsets or
          * partitions offsets).
          * Note that retrieving a position does not mean that we can start 
fetching from the
          * partition (see {@link #isFetchable()})
          */
         private boolean shouldInitialize() {
-            return fetchState.equals(FetchStates.INITIALIZING);
+            return fetchState.equals(FetchStates.INITIALIZING) && 
!pendingRevocation;
         }
 
         private boolean isFetchable() {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index 4d4a725d45c..0e89fe4e5a1 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -306,15 +306,24 @@ public class SubscriptionStateTest {
     }
 
     @Test
-    public void testMarkingPartitionPending() {
+    public void testMarkingPendingRevocation() {
         state.assignFromUser(Set.of(tp0));
         state.seek(tp0, 100);
         assertTrue(state.isFetchable(tp0));
+        assertFalse(state.isPaused(tp0));
         state.markPendingRevocation(Set.of(tp0));
         assertFalse(state.isFetchable(tp0));
         assertFalse(state.isPaused(tp0));
     }
 
+    @Test
+    public void testMarkingPendingRevocationPreventsInitializingPosition() {
+        state.assignFromUser(Set.of(tp0));
+        assertTrue(state.initializingPartitions().contains(tp0));
+        state.markPendingRevocation(Set.of(tp0));
+        assertFalse(state.initializingPartitions().contains(tp0));
+    }
+
     @Test
     public void 
testAssignedPartitionsAwaitingCallbackKeepPositionDefinedInCallback() {
         // New partition assigned. Should not be fetchable or initializing 
positions.

Reply via email to