This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch 2.3 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push: new be33917 KAFKA-10056; Ensure consumer metadata contains new topics on subscription change (#8739) be33917 is described below commit be339177f6b713121cba55a55d0efdaafbda6920 Author: Rajini Sivaram <rajinisiva...@googlemail.com> AuthorDate: Fri May 29 17:03:49 2020 +0100 KAFKA-10056; Ensure consumer metadata contains new topics on subscription change (#8739) Reviewers: Jason Gustafson <ja...@confluent.io> --- .../consumer/internals/SubscriptionState.java | 12 ++++++- .../internals/ConsumerCoordinatorTest.java | 42 ++++++++++++++++++++++ .../consumer/internals/SubscriptionStateTest.java | 6 ++++ 3 files changed, 59 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index 89712d9..63dd441 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -306,7 +306,17 @@ public class SubscriptionState { * of the current generation; otherwise it returns the same set as {@link #subscription()} */ synchronized Set<String> metadataTopics() { - return groupSubscription.isEmpty() ? subscription : groupSubscription; + if (groupSubscription.isEmpty()) + return subscription; + else if (groupSubscription.containsAll(subscription)) + return groupSubscription; + else { + // When subscription changes `groupSubscription` may be outdated, ensure that + // new subscription topics are returned. + Set<String> topics = new HashSet<>(groupSubscription); + topics.addAll(subscription); + return topics; + } } synchronized boolean needsMetadata(String topic) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index b952354..f037711 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -522,6 +522,35 @@ public class ConsumerCoordinatorTest { } @Test + public void testMetadataTopicsDuringSubscriptionChange() { + final String consumerId = "subscription_change"; + final List<String> oldSubscription = singletonList(topic1); + final List<TopicPartition> oldAssignment = Collections.singletonList(t1p); + final List<String> newSubscription = singletonList(topic2); + final List<TopicPartition> newAssignment = Collections.singletonList(t2p); + + subscriptions.subscribe(new HashSet<>(oldSubscription), rebalanceListener); + assertEquals(new HashSet<>(oldSubscription), subscriptions.metadataTopics()); + + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + + prepareJoinAndSyncResponse(consumerId, 1, oldSubscription, oldAssignment); + + coordinator.poll(time.timer(0)); + assertEquals(new HashSet<>(oldSubscription), subscriptions.metadataTopics()); + + subscriptions.subscribe(new HashSet<>(newSubscription), rebalanceListener); + assertEquals(Utils.mkSet(topic1, topic2), subscriptions.metadataTopics()); + + prepareJoinAndSyncResponse(consumerId, 2, newSubscription, newAssignment); + coordinator.poll(time.timer(Long.MAX_VALUE)); + assertFalse(coordinator.rejoinNeededOrPending()); + assertEquals(new HashSet<>(newAssignment), subscriptions.assignedPartitions()); + assertEquals(new HashSet<>(newSubscription), subscriptions.metadataTopics()); + } + + @Test public void testPatternJoinGroupLeader() { final String consumerId = "leader"; @@ -2388,6 +2417,19 @@ public class ConsumerCoordinatorTest { client.prepareResponse(offsetCommitRequestMatcher(expectedOffsets), offsetCommitResponse(errors), disconnected); } + private void prepareJoinAndSyncResponse(String consumerId, int generation, List<String> subscription, List<TopicPartition> assignment) { + partitionAssignor.prepare(singletonMap(consumerId, assignment)); + client.prepareResponse( + joinGroupLeaderResponse( + generation, consumerId, singletonMap(consumerId, subscription), Errors.NONE)); + client.prepareResponse(body -> { + SyncGroupRequest sync = (SyncGroupRequest) body; + return sync.data.memberId().equals(consumerId) && + sync.data.generationId() == generation && + sync.groupAssignments().containsKey(consumerId); + }, syncGroupResponse(assignment, Errors.NONE)); + } + private Map<TopicPartition, Errors> partitionErrors(Collection<TopicPartition> partitions, Errors error) { final Map<TopicPartition, Errors> errors = new HashMap<>(); for (TopicPartition partition : partitions) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java index 35ef154..4ddc080 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java @@ -118,6 +118,12 @@ public class SubscriptionStateTest { // `groupSubscribe` does not accumulate assertFalse(state.groupSubscribe(singleton(topic1))); assertEquals(singleton(topic1), state.metadataTopics()); + + state.subscribe(singleton("anotherTopic"), rebalanceListener); + assertEquals(Utils.mkSet(topic1, "anotherTopic"), state.metadataTopics()); + + assertFalse(state.groupSubscribe(singleton("anotherTopic"))); + assertEquals(singleton("anotherTopic"), state.metadataTopics()); } @Test