This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.3 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push: new 44664cc KAFKA-9073: check assignment in requestFailed to avoid NPE (#7630) 44664cc is described below commit 44664cc4e6d016923a9d196c4359b9183894f1a5 Author: Guozhang Wang <wangg...@gmail.com> AuthorDate: Tue Dec 3 18:33:43 2019 -0800 KAFKA-9073: check assignment in requestFailed to avoid NPE (#7630) This is a cherry-pick of the bug-fix included in #6884 to the 2.3 and older branches. Reviewers: A. Sophie Blee-Goldman <sop...@confluent.io>, Matthias J. Sax <mj...@apache.org> --- .../kafka/clients/consumer/internals/ConsumerCoordinator.java | 7 +++++-- .../kafka/clients/consumer/internals/SubscriptionState.java | 9 +++++++-- .../kafka/clients/consumer/internals/SubscriptionStateTest.java | 2 +- 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 7341826..876a51d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -1033,8 +1033,11 @@ public final class ConsumerCoordinator extends AbstractCoordinator { private MetadataSnapshot(SubscriptionState subscription, Cluster cluster, int version) { Map<String, Integer> partitionsPerTopic = new HashMap<>(); - for (String topic : subscription.groupSubscription()) - partitionsPerTopic.put(topic, cluster.partitionCountForTopic(topic)); + for (String topic : subscription.groupSubscription()) { + Integer numPartitions = cluster.partitionCountForTopic(topic); + if (numPartitions != null) + partitionsPerTopic.put(topic, numPartitions); + } this.partitionsPerTopic = partitionsPerTopic; this.version = version; } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index 4504674..ae15f2f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -618,8 +618,13 @@ public class SubscriptionState { } synchronized void requestFailed(Set<TopicPartition> partitions, long nextRetryTimeMs) { - for (TopicPartition partition : partitions) - assignedState(partition).requestFailed(nextRetryTimeMs); + for (TopicPartition partition : partitions) { + // by the time the request failed, the assignment may no longer + // contain this partition any more, in which case we would just ignore. + final TopicPartitionState state = assignedStateOrNull(partition); + if (state != null) + state.requestFailed(nextRetryTimeMs); + } } synchronized void movePartitionToEnd(TopicPartition tp) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java index 484b9de..217b3ce 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java @@ -62,7 +62,7 @@ public class SubscriptionStateTest { state.seek(tp0, 1); assertTrue(state.isFetchable(tp0)); assertEquals(1L, state.position(tp0).offset); - state.assignFromUser(Collections.<TopicPartition>emptySet()); + state.assignFromUser(Collections.emptySet()); assertTrue(state.assignedPartitions().isEmpty()); assertEquals(0, state.numAssignedPartitions()); assertFalse(state.isAssigned(tp0));