This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new 44664cc  KAFKA-9073: check assignment in requestFailed to avoid NPE 
(#7630)
44664cc is described below

commit 44664cc4e6d016923a9d196c4359b9183894f1a5
Author: Guozhang Wang <wangg...@gmail.com>
AuthorDate: Tue Dec 3 18:33:43 2019 -0800

    KAFKA-9073: check assignment in requestFailed to avoid NPE (#7630)
    
    This is a cherry-pick of the bug-fix included in #6884 to 2.3 and older 
branch.
    
    Reviewers: A. Sophie Blee-Goldman <sop...@confluent.io>, Matthias J. Sax 
<mj...@apache.org>
---
 .../kafka/clients/consumer/internals/ConsumerCoordinator.java    | 7 +++++--
 .../kafka/clients/consumer/internals/SubscriptionState.java      | 9 +++++++--
 .../kafka/clients/consumer/internals/SubscriptionStateTest.java  | 2 +-
 3 files changed, 13 insertions(+), 5 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 7341826..876a51d 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -1033,8 +1033,11 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
 
         private MetadataSnapshot(SubscriptionState subscription, Cluster 
cluster, int version) {
             Map<String, Integer> partitionsPerTopic = new HashMap<>();
-            for (String topic : subscription.groupSubscription())
-                partitionsPerTopic.put(topic, 
cluster.partitionCountForTopic(topic));
+            for (String topic : subscription.groupSubscription()) {
+                Integer numPartitions = cluster.partitionCountForTopic(topic);
+                if (numPartitions != null)
+                    partitionsPerTopic.put(topic, numPartitions);
+            }
             this.partitionsPerTopic = partitionsPerTopic;
             this.version = version;
         }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 4504674..ae15f2f 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -618,8 +618,13 @@ public class SubscriptionState {
     }
 
     synchronized void requestFailed(Set<TopicPartition> partitions, long 
nextRetryTimeMs) {
-        for (TopicPartition partition : partitions)
-            assignedState(partition).requestFailed(nextRetryTimeMs);
+        for (TopicPartition partition : partitions) {
+            // by the time the request failed, the assignment may no longer
+            // contain this partition any more, in which case we would just 
ignore.
+            final TopicPartitionState state = assignedStateOrNull(partition);
+            if (state != null)
+                state.requestFailed(nextRetryTimeMs);
+        }
     }
 
     synchronized void movePartitionToEnd(TopicPartition tp) {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index 484b9de..217b3ce 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -62,7 +62,7 @@ public class SubscriptionStateTest {
         state.seek(tp0, 1);
         assertTrue(state.isFetchable(tp0));
         assertEquals(1L, state.position(tp0).offset);
-        state.assignFromUser(Collections.<TopicPartition>emptySet());
+        state.assignFromUser(Collections.emptySet());
         assertTrue(state.assignedPartitions().isEmpty());
         assertEquals(0, state.numAssignedPartitions());
         assertFalse(state.isAssigned(tp0));

Reply via email to