This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2bfe7a9 KAFKA-12514: Fix NPE in SubscriptionState (#10369)
2bfe7a9 is described below
commit 2bfe7a9d1ec7a39bb6cb290ed91644f6ad14cb3f
Author: John Roesler <[email protected]>
AuthorDate: Sun Mar 21 20:56:16 2021 -0500
KAFKA-12514: Fix NPE in SubscriptionState (#10369)
Return null from partitionLag if the partition has no current position,
instead of throwing a NullPointerException. This was the desired
semantics; the missing null check was an oversight.
Patches: KIP-695
Patches: a92b986c855592d630fbabf49d1e9a160ad5b230
Reviewers: Walker Carlson <[email protected]>, A. Sophie Blee-Goldman
<[email protected]>
---
.../clients/consumer/internals/SubscriptionState.java | 7 +++++--
.../clients/consumer/internals/SubscriptionStateTest.java | 15 +++++++++++++++
2 files changed, 20 insertions(+), 2 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 7b971a1..3e53868 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -539,10 +539,13 @@ public class SubscriptionState {
public synchronized Long partitionLag(TopicPartition tp, IsolationLevel
isolationLevel) {
TopicPartitionState topicPartitionState = assignedState(tp);
- if (isolationLevel == IsolationLevel.READ_COMMITTED)
+ if (topicPartitionState.position == null) {
+ return null;
+ } else if (isolationLevel == IsolationLevel.READ_COMMITTED) {
return topicPartitionState.lastStableOffset == null ? null :
topicPartitionState.lastStableOffset - topicPartitionState.position.offset;
- else
+ } else {
return topicPartitionState.highWatermark == null ? null :
topicPartitionState.highWatermark - topicPartitionState.position.offset;
+ }
}
synchronized Long partitionLead(TopicPartition tp) {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index d6e8800..d19234f 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -23,6 +23,7 @@ import
org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import
org.apache.kafka.clients.consumer.internals.SubscriptionState.LogTruncation;
+import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
@@ -794,4 +795,18 @@ public class SubscriptionStateTest {
assertFalse(state.isOffsetResetNeeded(tp0));
}
+ @Test
+ public void nullPositionLagOnNoPosition() {
+ state.assignFromUser(Collections.singleton(tp0));
+
+ assertNull(state.partitionLag(tp0, IsolationLevel.READ_UNCOMMITTED));
+ assertNull(state.partitionLag(tp0, IsolationLevel.READ_COMMITTED));
+
+ state.updateHighWatermark(tp0, 1L);
+ state.updateLastStableOffset(tp0, 1L);
+
+ assertNull(state.partitionLag(tp0, IsolationLevel.READ_UNCOMMITTED));
+ assertNull(state.partitionLag(tp0, IsolationLevel.READ_COMMITTED));
+ }
+
}