This is an automated email from the ASF dual-hosted git repository.

wcarlson pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bd030f34995 KAFKA-15824 SubscriptionState's 
maybeValidatePositionForCurrentLeader should handle partition which isn't 
subscribed yet (#14757)
bd030f34995 is described below

commit bd030f34995d18879d36549173752f1a92515c1b
Author: Mayank Shekhar Narula <[email protected]>
AuthorDate: Tue Nov 14 23:46:58 2023 +0000

    KAFKA-15824 SubscriptionState's maybeValidatePositionForCurrentLeader 
should handle partition which isn't subscribed yet (#14757)
    
    See the motivation in jira description 
https://issues.apache.org/jira/browse/KAFKA-15824
    
    This was discovered as ReassignReplicaShrinkTest started to fail with 
KIP-951 changes. KIP-951 changes since then have been reverted(PR), would be 
put back once this is in.
    
    Reviewers: Walker Carlson <[email protected]>, Andrew Schofield 
<[email protected]>
---
 .../kafka/clients/consumer/internals/SubscriptionState.java   | 11 ++++++++---
 .../clients/consumer/internals/SubscriptionStateTest.java     |  5 +++++
 2 files changed, 13 insertions(+), 3 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index edac65fcd41..601726bd9df 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -453,17 +453,22 @@ public class SubscriptionState {
     public synchronized boolean 
maybeValidatePositionForCurrentLeader(ApiVersions apiVersions,
                                                                       
TopicPartition tp,
                                                                       
Metadata.LeaderAndEpoch leaderAndEpoch) {
+        TopicPartitionState state = assignedStateOrNull(tp);
+        if (state == null) {
+            log.debug("Skipping validating position for partition {} which is 
not currently assigned.", tp);
+            return false;
+        }
         if (leaderAndEpoch.leader.isPresent()) {
             NodeApiVersions nodeApiVersions = 
apiVersions.get(leaderAndEpoch.leader.get().idString());
             if (nodeApiVersions == null || 
hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) {
-                return assignedState(tp).maybeValidatePosition(leaderAndEpoch);
+                return state.maybeValidatePosition(leaderAndEpoch);
             } else {
                 // If the broker does not support a newer version of 
OffsetsForLeaderEpoch, we skip validation
-                
assignedState(tp).updatePositionLeaderNoValidation(leaderAndEpoch);
+                state.updatePositionLeaderNoValidation(leaderAndEpoch);
                 return false;
             }
         } else {
-            return assignedState(tp).maybeValidatePosition(leaderAndEpoch);
+            return state.maybeValidatePosition(leaderAndEpoch);
         }
     }
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index 7df06e3b3e9..434fbe12042 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -569,6 +569,11 @@ public class SubscriptionStateTest {
         assertTrue(state.maybeValidatePositionForCurrentLeader(apiVersions, 
tp0, new Metadata.LeaderAndEpoch(
                 Optional.of(broker1), Optional.of(10))));
         assertFalse(state.hasValidPosition(tp0));
+
+        // tp1 is not part of the subscription, so validation should be 
skipped.
+        assertFalse(state.maybeValidatePositionForCurrentLeader(apiVersions, 
tp1, new Metadata.LeaderAndEpoch(
+            Optional.of(broker1), Optional.of(10))));
+        assertFalse(state.assignedPartitions().contains(tp1));
     }
 
     @Test

Reply via email to