This is an automated email from the ASF dual-hosted git repository.
vahid pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a1f7925 KAFKA-7962: Avoid NPE for StickyAssignor (#6308)
a1f7925 is described below
commit a1f7925d23be0b81cb77561d2113443df52c6f74
Author: huxi <[email protected]>
AuthorDate: Thu Feb 28 14:27:38 2019 +0800
KAFKA-7962: Avoid NPE for StickyAssignor (#6308)
* KAFKA-7962: StickyAssignor: throws NullPointerException during
assignments if topic is deleted
https://issues.apache.org/jira/browse/KAFKA-7962
A consumer using StickyAssignor throws a NullPointerException during
assignment if a subscribed topic has been removed.
* Addressed vahidhashemian's comments
* Lowered NPath complexity
* Added a unit test
---
.../kafka/clients/consumer/StickyAssignor.java | 6 ++--
.../kafka/clients/consumer/StickyAssignorTest.java | 32 ++++++++++++++++++++++
2 files changed, 36 insertions(+), 2 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
index 4be34c2..ee537eb 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
@@ -219,13 +219,13 @@ public class StickyAssignor extends
AbstractPartitionAssignor {
for (Entry<String, Subscription> entry: subscriptions.entrySet()) {
String consumer = entry.getKey();
consumer2AllPotentialPartitions.put(consumer, new
ArrayList<TopicPartition>());
- for (String topic: entry.getValue().topics()) {
+ entry.getValue().topics().stream().filter(topic ->
partitionsPerTopic.get(topic) != null).forEach(topic -> {
for (int i = 0; i < partitionsPerTopic.get(topic); ++i) {
TopicPartition topicPartition = new TopicPartition(topic,
i);
consumer2AllPotentialPartitions.get(consumer).add(topicPartition);
partition2AllPotentialConsumers.get(topicPartition).add(consumer);
}
- }
+ });
// add this consumer to currentAssignment (with an empty topic
partition assignment) if it does not already exist
if (!currentAssignment.containsKey(consumer))
@@ -705,6 +705,8 @@ public class StickyAssignor extends
AbstractPartitionAssignor {
*/
private <T> boolean hasIdenticalListElements(Collection<List<T>> col) {
Iterator<List<T>> it = col.iterator();
+ if (!it.hasNext())
+ return true;
List<T> cur = it.next();
while (it.hasNext()) {
List<T> next = it.next();
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
index 31cee7e..32ba16a 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
@@ -606,6 +606,38 @@ public class StickyAssignorTest {
}
}
+ @Test
+ public void testAssignmentUpdatedForDeletedTopic() {
+ String consumerId = "consumer";
+
+ Map<String, Integer> partitionsPerTopic = new HashMap<>();
+ partitionsPerTopic.put("topic01", 1);
+ partitionsPerTopic.put("topic03", 100);
+ Map<String, Subscription> subscriptions =
+ Collections.singletonMap(consumerId, new
Subscription(topics("topic01", "topic02", "topic03")));
+
+ Map<String, List<TopicPartition>> assignment =
assignor.assign(partitionsPerTopic, subscriptions);
+ assertEquals(assignment.values().stream().mapToInt(topicPartitions ->
topicPartitions.size()).sum(), 1 + 100);
+ assertEquals(Collections.singleton(consumerId), assignment.keySet());
+ assertTrue(isFullyBalanced(assignment));
+ }
+
+ @Test
+ public void testNoExceptionThrownWhenOnlySubscribedTopicDeleted() {
+ String topic = "topic01";
+ String consumer = "consumer01";
+ Map<String, Integer> partitionsPerTopic = new HashMap<>();
+ partitionsPerTopic.put(topic, 3);
+ Map<String, Subscription> subscriptions = new HashMap<>();
+ subscriptions.put(consumer, new Subscription(topics(topic)));
+ Map<String, List<TopicPartition>> assignment =
assignor.assign(partitionsPerTopic, subscriptions);
+ subscriptions.put(consumer, new Subscription(topics(topic),
StickyAssignor.serializeTopicPartitionAssignment(assignment.get(consumer))));
+
+ assignment = assignor.assign(Collections.emptyMap(), subscriptions);
+ assertEquals(assignment.size(), 1);
+ assertTrue(assignment.get(consumer).isEmpty());
+ }
+
private String getTopicName(int i, int maxNum) {
return getCanonicalName("t", i, maxNum);
}