This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new eb8cc09  KAFKA-7961; Ignore assignment for un-subscribed partitions (#6304)
eb8cc09 is described below

commit eb8cc09625c9913c4029763a46d4b7e5639c85f5
Author: José Armando García Sancio <jsan...@users.noreply.github.com>
AuthorDate: Fri Feb 22 20:51:09 2019 -0800

    KAFKA-7961; Ignore assignment for un-subscribed partitions (#6304)
    
    Whenever the consumer coordinator sends a response that doesn't match the
    client consumer subscription, we should check the subscription to see if it
    has changed. If it has, we can ignore the assignment and request a rebalance.
    Otherwise, we can throw an exception as before.
    
    Testing strategy: create a mocked client that first sends an assignment
    response that doesn't match the client subscription followed by an
    assignment response that does match the client subscription.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../kafka/clients/consumer/KafkaConsumer.java      |  2 +-
 .../kafka/clients/consumer/MockConsumer.java       |  3 +-
 .../consumer/internals/ConsumerCoordinator.java    | 15 +++-
 .../consumer/internals/SubscriptionState.java      | 53 ++++++++----
 .../kafka/clients/consumer/KafkaConsumerTest.java  |  2 +-
 .../internals/ConsumerCoordinatorTest.java         | 98 +++++++++++++++++++++-
 .../clients/consumer/internals/FetcherTest.java    |  6 +-
 .../consumer/internals/SubscriptionStateTest.java  | 31 +++----
 8 files changed, 171 insertions(+), 39 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 29fda34..7f5b2c0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -754,7 +754,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
                     heartbeatIntervalMs); //Will avoid blocking an extended 
period of time to prevent heartbeat thread starvation
             OffsetResetStrategy offsetResetStrategy = 
OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));
-            this.subscriptions = new SubscriptionState(offsetResetStrategy);
+            this.subscriptions = new SubscriptionState(logContext, 
offsetResetStrategy);
             this.assignors = config.getConfiguredInstances(
                     ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                     PartitionAssignor.class);
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index ce6a60b..614ec9b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.LogContext;
 
 import java.time.Duration;
 import java.util.ArrayList;
@@ -63,7 +64,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     private boolean closed;
 
     public MockConsumer(OffsetResetStrategy offsetResetStrategy) {
-        this.subscriptions = new SubscriptionState(offsetResetStrategy);
+        this.subscriptions = new SubscriptionState(new LogContext(), 
offsetResetStrategy);
         this.partitions = new HashMap<>();
         this.records = new HashMap<>();
         this.paused = new HashSet<>();
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 4ee7519..2fb6fb6 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -64,6 +64,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 /**
  * This class manages the coordination process with the consumer coordinator.
@@ -171,7 +172,7 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
     }
 
     @Override
-    public List<ProtocolMetadata> metadata() {
+    protected List<ProtocolMetadata> metadata() {
         this.joinedSubscription = subscriptions.subscription();
         List<ProtocolMetadata> metadataList = new ArrayList<>();
         for (PartitionAssignor assignor : assignors) {
@@ -247,7 +248,17 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
             throw new IllegalStateException("Coordinator selected invalid 
assignment protocol: " + assignmentStrategy);
 
         Assignment assignment = 
ConsumerProtocol.deserializeAssignment(assignmentBuffer);
-        subscriptions.assignFromSubscribed(assignment.partitions());
+        if (!subscriptions.assignFromSubscribed(assignment.partitions())) {
+            // was sent assignments that didn't match the original subscription
+            Set<TopicPartition> invalidAssignments = 
assignment.partitions().stream().filter(topicPartition -> 
+                
!joinedSubscription.contains(topicPartition.topic())).collect(Collectors.toSet());
+            if (invalidAssignments.size() > 0) {
+                throw new IllegalStateException("Coordinator leader sent 
assignment that don't correspond to subscription request: " + 
invalidAssignments);
+            }
+
+            requestRejoin();
+            return;
+        }
 
         // check if the assignment contains some topics that were not in the 
original
         // subscription, if yes we will obey what leader has decided and add 
these topics
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 45712b0..3298980 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -23,6 +23,8 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.internals.PartitionStates;
 import org.apache.kafka.common.requests.IsolationLevel;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -57,6 +59,8 @@ public class SubscriptionState {
     private static final String SUBSCRIPTION_EXCEPTION_MESSAGE =
             "Subscription to topics, partitions and pattern are mutually 
exclusive";
 
+    private final Logger log;
+
     private enum SubscriptionType {
         NONE, AUTO_TOPICS, AUTO_PATTERN, USER_ASSIGNED
     }
@@ -85,7 +89,8 @@ public class SubscriptionState {
     /* User-provided listener to be invoked when assignment changes */
     private ConsumerRebalanceListener rebalanceListener;
 
-    public SubscriptionState(OffsetResetStrategy defaultResetStrategy) {
+    public SubscriptionState(LogContext logContext, OffsetResetStrategy 
defaultResetStrategy) {
+        this.log = logContext.logger(this.getClass());
         this.defaultResetStrategy = defaultResetStrategy;
         this.subscription = Collections.emptySet();
         this.assignment = new PartitionStates<>();
@@ -174,28 +179,44 @@ public class SubscriptionState {
     }
 
     /**
-     * Change the assignment to the specified partitions returned from the 
coordinator,
-     * note this is different from {@link #assignFromUser(Set)} which directly 
set the assignment from user inputs
+     * Change the assignment to the specified partitions returned from the 
coordinator, note this is
+     * different from {@link #assignFromUser(Set)} which directly set the 
assignment from user inputs.
+     *
+     * @return true if assignments matches subscription, otherwise false
      */
-    public void assignFromSubscribed(Collection<TopicPartition> assignments) {
+    public boolean assignFromSubscribed(Collection<TopicPartition> 
assignments) {
         if (!this.partitionsAutoAssigned())
             throw new IllegalArgumentException("Attempt to dynamically assign 
partitions while manual assignment in use");
 
-        Map<TopicPartition, TopicPartitionState> assignedPartitionStates = 
partitionToStateMap(assignments);
-        fireOnAssignment(assignedPartitionStates.keySet());
-
-        if (this.subscribedPattern != null) {
-            for (TopicPartition tp : assignments) {
-                if (!this.subscribedPattern.matcher(tp.topic()).matches())
-                    throw new IllegalArgumentException("Assigned partition " + 
tp + " for non-subscribed topic regex pattern; subscription pattern is " + 
this.subscribedPattern);
+        Predicate<TopicPartition> predicate = topicPartition -> {
+            if (this.subscribedPattern != null) {
+                boolean match = 
this.subscribedPattern.matcher(topicPartition.topic()).matches();
+                if (!match) {
+                    log.info("Assigned partition {} for non-subscribed topic 
regex pattern; subscription pattern is {}",
+                            topicPartition,
+                            this.subscribedPattern);
+                }
+                return match;
+            } else {
+                boolean match = 
this.subscription.contains(topicPartition.topic());
+                if (!match) {
+                    log.info("Assigned partition {} for non-subscribed topic; 
subscription is {}", topicPartition, this.subscription);
+                }
+                return match;
             }
-        } else {
-            for (TopicPartition tp : assignments)
-                if (!this.subscription.contains(tp.topic()))
-                    throw new IllegalArgumentException("Assigned partition " + 
tp + " for non-subscribed topic; subscription is " + this.subscription);
+        };
+
+        boolean assignmentMatchedSubscription = 
assignments.stream().allMatch(predicate);
+
+        if (assignmentMatchedSubscription) {
+            Map<TopicPartition, TopicPartitionState> assignedPartitionStates = 
partitionToStateMap(
+                    assignments);
+            fireOnAssignment(assignedPartitionStates.keySet());
+
+            this.assignment.set(assignedPartitionStates);
         }
 
-        this.assignment.set(assignedPartitionStates);
+        return assignmentMatchedSubscription;
     }
 
     public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) 
{
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 138d206..61702a1 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1801,8 +1801,8 @@ public class KafkaConsumerTest {
         Metrics metrics = new Metrics();
         ConsumerMetrics metricsRegistry = new 
ConsumerMetrics(metricGroupPrefix);
 
-        SubscriptionState subscriptions = new SubscriptionState(resetStrategy);
         LogContext loggerFactory = new LogContext();
+        SubscriptionState subscriptions = new SubscriptionState(loggerFactory, 
resetStrategy);
         ConsumerNetworkClient consumerClient = new 
ConsumerNetworkClient(loggerFactory, client, metadata, time,
                 retryBackoffMs, requestTimeoutMs, heartbeatIntervalMs);
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index c793556..290b428 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -93,6 +93,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -132,11 +133,12 @@ public class ConsumerCoordinatorTest {
 
     @Before
     public void setup() {
-        this.subscriptions = new 
SubscriptionState(OffsetResetStrategy.EARLIEST);
+        LogContext logContext = new LogContext();
+        this.subscriptions = new SubscriptionState(logContext, 
OffsetResetStrategy.EARLIEST);
         this.metadata = new Metadata(0, Long.MAX_VALUE, true);
         this.client = new MockClient(time, metadata);
         this.client.updateMetadata(metadataResponse);
-        this.consumerClient = new ConsumerNetworkClient(new LogContext(), 
client, metadata, time, 100,
+        this.consumerClient = new ConsumerNetworkClient(logContext, client, 
metadata, time, 100,
                 requestTimeoutMs, Integer.MAX_VALUE);
         this.metrics = new Metrics(time);
         this.rebalanceListener = new MockRebalanceListener();
@@ -415,6 +417,98 @@ public class ConsumerCoordinatorTest {
     }
 
     @Test
+    public void testOutdatedCoordinatorAssignment() {
+        final String consumerId = "outdated_assignment";
+
+        subscriptions.subscribe(singleton(topic2), rebalanceListener);
+
+        // ensure metadata is up-to-date for leader
+        metadata.setTopics(Arrays.asList(topic1, topic2));
+        client.updateMetadata(metadataResponse);
+
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        // Test coordinator returning unsubscribed partitions
+        partitionAssignor.prepare(singletonMap(consumerId, 
singletonList(t1p)));
+
+        // First incorrect assignment for subscription
+        client.prepareResponse(
+                joinGroupLeaderResponse(
+                    1, consumerId, singletonMap(consumerId, 
singletonList(topic2)), Errors.NONE));
+        client.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(AbstractRequest body) {
+                SyncGroupRequest sync = (SyncGroupRequest) body;
+                return sync.memberId().equals(consumerId) &&
+                        sync.generationId() == 1 &&
+                        sync.groupAssignment().containsKey(consumerId);
+            }
+        }, syncGroupResponse(Arrays.asList(t2p), Errors.NONE));
+
+        // Second correct assignment for subscription
+        client.prepareResponse(
+                joinGroupLeaderResponse(
+                    1, consumerId, singletonMap(consumerId, 
singletonList(topic1)), Errors.NONE));
+        client.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(AbstractRequest body) {
+                SyncGroupRequest sync = (SyncGroupRequest) body;
+                return sync.memberId().equals(consumerId) &&
+                        sync.generationId() == 1 &&
+                        sync.groupAssignment().containsKey(consumerId);
+            }
+        }, syncGroupResponse(singletonList(t1p), Errors.NONE));
+
+        // Poll once so that the join group future gets created and complete
+        coordinator.poll(time.timer(0));
+
+        // Before the sync group response gets completed change the 
subscription
+        subscriptions.subscribe(singleton(topic1), rebalanceListener);
+        coordinator.poll(time.timer(0));
+
+        coordinator.poll(time.timer(Long.MAX_VALUE));
+
+        assertFalse(coordinator.rejoinNeededOrPending());
+        assertEquals(singleton(t1p), subscriptions.assignedPartitions());
+        assertEquals(singleton(topic1), subscriptions.groupSubscription());
+        assertEquals(2, rebalanceListener.revokedCount);
+        assertEquals(Collections.emptySet(), rebalanceListener.revoked);
+        assertEquals(1, rebalanceListener.assignedCount);
+        assertEquals(singleton(t1p), rebalanceListener.assigned);
+    }
+
+    @Test
+    public void testInvalidCoordinatorAssignment() {
+        final String consumerId = "invalid_assignment";
+
+        subscriptions.subscribe(singleton(topic1), rebalanceListener);
+
+        // ensure metadata is up-to-date for leader
+        metadata.setTopics(singletonList(topic1));
+        client.updateMetadata(metadataResponse);
+
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        // normal join group
+        Map<String, List<String>> memberSubscriptions = 
singletonMap(consumerId, singletonList(topic2));
+        partitionAssignor.prepare(singletonMap(consumerId, 
singletonList(t2p)));
+
+        client.prepareResponse(joinGroupLeaderResponse(1, consumerId, 
memberSubscriptions, Errors.NONE));
+        client.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(AbstractRequest body) {
+                SyncGroupRequest sync = (SyncGroupRequest) body;
+                return sync.memberId().equals(consumerId) &&
+                        sync.generationId() == 1 &&
+                        sync.groupAssignment().containsKey(consumerId);
+            }
+        }, syncGroupResponse(singletonList(t2p), Errors.NONE));
+        assertThrows(IllegalStateException.class, () -> 
coordinator.poll(time.timer(Long.MAX_VALUE)));
+    }
+
+    @Test
     public void testPatternJoinGroupLeader() {
         final String consumerId = "leader";
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index d862944..b08df1d 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -148,8 +148,10 @@ public class FetcherTest {
     private Metrics metrics = new Metrics(time);
     private FetcherMetricsRegistry metricsRegistry = new 
FetcherMetricsRegistry("consumer" + groupId);
 
-    private SubscriptionState subscriptions = new 
SubscriptionState(OffsetResetStrategy.EARLIEST);
-    private SubscriptionState subscriptionsNoAutoReset = new 
SubscriptionState(OffsetResetStrategy.NONE);
+    private SubscriptionState subscriptions = new SubscriptionState(
+            new LogContext(), OffsetResetStrategy.EARLIEST);
+    private SubscriptionState subscriptionsNoAutoReset = new SubscriptionState(
+            new LogContext(), OffsetResetStrategy.NONE);
     private static final double EPSILON = 0.0001;
     private ConsumerNetworkClient consumerClient = new 
ConsumerNetworkClient(new LogContext(),
             client, metadata, time, 100, 1000, Integer.MAX_VALUE);
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index 05287e0..8f8e960 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer.internals;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.junit.Test;
 
@@ -37,7 +38,9 @@ import static org.junit.Assert.assertTrue;
 
 public class SubscriptionStateTest {
 
-    private final SubscriptionState state = new 
SubscriptionState(OffsetResetStrategy.EARLIEST);
+    private final SubscriptionState state = new SubscriptionState(
+            new LogContext(),
+            OffsetResetStrategy.EARLIEST);
     private final String topic = "test";
     private final String topic1 = "test1";
     private final TopicPartition tp0 = new TopicPartition(topic, 0);
@@ -80,7 +83,7 @@ public class SubscriptionStateTest {
         assertTrue(state.assignedPartitions().isEmpty());
         assertEquals(0, state.numAssignedPartitions());
 
-        state.assignFromSubscribed(singleton(t1p0));
+        assertTrue(state.assignFromSubscribed(singleton(t1p0)));
         // assigned partitions should immediately change
         assertEquals(singleton(t1p0), state.assignedPartitions());
         assertEquals(1, state.numAssignedPartitions());
@@ -108,13 +111,13 @@ public class SubscriptionStateTest {
         assertTrue(state.assignedPartitions().isEmpty());
         assertEquals(0, state.numAssignedPartitions());
 
-        state.assignFromSubscribed(singleton(tp1));
+        assertTrue(state.assignFromSubscribed(singleton(tp1)));
         // assigned partitions should immediately change
         assertEquals(singleton(tp1), state.assignedPartitions());
         assertEquals(1, state.numAssignedPartitions());
         assertEquals(singleton(topic), state.subscription());
 
-        state.assignFromSubscribed(Collections.singletonList(t1p0));
+        
assertTrue(state.assignFromSubscribed(Collections.singletonList(t1p0)));
         // assigned partitions should immediately change
         assertEquals(singleton(t1p0), state.assignedPartitions());
         assertEquals(1, state.numAssignedPartitions());
@@ -130,7 +133,7 @@ public class SubscriptionStateTest {
         assertEquals(singleton(t1p0), state.assignedPartitions());
         assertEquals(1, state.numAssignedPartitions());
 
-        state.assignFromSubscribed(Collections.singletonList(tp0));
+        assertTrue(state.assignFromSubscribed(Collections.singletonList(tp0)));
         // assigned partitions should immediately change
         assertEquals(singleton(tp0), state.assignedPartitions());
         assertEquals(1, state.numAssignedPartitions());
@@ -160,7 +163,7 @@ public class SubscriptionStateTest {
 
         Set<TopicPartition> autoAssignment = Utils.mkSet(t1p0);
         state.subscribe(singleton(topic1), rebalanceListener);
-        state.assignFromSubscribed(autoAssignment);
+        assertTrue(state.assignFromSubscribed(autoAssignment));
         assertEquals(autoAssignment, assignmentRef.get());
     }
 
@@ -187,10 +190,10 @@ public class SubscriptionStateTest {
         assertTrue(state.assignedPartitions().isEmpty());
         assertEquals(0, state.numAssignedPartitions());
         assertTrue(state.partitionsAutoAssigned());
-        state.assignFromSubscribed(singleton(tp0));
+        assertTrue(state.assignFromSubscribed(singleton(tp0)));
         state.seek(tp0, 1);
         assertEquals(1L, state.position(tp0).longValue());
-        state.assignFromSubscribed(singleton(tp1));
+        assertTrue(state.assignFromSubscribed(singleton(tp1)));
         assertTrue(state.isAssigned(tp1));
         assertFalse(state.isAssigned(tp0));
         assertFalse(state.isFetchable(tp1));
@@ -212,21 +215,21 @@ public class SubscriptionStateTest {
     @Test(expected = IllegalStateException.class)
     public void invalidPositionUpdate() {
         state.subscribe(singleton(topic), rebalanceListener);
-        state.assignFromSubscribed(singleton(tp0));
+        assertTrue(state.assignFromSubscribed(singleton(tp0)));
         state.position(tp0, 0);
     }
 
-    @Test(expected = IllegalArgumentException.class)
+    @Test
     public void cantAssignPartitionForUnsubscribedTopics() {
         state.subscribe(singleton(topic), rebalanceListener);
-        state.assignFromSubscribed(Collections.singletonList(t1p0));
+        
assertFalse(state.assignFromSubscribed(Collections.singletonList(t1p0)));
     }
 
-    @Test(expected = IllegalArgumentException.class)
+    @Test
     public void cantAssignPartitionForUnmatchedPattern() {
         state.subscribe(Pattern.compile(".*t"), rebalanceListener);
         state.subscribeFromPattern(new 
HashSet<>(Collections.singletonList(topic)));
-        state.assignFromSubscribed(Collections.singletonList(t1p0));
+        
assertFalse(state.assignFromSubscribed(Collections.singletonList(t1p0)));
     }
 
     @Test(expected = IllegalStateException.class)
@@ -286,7 +289,7 @@ public class SubscriptionStateTest {
     public void unsubscription() {
         state.subscribe(Pattern.compile(".*"), rebalanceListener);
         state.subscribeFromPattern(new HashSet<>(Arrays.asList(topic, 
topic1)));
-        state.assignFromSubscribed(singleton(tp1));
+        assertTrue(state.assignFromSubscribed(singleton(tp1)));
         assertEquals(singleton(tp1), state.assignedPartitions());
         assertEquals(1, state.numAssignedPartitions());
 

Reply via email to