This is an automated email from the ASF dual-hosted git repository.

vahid pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a1f7925  KAFKA-7962: Avoid NPE for StickyAssignor (#6308)
a1f7925 is described below

commit a1f7925d23be0b81cb77561d2113443df52c6f74
Author: huxi <[email protected]>
AuthorDate: Thu Feb 28 14:27:38 2019 +0800

    KAFKA-7962: Avoid NPE for StickyAssignor (#6308)
    
    * KAFKA-7962: StickyAssignor: throws NullPointerException during assignments if topic is deleted
    
    https://issues.apache.org/jira/browse/KAFKA-7962
    
    Consumer using StickyAssignor throws NullPointerException if a subscribed topic was removed.
    
    * addressed vahidhashemian's comments
    
    * lower NPath Complexity
    
    * added a unit test
---
 .../kafka/clients/consumer/StickyAssignor.java     |  6 ++--
 .../kafka/clients/consumer/StickyAssignorTest.java | 32 ++++++++++++++++++++++
 2 files changed, 36 insertions(+), 2 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
index 4be34c2..ee537eb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
@@ -219,13 +219,13 @@ public class StickyAssignor extends AbstractPartitionAssignor {
         for (Entry<String, Subscription> entry: subscriptions.entrySet()) {
             String consumer = entry.getKey();
             consumer2AllPotentialPartitions.put(consumer, new 
ArrayList<TopicPartition>());
-            for (String topic: entry.getValue().topics()) {
+            entry.getValue().topics().stream().filter(topic -> 
partitionsPerTopic.get(topic) != null).forEach(topic -> {
                 for (int i = 0; i < partitionsPerTopic.get(topic); ++i) {
                     TopicPartition topicPartition = new TopicPartition(topic, 
i);
                     
consumer2AllPotentialPartitions.get(consumer).add(topicPartition);
                     
partition2AllPotentialConsumers.get(topicPartition).add(consumer);
                 }
-            }
+            });
 
             // add this consumer to currentAssignment (with an empty topic 
partition assignment) if it does not already exist
             if (!currentAssignment.containsKey(consumer))
@@ -705,6 +705,8 @@ public class StickyAssignor extends AbstractPartitionAssignor {
      */
     private <T> boolean hasIdenticalListElements(Collection<List<T>> col) {
         Iterator<List<T>> it = col.iterator();
+        if (!it.hasNext())
+            return true;
         List<T> cur = it.next();
         while (it.hasNext()) {
             List<T> next = it.next();
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
index 31cee7e..32ba16a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
@@ -606,6 +606,38 @@ public class StickyAssignorTest {
         }
     }
 
+    @Test
+    public void testAssignmentUpdatedForDeletedTopic() {
+        String consumerId = "consumer";
+
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put("topic01", 1);
+        partitionsPerTopic.put("topic03", 100);
+        Map<String, Subscription> subscriptions =
+                Collections.singletonMap(consumerId, new 
Subscription(topics("topic01", "topic02", "topic03")));
+
+        Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic, subscriptions);
+        assertEquals(assignment.values().stream().mapToInt(topicPartitions -> 
topicPartitions.size()).sum(), 1 + 100);
+        assertEquals(Collections.singleton(consumerId), assignment.keySet());
+        assertTrue(isFullyBalanced(assignment));
+    }
+
+    @Test
+    public void testNoExceptionThrownWhenOnlySubscribedTopicDeleted() {
+        String topic = "topic01";
+        String consumer = "consumer01";
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, 3);
+        Map<String, Subscription> subscriptions = new HashMap<>();
+        subscriptions.put(consumer, new Subscription(topics(topic)));
+        Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic, subscriptions);
+        subscriptions.put(consumer, new Subscription(topics(topic), 
StickyAssignor.serializeTopicPartitionAssignment(assignment.get(consumer))));
+
+        assignment = assignor.assign(Collections.emptyMap(), subscriptions);
+        assertEquals(assignment.size(), 1);
+        assertTrue(assignment.get(consumer).isEmpty());
+    }
+
     private String getTopicName(int i, int maxNum) {
         return getCanonicalName("t", i, maxNum);
     }

Reply via email to