This is an automated email from the ASF dual-hosted git repository.
wcarlson pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new bd030f34995 KAFKA-15824 SubscriptionState's
maybeValidatePositionForCurrentLeader should handle partition which isn't
subscribed yet (#14757)
bd030f34995 is described below
commit bd030f34995d18879d36549173752f1a92515c1b
Author: Mayank Shekhar Narula <[email protected]>
AuthorDate: Tue Nov 14 23:46:58 2023 +0000
KAFKA-15824 SubscriptionState's maybeValidatePositionForCurrentLeader
should handle partition which isn't subscribed yet (#14757)
See the motivation in jira description
https://issues.apache.org/jira/browse/KAFKA-15824
This was discovered as ReassignReplicaShrinkTest started to fail with
KIP-951 changes. KIP-951 changes since then have been reverted(PR), would be
put back once this is in.
Reviewers: Walker Carlson <[email protected]>, Andrew Schofield
<[email protected]>
---
.../kafka/clients/consumer/internals/SubscriptionState.java | 11 ++++++++---
.../clients/consumer/internals/SubscriptionStateTest.java | 5 +++++
2 files changed, 13 insertions(+), 3 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index edac65fcd41..601726bd9df 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -453,17 +453,22 @@ public class SubscriptionState {
public synchronized boolean
maybeValidatePositionForCurrentLeader(ApiVersions apiVersions,
TopicPartition tp,
Metadata.LeaderAndEpoch leaderAndEpoch) {
+ TopicPartitionState state = assignedStateOrNull(tp);
+ if (state == null) {
+ log.debug("Skipping validating position for partition {} which is
not currently assigned.", tp);
+ return false;
+ }
if (leaderAndEpoch.leader.isPresent()) {
NodeApiVersions nodeApiVersions =
apiVersions.get(leaderAndEpoch.leader.get().idString());
if (nodeApiVersions == null ||
hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) {
- return assignedState(tp).maybeValidatePosition(leaderAndEpoch);
+ return state.maybeValidatePosition(leaderAndEpoch);
} else {
// If the broker does not support a newer version of
OffsetsForLeaderEpoch, we skip validation
-
assignedState(tp).updatePositionLeaderNoValidation(leaderAndEpoch);
+ state.updatePositionLeaderNoValidation(leaderAndEpoch);
return false;
}
} else {
- return assignedState(tp).maybeValidatePosition(leaderAndEpoch);
+ return state.maybeValidatePosition(leaderAndEpoch);
}
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index 7df06e3b3e9..434fbe12042 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -569,6 +569,11 @@ public class SubscriptionStateTest {
assertTrue(state.maybeValidatePositionForCurrentLeader(apiVersions,
tp0, new Metadata.LeaderAndEpoch(
Optional.of(broker1), Optional.of(10))));
assertFalse(state.hasValidPosition(tp0));
+
+ // tp1 is not part of the subscription, so validation should be
skipped.
+ assertFalse(state.maybeValidatePositionForCurrentLeader(apiVersions,
tp1, new Metadata.LeaderAndEpoch(
+ Optional.of(broker1), Optional.of(10))));
+ assertFalse(state.assignedPartitions().contains(tp1));
}
@Test