This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4a8ad8ec639 KAFKA-19781: Fix to not update positions for partitions
being revoked (#20914)
4a8ad8ec639 is described below
commit 4a8ad8ec63953e56b4de2502b74ffa0b8c7f4bab
Author: Lianet Magrans <[email protected]>
AuthorDate: Wed Nov 19 08:53:22 2025 -0500
KAFKA-19781: Fix to not update positions for partitions being revoked
(#20914)
Avoid initializing positions for partitions that are being revoked.
Doing so is unnecessary (we do not allow fetching from partitions being
revoked) and could lead to a NoOffsetForPartitionException on a partition
that is already being revoked, which is confusing for applications.
This is the behaviour we already had for fetch; this change applies the
same rule to updating positions so that the two are aligned.
This gap surfaced in edge cases where partitions are assigned and then
revoked right away.
Reviewers: Andrew Schofield <[email protected]>, Lucas Brutschy
<[email protected]>
---
.../kafka/clients/consumer/internals/SubscriptionState.java | 8 +++++---
.../clients/consumer/internals/SubscriptionStateTest.java | 11 ++++++++++-
2 files changed, 15 insertions(+), 4 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index c2d65a8e045..4629dd0934a 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -1202,14 +1202,16 @@ public class SubscriptionState {
}
/**
- * True if the partition is in {@link FetchStates#INITIALIZING} state.
While in this
- * state, a position for the partition can be retrieved (based on
committed offsets or
+ * Check if we need to retrieve a fetch position for the given
partition.
+ * True if the partition state is {@link FetchStates#INITIALIZING},
and the partition is not being revoked.
+ * <p/>
+ * While in this state, a position for the partition will be retrieved
(based on committed offsets or
* partitions offsets).
* Note that retrieving a position does not mean that we can start
fetching from the
* partition (see {@link #isFetchable()})
*/
private boolean shouldInitialize() {
- return fetchState.equals(FetchStates.INITIALIZING);
+ return fetchState.equals(FetchStates.INITIALIZING) &&
!pendingRevocation;
}
private boolean isFetchable() {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index 4d4a725d45c..0e89fe4e5a1 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -306,15 +306,24 @@ public class SubscriptionStateTest {
}
@Test
- public void testMarkingPartitionPending() {
+ public void testMarkingPendingRevocation() {
state.assignFromUser(Set.of(tp0));
state.seek(tp0, 100);
assertTrue(state.isFetchable(tp0));
+ assertFalse(state.isPaused(tp0));
state.markPendingRevocation(Set.of(tp0));
assertFalse(state.isFetchable(tp0));
assertFalse(state.isPaused(tp0));
}
+ @Test
+ public void testMarkingPendingRevocationPreventsInitializingPosition() {
+ state.assignFromUser(Set.of(tp0));
+ assertTrue(state.initializingPartitions().contains(tp0));
+ state.markPendingRevocation(Set.of(tp0));
+ assertFalse(state.initializingPartitions().contains(tp0));
+ }
+
@Test
public void
testAssignedPartitionsAwaitingCallbackKeepPositionDefinedInCallback() {
// New partition assigned. Should not be fetchable or initializing
positions.