This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new eb8cc09 KAFKA-7961; Ignore assignment for un-subscribed partitions (#6304) eb8cc09 is described below commit eb8cc09625c9913c4029763a46d4b7e5639c85f5 Author: José Armando García Sancio <jsan...@users.noreply.github.com> AuthorDate: Fri Feb 22 20:51:09 2019 -0800 KAFKA-7961; Ignore assignment for un-subscribed partitions (#6304) Whenever the consumer coordinator sends a response that doesn't match the client consumer subscription, we should check the subscription to see if it has changed. If it has, we can ignore the assignment and request a rebalance. Otherwise, we can throw an exception as before. Testing strategy: create a mocked client that first sends an assignment response that doesn't match the client subscription followed by an assignment response that does match the client subscription. Reviewers: Jason Gustafson <ja...@confluent.io> --- .../kafka/clients/consumer/KafkaConsumer.java | 2 +- .../kafka/clients/consumer/MockConsumer.java | 3 +- .../consumer/internals/ConsumerCoordinator.java | 15 +++- .../consumer/internals/SubscriptionState.java | 53 ++++++++---- .../kafka/clients/consumer/KafkaConsumerTest.java | 2 +- .../internals/ConsumerCoordinatorTest.java | 98 +++++++++++++++++++++- .../clients/consumer/internals/FetcherTest.java | 6 +- .../consumer/internals/SubscriptionStateTest.java | 31 +++---- 8 files changed, 171 insertions(+), 39 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 29fda34..7f5b2c0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -754,7 +754,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), heartbeatIntervalMs); //Will avoid blocking an extended period of time to prevent heartbeat thread starvation OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT)); - this.subscriptions = new SubscriptionState(offsetResetStrategy); + this.subscriptions = new SubscriptionState(logContext, offsetResetStrategy); this.assignors = config.getConfiguredInstances( ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, PartitionAssignor.class); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index ce6a60b..614ec9b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -24,6 +24,7 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.utils.LogContext; import java.time.Duration; import java.util.ArrayList; @@ -63,7 +64,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> { private boolean closed; public MockConsumer(OffsetResetStrategy offsetResetStrategy) { - this.subscriptions = new SubscriptionState(offsetResetStrategy); + this.subscriptions = new SubscriptionState(new LogContext(), offsetResetStrategy); this.partitions = new HashMap<>(); this.records = new HashMap<>(); this.paused = new HashSet<>(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 4ee7519..2fb6fb6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -64,6 +64,7 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; /** * This class manages the coordination process with the consumer coordinator. @@ -171,7 +172,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { } @Override - public List<ProtocolMetadata> metadata() { + protected List<ProtocolMetadata> metadata() { this.joinedSubscription = subscriptions.subscription(); List<ProtocolMetadata> metadataList = new ArrayList<>(); for (PartitionAssignor assignor : assignors) { @@ -247,7 +248,17 @@ public final class ConsumerCoordinator extends AbstractCoordinator { throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy); Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer); - subscriptions.assignFromSubscribed(assignment.partitions()); + if (!subscriptions.assignFromSubscribed(assignment.partitions())) { + // was sent assignments that didn't match the original subscription + Set<TopicPartition> invalidAssignments = assignment.partitions().stream().filter(topicPartition -> + !joinedSubscription.contains(topicPartition.topic())).collect(Collectors.toSet()); + if (invalidAssignments.size() > 0) { + throw new IllegalStateException("Coordinator leader sent assignment that don't correspond to subscription request: " + invalidAssignments); + } + + requestRejoin(); + return; + } // check if the assignment contains some topics that were not in the original // subscription, if yes we will obey what leader has decided and add these topics diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index 45712b0..3298980 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -23,6 +23,8 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.internals.PartitionStates; import org.apache.kafka.common.requests.IsolationLevel; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; import java.util.ArrayList; import java.util.Collection; @@ -57,6 +59,8 @@ public class SubscriptionState { private static final String SUBSCRIPTION_EXCEPTION_MESSAGE = "Subscription to topics, partitions and pattern are mutually exclusive"; + private final Logger log; + private enum SubscriptionType { NONE, AUTO_TOPICS, AUTO_PATTERN, USER_ASSIGNED } @@ -85,7 +89,8 @@ public class SubscriptionState { /* User-provided listener to be invoked when assignment changes */ private ConsumerRebalanceListener rebalanceListener; - public SubscriptionState(OffsetResetStrategy defaultResetStrategy) { + public SubscriptionState(LogContext logContext, OffsetResetStrategy defaultResetStrategy) { + this.log = logContext.logger(this.getClass()); this.defaultResetStrategy = defaultResetStrategy; this.subscription = Collections.emptySet(); this.assignment = new PartitionStates<>(); @@ -174,28 +179,44 @@ public class SubscriptionState { } /** - * Change the assignment to the specified partitions returned from the coordinator, - * note this is different from {@link #assignFromUser(Set)} which directly set the assignment from user inputs + * Change the assignment to the specified partitions returned from the coordinator, note this is + * different from {@link #assignFromUser(Set)} which directly set the assignment from user inputs. + * + * @return true if assignments matches subscription, otherwise false */ - public void assignFromSubscribed(Collection<TopicPartition> assignments) { + public boolean assignFromSubscribed(Collection<TopicPartition> assignments) { if (!this.partitionsAutoAssigned()) throw new IllegalArgumentException("Attempt to dynamically assign partitions while manual assignment in use"); - Map<TopicPartition, TopicPartitionState> assignedPartitionStates = partitionToStateMap(assignments); - fireOnAssignment(assignedPartitionStates.keySet()); - - if (this.subscribedPattern != null) { - for (TopicPartition tp : assignments) { - if (!this.subscribedPattern.matcher(tp.topic()).matches()) - throw new IllegalArgumentException("Assigned partition " + tp + " for non-subscribed topic regex pattern; subscription pattern is " + this.subscribedPattern); + Predicate<TopicPartition> predicate = topicPartition -> { + if (this.subscribedPattern != null) { + boolean match = this.subscribedPattern.matcher(topicPartition.topic()).matches(); + if (!match) { + log.info("Assigned partition {} for non-subscribed topic regex pattern; subscription pattern is {}", + topicPartition, + this.subscribedPattern); + } + return match; + } else { + boolean match = this.subscription.contains(topicPartition.topic()); + if (!match) { + log.info("Assigned partition {} for non-subscribed topic; subscription is {}", topicPartition, this.subscription); + } + return match; } - } else { - for (TopicPartition tp : assignments) - if (!this.subscription.contains(tp.topic())) - throw new IllegalArgumentException("Assigned partition " + tp + " for non-subscribed topic; subscription is " + this.subscription); + }; + + boolean assignmentMatchedSubscription = assignments.stream().allMatch(predicate); + + if (assignmentMatchedSubscription) { + Map<TopicPartition, TopicPartitionState> assignedPartitionStates = partitionToStateMap( + assignments); + fireOnAssignment(assignedPartitionStates.keySet()); + + this.assignment.set(assignedPartitionStates); } - this.assignment.set(assignedPartitionStates); + return assignmentMatchedSubscription; } public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 138d206..61702a1 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -1801,8 +1801,8 @@ public class KafkaConsumerTest { Metrics metrics = new Metrics(); ConsumerMetrics metricsRegistry = new ConsumerMetrics(metricGroupPrefix); - SubscriptionState subscriptions = new SubscriptionState(resetStrategy); LogContext loggerFactory = new LogContext(); + SubscriptionState subscriptions = new SubscriptionState(loggerFactory, resetStrategy); ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(loggerFactory, client, metadata, time, retryBackoffMs, requestTimeoutMs, heartbeatIntervalMs); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index c793556..290b428 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -93,6 +93,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -132,11 +133,12 @@ public class ConsumerCoordinatorTest { @Before public void setup() { - this.subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST); + LogContext logContext = new LogContext(); + this.subscriptions = new SubscriptionState(logContext, OffsetResetStrategy.EARLIEST); this.metadata = new Metadata(0, Long.MAX_VALUE, true); this.client = new MockClient(time, metadata); this.client.updateMetadata(metadataResponse); - this.consumerClient = new ConsumerNetworkClient(new LogContext(), client, metadata, time, 100, + this.consumerClient = new ConsumerNetworkClient(logContext, client, metadata, time, 100, requestTimeoutMs, Integer.MAX_VALUE); this.metrics = new Metrics(time); this.rebalanceListener = new MockRebalanceListener(); @@ -415,6 +417,98 @@ public class ConsumerCoordinatorTest { } @Test + public void testOutdatedCoordinatorAssignment() { + final String consumerId = "outdated_assignment"; + + subscriptions.subscribe(singleton(topic2), rebalanceListener); + + // ensure metadata is up-to-date for leader + metadata.setTopics(Arrays.asList(topic1, topic2)); + client.updateMetadata(metadataResponse); + + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + + // Test coordinator returning unsubscribed partitions + partitionAssignor.prepare(singletonMap(consumerId, singletonList(t1p))); + + // First incorrect assignment for subscription + client.prepareResponse( + joinGroupLeaderResponse( + 1, consumerId, singletonMap(consumerId, singletonList(topic2)), Errors.NONE)); + client.prepareResponse(new MockClient.RequestMatcher() { + @Override + public boolean matches(AbstractRequest body) { + SyncGroupRequest sync = (SyncGroupRequest) body; + return sync.memberId().equals(consumerId) && + sync.generationId() == 1 && + sync.groupAssignment().containsKey(consumerId); + } + }, syncGroupResponse(Arrays.asList(t2p), Errors.NONE)); + + // Second correct assignment for subscription + client.prepareResponse( + joinGroupLeaderResponse( + 1, consumerId, singletonMap(consumerId, singletonList(topic1)), Errors.NONE)); + client.prepareResponse(new MockClient.RequestMatcher() { + @Override + public boolean matches(AbstractRequest body) { + SyncGroupRequest sync = (SyncGroupRequest) body; + return sync.memberId().equals(consumerId) && + sync.generationId() == 1 && + sync.groupAssignment().containsKey(consumerId); + } + }, syncGroupResponse(singletonList(t1p), Errors.NONE)); + + // Poll once so that the join group future gets created and complete + coordinator.poll(time.timer(0)); + + // Before the sync group response gets completed change the subscription + subscriptions.subscribe(singleton(topic1), rebalanceListener); + coordinator.poll(time.timer(0)); + + coordinator.poll(time.timer(Long.MAX_VALUE)); + + assertFalse(coordinator.rejoinNeededOrPending()); + assertEquals(singleton(t1p), subscriptions.assignedPartitions()); + assertEquals(singleton(topic1), subscriptions.groupSubscription()); + assertEquals(2, rebalanceListener.revokedCount); + assertEquals(Collections.emptySet(), rebalanceListener.revoked); + assertEquals(1, rebalanceListener.assignedCount); + assertEquals(singleton(t1p), rebalanceListener.assigned); + } + + @Test + public void testInvalidCoordinatorAssignment() { + final String consumerId = "invalid_assignment"; + + subscriptions.subscribe(singleton(topic1), rebalanceListener); + + // ensure metadata is up-to-date for leader + metadata.setTopics(singletonList(topic1)); + client.updateMetadata(metadataResponse); + + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + + // normal join group + Map<String, List<String>> memberSubscriptions = singletonMap(consumerId, singletonList(topic2)); + partitionAssignor.prepare(singletonMap(consumerId, singletonList(t2p))); + + client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE)); + client.prepareResponse(new MockClient.RequestMatcher() { + @Override + public boolean matches(AbstractRequest body) { + SyncGroupRequest sync = (SyncGroupRequest) body; + return sync.memberId().equals(consumerId) && + sync.generationId() == 1 && + sync.groupAssignment().containsKey(consumerId); + } + }, syncGroupResponse(singletonList(t2p), Errors.NONE)); + assertThrows(IllegalStateException.class, () -> coordinator.poll(time.timer(Long.MAX_VALUE))); + } + + @Test public void testPatternJoinGroupLeader() { final String consumerId = "leader"; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index d862944..b08df1d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -148,8 +148,10 @@ public class FetcherTest { private Metrics metrics = new Metrics(time); private FetcherMetricsRegistry metricsRegistry = new FetcherMetricsRegistry("consumer" + groupId); - private SubscriptionState subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST); - private SubscriptionState subscriptionsNoAutoReset = new SubscriptionState(OffsetResetStrategy.NONE); + private SubscriptionState subscriptions = new SubscriptionState( + new LogContext(), OffsetResetStrategy.EARLIEST); + private SubscriptionState subscriptionsNoAutoReset = new SubscriptionState( + new LogContext(), OffsetResetStrategy.NONE); private static final double EPSILON = 0.0001; private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(new LogContext(), client, metadata, time, 100, 1000, Integer.MAX_VALUE); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java index 05287e0..8f8e960 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer.internals; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Utils; import org.junit.Test; @@ -37,7 +38,9 @@ import static org.junit.Assert.assertTrue; public class SubscriptionStateTest { - private final SubscriptionState state = new SubscriptionState(OffsetResetStrategy.EARLIEST); + private final SubscriptionState state = new SubscriptionState( + new LogContext(), + OffsetResetStrategy.EARLIEST); private final String topic = "test"; private final String topic1 = "test1"; private final TopicPartition tp0 = new TopicPartition(topic, 0); @@ -80,7 +83,7 @@ public class SubscriptionStateTest { assertTrue(state.assignedPartitions().isEmpty()); assertEquals(0, state.numAssignedPartitions()); - state.assignFromSubscribed(singleton(t1p0)); + assertTrue(state.assignFromSubscribed(singleton(t1p0))); // assigned partitions should immediately change assertEquals(singleton(t1p0), state.assignedPartitions()); assertEquals(1, state.numAssignedPartitions()); @@ -108,13 +111,13 @@ public class SubscriptionStateTest { assertTrue(state.assignedPartitions().isEmpty()); assertEquals(0, state.numAssignedPartitions()); - state.assignFromSubscribed(singleton(tp1)); + assertTrue(state.assignFromSubscribed(singleton(tp1))); // assigned partitions should immediately change assertEquals(singleton(tp1), state.assignedPartitions()); assertEquals(1, state.numAssignedPartitions()); assertEquals(singleton(topic), state.subscription()); - state.assignFromSubscribed(Collections.singletonList(t1p0)); + assertTrue(state.assignFromSubscribed(Collections.singletonList(t1p0))); // assigned partitions should immediately change assertEquals(singleton(t1p0), state.assignedPartitions()); assertEquals(1, state.numAssignedPartitions()); @@ -130,7 +133,7 @@ public class SubscriptionStateTest { assertEquals(singleton(t1p0), state.assignedPartitions()); assertEquals(1, state.numAssignedPartitions()); - state.assignFromSubscribed(Collections.singletonList(tp0)); + assertTrue(state.assignFromSubscribed(Collections.singletonList(tp0))); // assigned partitions should immediately change assertEquals(singleton(tp0), state.assignedPartitions()); assertEquals(1, state.numAssignedPartitions()); @@ -160,7 +163,7 @@ public class SubscriptionStateTest { Set<TopicPartition> autoAssignment = Utils.mkSet(t1p0); state.subscribe(singleton(topic1), rebalanceListener); - state.assignFromSubscribed(autoAssignment); + assertTrue(state.assignFromSubscribed(autoAssignment)); assertEquals(autoAssignment, assignmentRef.get()); } @@ -187,10 +190,10 @@ public class SubscriptionStateTest { assertTrue(state.assignedPartitions().isEmpty()); assertEquals(0, state.numAssignedPartitions()); assertTrue(state.partitionsAutoAssigned()); - state.assignFromSubscribed(singleton(tp0)); + assertTrue(state.assignFromSubscribed(singleton(tp0))); state.seek(tp0, 1); assertEquals(1L, state.position(tp0).longValue()); - state.assignFromSubscribed(singleton(tp1)); + assertTrue(state.assignFromSubscribed(singleton(tp1))); assertTrue(state.isAssigned(tp1)); assertFalse(state.isAssigned(tp0)); assertFalse(state.isFetchable(tp1)); @@ -212,21 +215,21 @@ public class SubscriptionStateTest { @Test(expected = IllegalStateException.class) public void invalidPositionUpdate() { state.subscribe(singleton(topic), rebalanceListener); - state.assignFromSubscribed(singleton(tp0)); + assertTrue(state.assignFromSubscribed(singleton(tp0))); state.position(tp0, 0); } - @Test(expected = IllegalArgumentException.class) + @Test public void cantAssignPartitionForUnsubscribedTopics() { state.subscribe(singleton(topic), rebalanceListener); - state.assignFromSubscribed(Collections.singletonList(t1p0)); + assertFalse(state.assignFromSubscribed(Collections.singletonList(t1p0))); } - @Test(expected = IllegalArgumentException.class) + @Test public void cantAssignPartitionForUnmatchedPattern() { state.subscribe(Pattern.compile(".*t"), rebalanceListener); state.subscribeFromPattern(new HashSet<>(Collections.singletonList(topic))); - state.assignFromSubscribed(Collections.singletonList(t1p0)); + assertFalse(state.assignFromSubscribed(Collections.singletonList(t1p0))); } @Test(expected = IllegalStateException.class) @@ -286,7 +289,7 @@ public class SubscriptionStateTest { public void unsubscription() { state.subscribe(Pattern.compile(".*"), rebalanceListener); state.subscribeFromPattern(new HashSet<>(Arrays.asList(topic, topic1))); - state.assignFromSubscribed(singleton(tp1)); + assertTrue(state.assignFromSubscribed(singleton(tp1))); assertEquals(singleton(tp1), state.assignedPartitions()); assertEquals(1, state.numAssignedPartitions());