This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2bfe7a9  KAFKA-12514: Fix NPE in SubscriptionState (#10369)
2bfe7a9 is described below

commit 2bfe7a9d1ec7a39bb6cb290ed91644f6ad14cb3f
Author: John Roesler <[email protected]>
AuthorDate: Sun Mar 21 20:56:16 2021 -0500

    KAFKA-12514: Fix NPE in SubscriptionState (#10369)
    
    Return null for partitionLag if there is no current position.
    This was the desired semantics; the lack of the check was an
    oversight.
    
    Patches: KIP-695
    Patches: a92b986c855592d630fbabf49d1e9a160ad5b230
    
    Reviewers: Walker Carlson <[email protected]>, A. Sophie Blee-Goldman <[email protected]>
---
 .../clients/consumer/internals/SubscriptionState.java     |  7 +++++--
 .../clients/consumer/internals/SubscriptionStateTest.java | 15 +++++++++++++++
 2 files changed, 20 insertions(+), 2 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 7b971a1..3e53868 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -539,10 +539,13 @@ public class SubscriptionState {
 
     public synchronized Long partitionLag(TopicPartition tp, IsolationLevel 
isolationLevel) {
         TopicPartitionState topicPartitionState = assignedState(tp);
-        if (isolationLevel == IsolationLevel.READ_COMMITTED)
+        if (topicPartitionState.position == null) {
+            return null;
+        } else if (isolationLevel == IsolationLevel.READ_COMMITTED) {
             return topicPartitionState.lastStableOffset == null ? null : 
topicPartitionState.lastStableOffset - topicPartitionState.position.offset;
-        else
+        } else {
             return topicPartitionState.highWatermark == null ? null : 
topicPartitionState.highWatermark - topicPartitionState.position.offset;
+        }
     }
 
     synchronized Long partitionLead(TopicPartition tp) {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index d6e8800..d19234f 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -23,6 +23,7 @@ import 
org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import 
org.apache.kafka.clients.consumer.internals.SubscriptionState.LogTruncation;
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
@@ -794,4 +795,18 @@ public class SubscriptionStateTest {
         assertFalse(state.isOffsetResetNeeded(tp0));
     }
 
+    @Test
+    public void nullPositionLagOnNoPosition() {
+        state.assignFromUser(Collections.singleton(tp0));
+
+        assertNull(state.partitionLag(tp0, IsolationLevel.READ_UNCOMMITTED));
+        assertNull(state.partitionLag(tp0, IsolationLevel.READ_COMMITTED));
+
+        state.updateHighWatermark(tp0, 1L);
+        state.updateLastStableOffset(tp0, 1L);
+
+        assertNull(state.partitionLag(tp0, IsolationLevel.READ_UNCOMMITTED));
+        assertNull(state.partitionLag(tp0, IsolationLevel.READ_COMMITTED));
+    }
+
 }

Reply via email to