Repository: kafka
Updated Branches:
  refs/heads/trunk eb59c8124 -> 54767bbba


KAFKA-4033; Revise partition assignment semantics on consumer subscription changes (KIP-70)

This PR changes topic subscription semantics so a change in subscription does 
not immediately cause a rebalance.
Instead, the next poll or the next scheduled metadata refresh will update the 
assigned partitions.

Author: Vahid Hashemian <vahidhashem...@us.ibm.com>

Reviewers: Jason Gustafson

Closes #1726 from vahidhashemian/KAFKA-4033


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/54767bbb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/54767bbb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/54767bbb

Branch: refs/heads/trunk
Commit: 54767bbba5bf18c01f50bb40c339433bfed09627
Parents: eb59c81
Author: Vahid Hashemian <vahidhashem...@us.ibm.com>
Authored: Thu Sep 8 19:56:53 2016 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Thu Sep 8 19:56:53 2016 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java   |  13 +-
 .../consumer/internals/ConsumerCoordinator.java |  36 +-
 .../consumer/internals/SubscriptionState.java   |   9 -
 .../clients/consumer/KafkaConsumerTest.java     | 653 ++++++++++++++-----
 .../internals/SubscriptionStateTest.java        |  63 +-
 5 files changed, 568 insertions(+), 206 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/54767bbb/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 85d5194..ca8f1f1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -552,7 +552,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * A consumer is instantiated by providing a {@link java.util.Properties} object as configuration.
      * <p>
      * Valid configuration strings are documented at {@link ConsumerConfig}
-     * 
+     *
      * @param properties The consumer configuration properties
      */
     public KafkaConsumer(Properties properties) {
@@ -706,9 +706,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                   Metrics metrics,
                   SubscriptionState subscriptions,
                   Metadata metadata,
-                  boolean autoCommitEnabled,
-                  int autoCommitIntervalMs,
-                  int heartbeatIntervalMs,
                   long retryBackoffMs,
                   long requestTimeoutMs) {
         this.clientId = clientId;
@@ -872,6 +869,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     public void unsubscribe() {
         acquire();
         try {
+            // make sure the offsets of topic partitions the consumer is unsubscribing from
+            // are committed since there will be no following rebalance
+            this.coordinator.maybeAutoCommitOffsetsNow();
+
             log.debug("Unsubscribed all topics or patterns and assigned partitions");
             this.subscriptions.unsubscribe();
             this.coordinator.maybeLeaveGroup();
@@ -913,6 +914,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     topics.add(topic);
                 }
 
+                // make sure the offsets of topic partitions the consumer is unsubscribing from
+                // are committed since there will be no following rebalance
+                this.coordinator.maybeAutoCommitOffsetsNow();
+
                 log.debug("Subscribed to partition(s): {}", Utils.join(partitions, ", "));
                 this.subscriptions.assignFromUser(new HashSet<>(partitions));
                 metadata.setTopics(topics);

http://git-wip-us.apache.org/repos/asf/kafka/blob/54767bbb/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index b8df50e..ff0d669 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -514,22 +514,31 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 this.nextAutoCommitDeadline = now + retryBackoffMs;
             } else if (now >= nextAutoCommitDeadline) {
                 this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
-                commitOffsetsAsync(subscriptions.allConsumed(), new OffsetCommitCallback() {
-                    @Override
-                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
-                        if (exception != null) {
-                            log.warn("Auto offset commit failed for group {}: {}", groupId, exception.getMessage());
-                            if (exception instanceof RetriableException)
-                                nextAutoCommitDeadline = Math.min(time.milliseconds() + retryBackoffMs, nextAutoCommitDeadline);
-                        } else {
-                            log.debug("Completed autocommit of offsets {} for group {}", offsets, groupId);
-                        }
-                    }
-                });
+                doAutoCommitOffsetsAsync();
             }
         }
     }
 
+    public void maybeAutoCommitOffsetsNow() {
+        if (autoCommitEnabled && !coordinatorUnknown())
+            doAutoCommitOffsetsAsync();
+    }
+
+    private void doAutoCommitOffsetsAsync() {
+        commitOffsetsAsync(subscriptions.allConsumed(), new OffsetCommitCallback() {
+            @Override
+            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
+                if (exception != null) {
+                    log.warn("Auto offset commit failed for group {}: {}", groupId, exception.getMessage());
+                    if (exception instanceof RetriableException)
+                        nextAutoCommitDeadline = Math.min(time.milliseconds() + retryBackoffMs, nextAutoCommitDeadline);
+                } else {
+                    log.debug("Completed autocommit of offsets {} for group {}", offsets, groupId);
+                }
+            }
+        });
+    }
+
     private void maybeAutoCommitOffsetsSync() {
         if (autoCommitEnabled) {
             try {
@@ -807,7 +816,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         }
 
         public void invoke() {
-            callback.onComplete(offsets, exception);
+            if (callback != null)
+                callback.onComplete(offsets, exception);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/54767bbb/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 6d4c01b..dd0bce9 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -21,7 +21,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.regex.Pattern;
@@ -128,13 +127,6 @@ public class SubscriptionState {
         if (!this.subscription.equals(topicsToSubscribe)) {
             this.subscription = topicsToSubscribe;
             this.groupSubscription.addAll(topicsToSubscribe);
-
-            // Remove any assigned partitions which are no longer subscribed to
-            for (Iterator<TopicPartition> it = assignment.keySet().iterator(); 
it.hasNext(); ) {
-                TopicPartition tp = it.next();
-                if (!subscription.contains(tp.topic()))
-                    it.remove();
-            }
         }
     }
 
@@ -216,7 +208,6 @@ public class SubscriptionState {
         this.subscriptionType = SubscriptionType.NONE;
     }
 
-
     public Pattern getSubscribedPattern() {
         return this.subscribedPattern;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/54767bbb/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 8d2ac00..dbe3d67 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -35,11 +35,13 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.network.Selectable;
+import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.requests.FetchResponse.PartitionData;
 import org.apache.kafka.common.requests.GroupCoordinatorResponse;
 import org.apache.kafka.common.requests.HeartbeatResponse;
 import org.apache.kafka.common.requests.JoinGroupResponse;
@@ -76,9 +78,15 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class KafkaConsumerTest {
-
     private final String topic = "test";
-    private final TopicPartition tp0 = new TopicPartition("test", 0);
+    private final TopicPartition tp0 = new TopicPartition(topic, 0);
+    private final TopicPartition tp1 = new TopicPartition(topic, 1);
+
+    private final String topic2 = "test2";
+    private final TopicPartition t2p0 = new TopicPartition(topic2, 0);
+
+    private final String topic3 = "test3";
+    private final TopicPartition t3p0 = new TopicPartition(topic3, 0);
 
     @Test
     public void testConstructorClose() throws Exception {
@@ -318,9 +326,6 @@ public class KafkaConsumerTest {
 
     @Test
     public void verifyHeartbeatSent() throws Exception {
-        String topic = "topic";
-        TopicPartition partition = new TopicPartition(topic, 0);
-
         int rebalanceTimeoutMs = 60000;
         int sessionTimeoutMs = 30000;
         int heartbeatIntervalMs = 1000;
@@ -336,47 +341,18 @@ public class KafkaConsumerTest {
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         final KafkaConsumer<String, String> consumer = newConsumer(time, 
client, metadata, assignor,
-                rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, 
autoCommitIntervalMs);
+                rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, 
true, autoCommitIntervalMs);
 
-        consumer.subscribe(Arrays.asList(topic), new 
ConsumerRebalanceListener() {
-            @Override
-            public void onPartitionsRevoked(Collection<TopicPartition> 
partitions) {
-
-            }
-
-            @Override
-            public void onPartitionsAssigned(Collection<TopicPartition> 
partitions) {
-                // set initial position so we don't need a lookup
-                for (TopicPartition partition : partitions)
-                    consumer.seek(partition, 0);
-            }
-        });
-
-        // lookup coordinator
-        client.prepareResponseFrom(new 
GroupCoordinatorResponse(Errors.NONE.code(), node).toStruct(), node);
-
-        Node coordinator = new Node(Integer.MAX_VALUE - node.id(), 
node.host(), node.port());
-
-        // join group
-        client.prepareResponseFrom(joinGroupFollowerResponse(assignor, 1, 
"memberId", "leaderId", Errors.NONE.code()), coordinator);
-
-        // sync group
-        client.prepareResponseFrom(syncGroupResponse(Arrays.asList(partition), 
Errors.NONE.code()), coordinator);
+        consumer.subscribe(Arrays.asList(topic), 
getConsumerRebalanceListener(consumer));
+        Node coordinator = prepareRebalance(client, node, assignor, 
Arrays.asList(tp0), null);
 
         // initial fetch
-        client.prepareResponseFrom(fetchResponse(partition, 0, 0), node);
+        client.prepareResponseFrom(fetchResponse(tp0, 0, 0), node);
 
         consumer.poll(0);
-        assertEquals(Collections.singleton(partition), consumer.assignment());
+        assertEquals(Collections.singleton(tp0), consumer.assignment());
 
-        final AtomicBoolean heartbeatReceived = new AtomicBoolean(false);
-        client.prepareResponseFrom(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(ClientRequest request) {
-                heartbeatReceived.set(true);
-                return true;
-            }
-        }, new HeartbeatResponse(Errors.NONE.code()).toStruct(), coordinator);
+        AtomicBoolean heartbeatReceived = prepareHeartbeatResponse(client, 
coordinator);
 
         // heartbeat interval is 2 seconds
         time.sleep(heartbeatIntervalMs);
@@ -389,9 +365,6 @@ public class KafkaConsumerTest {
 
     @Test
     public void verifyHeartbeatSentWhenFetchedDataReady() throws Exception {
-        String topic = "topic";
-        TopicPartition partition = new TopicPartition(topic, 0);
-
         int rebalanceTimeoutMs = 60000;
         int sessionTimeoutMs = 30000;
         int heartbeatIntervalMs = 1000;
@@ -407,47 +380,19 @@ public class KafkaConsumerTest {
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         final KafkaConsumer<String, String> consumer = newConsumer(time, 
client, metadata, assignor,
-                rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, 
autoCommitIntervalMs);
-        consumer.subscribe(Arrays.asList(topic), new 
ConsumerRebalanceListener() {
-            @Override
-            public void onPartitionsRevoked(Collection<TopicPartition> 
partitions) {
+                rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, 
true, autoCommitIntervalMs);
 
-            }
-
-            @Override
-            public void onPartitionsAssigned(Collection<TopicPartition> 
partitions) {
-                // set initial position so we don't need a lookup
-                for (TopicPartition partition : partitions)
-                    consumer.seek(partition, 0);
-            }
-        });
-
-        // lookup coordinator
-        client.prepareResponseFrom(new 
GroupCoordinatorResponse(Errors.NONE.code(), node).toStruct(), node);
-
-        Node coordinator = new Node(Integer.MAX_VALUE - node.id(), 
node.host(), node.port());
-
-        // join group
-        client.prepareResponseFrom(joinGroupFollowerResponse(assignor, 1, 
"memberId", "leaderId", Errors.NONE.code()), coordinator);
-
-        // sync group
-        client.prepareResponseFrom(syncGroupResponse(Arrays.asList(partition), 
Errors.NONE.code()), coordinator);
+        consumer.subscribe(Arrays.asList(topic), 
getConsumerRebalanceListener(consumer));
+        Node coordinator = prepareRebalance(client, node, assignor, 
Arrays.asList(tp0), null);
 
         consumer.poll(0);
 
         // respond to the outstanding fetch so that we have data available on 
the next poll
-        client.respondFrom(fetchResponse(partition, 0, 5), node);
+        client.respondFrom(fetchResponse(tp0, 0, 5), node);
         client.poll(0, time.milliseconds());
 
-        client.prepareResponseFrom(fetchResponse(partition, 5, 0), node);
-        final AtomicBoolean heartbeatReceived = new AtomicBoolean(false);
-        client.prepareResponseFrom(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(ClientRequest request) {
-                heartbeatReceived.set(true);
-                return true;
-            }
-        }, new HeartbeatResponse(Errors.NONE.code()).toStruct(), coordinator);
+        client.prepareResponseFrom(fetchResponse(tp0, 5, 0), node);
+        AtomicBoolean heartbeatReceived = prepareHeartbeatResponse(client, 
coordinator);
 
         time.sleep(heartbeatIntervalMs);
         Thread.sleep(heartbeatIntervalMs);
@@ -459,8 +404,6 @@ public class KafkaConsumerTest {
 
     @Test
     public void verifyNoCoordinatorLookupForManualAssignmentWithSeek() {
-        String topic = "topic";
-        final TopicPartition partition = new TopicPartition(topic, 0);
         int rebalanceTimeoutMs = 60000;
         int sessionTimeoutMs = 3000;
         int heartbeatIntervalMs = 2000;
@@ -476,26 +419,22 @@ public class KafkaConsumerTest {
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         final KafkaConsumer<String, String> consumer = newConsumer(time, 
client, metadata, assignor,
-                rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, 
autoCommitIntervalMs);
-        consumer.assign(Arrays.asList(partition));
-        consumer.seekToBeginning(Arrays.asList(partition));
+                rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, 
true, autoCommitIntervalMs);
+        consumer.assign(Arrays.asList(tp0));
+        consumer.seekToBeginning(Arrays.asList(tp0));
 
         // there shouldn't be any need to lookup the coordinator or fetch 
committed offsets.
         // we just lookup the starting position and send the record fetch.
-        
client.prepareResponse(listOffsetsResponse(Collections.singletonMap(partition, 
50L), Errors.NONE.code()));
-        client.prepareResponse(fetchResponse(partition, 50L, 5));
+        
client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 50L), 
Errors.NONE.code()));
+        client.prepareResponse(fetchResponse(tp0, 50L, 5));
 
         ConsumerRecords<String, String> records = consumer.poll(0);
         assertEquals(5, records.count());
-        assertEquals(55L, consumer.position(partition));
+        assertEquals(55L, consumer.position(tp0));
     }
 
     @Test
     public void testCommitsFetchedDuringAssign() {
-        String topic = "topic";
-        final TopicPartition partition1 = new TopicPartition(topic, 0);
-        final TopicPartition partition2 = new TopicPartition(topic, 1);
-
         long offset1 = 10000;
         long offset2 = 20000;
 
@@ -514,8 +453,8 @@ public class KafkaConsumerTest {
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         final KafkaConsumer<String, String> consumer = newConsumer(time, 
client, metadata, assignor,
-                rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, 
autoCommitIntervalMs);
-        consumer.assign(Arrays.asList(partition1));
+                rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, 
true, autoCommitIntervalMs);
+        consumer.assign(Arrays.asList(tp0));
 
         // lookup coordinator
         client.prepareResponseFrom(new 
GroupCoordinatorResponse(Errors.NONE.code(), node).toStruct(), node);
@@ -523,28 +462,25 @@ public class KafkaConsumerTest {
 
         // fetch offset for one topic
         client.prepareResponseFrom(
-                offsetResponse(Collections.singletonMap(partition1, offset1), 
Errors.NONE.code()),
+                offsetResponse(Collections.singletonMap(tp0, offset1), 
Errors.NONE.code()),
                 coordinator);
 
-        assertEquals(offset1, consumer.committed(partition1).offset());
+        assertEquals(offset1, consumer.committed(tp0).offset());
 
-        consumer.assign(Arrays.asList(partition1, partition2));
+        consumer.assign(Arrays.asList(tp0, tp1));
 
         // fetch offset for two topics
         Map<TopicPartition, Long> offsets = new HashMap<>();
-        offsets.put(partition1, offset1);
-        offsets.put(partition2, offset2);
+        offsets.put(tp0, offset1);
+        offsets.put(tp1, offset2);
         client.prepareResponseFrom(offsetResponse(offsets, 
Errors.NONE.code()), coordinator);
 
-        assertEquals(offset1, consumer.committed(partition1).offset());
-        assertEquals(offset2, consumer.committed(partition2).offset());
+        assertEquals(offset1, consumer.committed(tp0).offset());
+        assertEquals(offset2, consumer.committed(tp1).offset());
     }
 
     @Test
     public void testAutoCommitSentBeforePositionUpdate() {
-        String topic = "topic";
-        final TopicPartition partition = new TopicPartition(topic, 0);
-
         int rebalanceTimeoutMs = 60000;
         int sessionTimeoutMs = 30000;
         int heartbeatIntervalMs = 3000;
@@ -563,56 +499,23 @@ public class KafkaConsumerTest {
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         final KafkaConsumer<String, String> consumer = newConsumer(time, 
client, metadata, assignor,
-                rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, 
autoCommitIntervalMs);
-        consumer.subscribe(Arrays.asList(topic), new 
ConsumerRebalanceListener() {
-            @Override
-            public void onPartitionsRevoked(Collection<TopicPartition> 
partitions) {
-
-            }
-
-            @Override
-            public void onPartitionsAssigned(Collection<TopicPartition> 
partitions) {
-                // set initial position so we don't need a lookup
-                for (TopicPartition partition : partitions)
-                    consumer.seek(partition, 0);
-            }
-        });
-
-        // lookup coordinator
-        client.prepareResponseFrom(new 
GroupCoordinatorResponse(Errors.NONE.code(), node).toStruct(), node);
-
-        Node coordinator = new Node(Integer.MAX_VALUE - node.id(), 
node.host(), node.port());
-
-        // join group
-        client.prepareResponseFrom(joinGroupFollowerResponse(assignor, 1, 
"memberId", "leaderId", Errors.NONE.code()), coordinator);
+                rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, 
true, autoCommitIntervalMs);
 
-        // sync group
-        client.prepareResponseFrom(syncGroupResponse(Arrays.asList(partition), 
Errors.NONE.code()), coordinator);
+        consumer.subscribe(Arrays.asList(topic), 
getConsumerRebalanceListener(consumer));
+        Node coordinator = prepareRebalance(client, node, assignor, 
Arrays.asList(tp0), null);
 
         consumer.poll(0);
 
         // respond to the outstanding fetch so that we have data available on 
the next poll
-        client.respondFrom(fetchResponse(partition, 0, 5), node);
+        client.respondFrom(fetchResponse(tp0, 0, 5), node);
         client.poll(0, time.milliseconds());
 
         time.sleep(autoCommitIntervalMs);
 
-        client.prepareResponseFrom(fetchResponse(partition, 5, 0), node);
+        client.prepareResponseFrom(fetchResponse(tp0, 5, 0), node);
 
         // no data has been returned to the user yet, so the committed offset 
should be 0
-        final AtomicBoolean commitReceived = new AtomicBoolean(false);
-        client.prepareResponseFrom(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(ClientRequest request) {
-                OffsetCommitRequest commitRequest = new 
OffsetCommitRequest(request.request().body());
-                OffsetCommitRequest.PartitionData partitionData = 
commitRequest.offsetData().get(partition);
-                if (partitionData.offset == 0) {
-                    commitReceived.set(true);
-                    return true;
-                }
-                return false;
-            }
-        }, new OffsetCommitResponse(Collections.singletonMap(partition, 
Errors.NONE.code())).toStruct(), coordinator);
+        AtomicBoolean commitReceived = prepareOffsetCommitResponse(client, 
coordinator, tp0, 0);
 
         consumer.poll(0);
 
@@ -621,9 +524,6 @@ public class KafkaConsumerTest {
 
     @Test
     public void testWakeupWithFetchDataAvailable() {
-        String topic = "topic";
-        final TopicPartition partition = new TopicPartition(topic, 0);
-
         int rebalanceTimeoutMs = 60000;
         int sessionTimeoutMs = 30000;
         int heartbeatIntervalMs = 3000;
@@ -642,8 +542,360 @@ public class KafkaConsumerTest {
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         final KafkaConsumer<String, String> consumer = newConsumer(time, 
client, metadata, assignor,
-                rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, 
autoCommitIntervalMs);
-        consumer.subscribe(Arrays.asList(topic), new 
ConsumerRebalanceListener() {
+                rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, 
true, autoCommitIntervalMs);
+
+        consumer.subscribe(Arrays.asList(topic), 
getConsumerRebalanceListener(consumer));
+        prepareRebalance(client, node, assignor, Arrays.asList(tp0), null);
+
+        consumer.poll(0);
+
+        // respond to the outstanding fetch so that we have data available on 
the next poll
+        client.respondFrom(fetchResponse(tp0, 0, 5), node);
+        client.poll(0, time.milliseconds());
+
+        consumer.wakeup();
+
+        try {
+            consumer.poll(0);
+            fail();
+        } catch (WakeupException e) {
+        }
+
+        // make sure the position hasn't been updated
+        assertEquals(0, consumer.position(tp0));
+
+        // the next poll should return the completed fetch
+        ConsumerRecords<String, String> records = consumer.poll(0);
+        assertEquals(5, records.count());
+    }
+
+    /**
+     * Verify that when a consumer changes its topic subscription its assigned 
partitions
+     * do not immediately change, and the latest consumed offsets of its 
to-be-revoked
+     * partitions are properly committed (when auto-commit is enabled).
+     * Upon unsubscribing from subscribed topics the consumer subscription and 
assignment
+     * are both updated right away and its consumed offsets are committed (if 
auto-commit
+     * is enabled).
+     */
+    @Test
+    public void testSubscriptionChangesWithAutoCommitEnabled() {
+        int rebalanceTimeoutMs = 60000;
+        int sessionTimeoutMs = 30000;
+        int heartbeatIntervalMs = 3000;
+
+        // adjust auto commit interval lower than heartbeat so we don't need 
to deal with
+        // a concurrent heartbeat request
+        int autoCommitIntervalMs = 1000;
+
+        Time time = new MockTime();
+        MockClient client = new MockClient(time);
+        Map<String, Integer> tpCounts = new HashMap<>();
+        tpCounts.put(topic, 1);
+        tpCounts.put(topic2, 1);
+        tpCounts.put(topic3, 1);
+        Cluster cluster = TestUtils.singletonCluster(tpCounts);
+        Node node = cluster.nodes().get(0);
+        client.setNode(node);
+        Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+        metadata.update(cluster, time.milliseconds());
+        PartitionAssignor assignor = new RangeAssignor();
+
+        final KafkaConsumer<String, String> consumer = newConsumer(time, 
client, metadata, assignor,
+                rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, 
true, autoCommitIntervalMs);
+
+        // initial subscription
+        consumer.subscribe(Arrays.asList(topic, topic2), 
getConsumerRebalanceListener(consumer));
+
+        // verify that subscription has changed but assignment is still 
unchanged
+        assertTrue(consumer.subscription().size() == 2);
+        assertTrue(consumer.subscription().contains(topic) && 
consumer.subscription().contains(topic2));
+        assertTrue(consumer.assignment().isEmpty());
+
+        // mock rebalance responses
+        Node coordinator = prepareRebalance(client, node, assignor, 
Arrays.asList(tp0, t2p0), null);
+
+        consumer.poll(0);
+
+        // verify that subscription is still the same, and now assignment has 
caught up
+        assertTrue(consumer.subscription().size() == 2);
+        assertTrue(consumer.subscription().contains(topic) && 
consumer.subscription().contains(topic2));
+        assertTrue(consumer.assignment().size() == 2);
+        assertTrue(consumer.assignment().contains(tp0) && 
consumer.assignment().contains(t2p0));
+
+        // mock a response to the outstanding fetch so that we have data 
available on the next poll
+        Map<TopicPartition, FetchInfo> fetches1 = new HashMap<>();
+        fetches1.put(tp0, new FetchInfo(0, 1));
+        fetches1.put(t2p0, new FetchInfo(0, 10));
+        client.respondFrom(fetchResponse(fetches1), node);
+        client.poll(0, time.milliseconds());
+
+        ConsumerRecords<String, String> records = consumer.poll(0);
+
+        // verify that the fetch occurred as expected
+        assertEquals(11, records.count());
+        assertEquals(1L, consumer.position(tp0));
+        assertEquals(10L, consumer.position(t2p0));
+
+
+        // subscription change
+        consumer.subscribe(Arrays.asList(topic, topic3), 
getConsumerRebalanceListener(consumer));
+
+        // verify that subscription has changed but assignment is still 
unchanged
+        assertTrue(consumer.subscription().size() == 2);
+        assertTrue(consumer.subscription().contains(topic) && 
consumer.subscription().contains(topic3));
+        assertTrue(consumer.assignment().size() == 2);
+        assertTrue(consumer.assignment().contains(tp0) && 
consumer.assignment().contains(t2p0));
+
+        // mock the offset commit response for to be revoked partitions
+        Map<TopicPartition, Long> partitionOffsets1 = new HashMap<>();
+        partitionOffsets1.put(tp0, 1L);
+        partitionOffsets1.put(t2p0, 10L);
+        AtomicBoolean commitReceived = prepareOffsetCommitResponse(client, 
coordinator, partitionOffsets1);
+
+        // mock rebalance responses
+        prepareRebalance(client, node, assignor, Arrays.asList(tp0, t3p0), 
coordinator);
+
+        // mock a response to the outstanding fetch so that we have data 
available on the next poll
+        Map<TopicPartition, FetchInfo> fetches2 = new HashMap<>();
+        fetches2.put(tp0, new FetchInfo(1, 1));
+        fetches2.put(t3p0, new FetchInfo(0, 100));
+        client.respondFrom(fetchResponse(fetches2), node);
+        client.poll(0, time.milliseconds());
+        client.prepareResponse(fetchResponse(fetches2));
+
+        records = consumer.poll(0);
+
+        // verify that the fetch occurred as expected
+        assertEquals(101, records.count());
+        assertEquals(2L, consumer.position(tp0));
+        assertEquals(100L, consumer.position(t3p0));
+
+        // verify that the offset commits occurred as expected
+        assertTrue(commitReceived.get());
+
+        // verify that subscription is still the same, and now assignment has 
caught up
+        assertTrue(consumer.subscription().size() == 2);
+        assertTrue(consumer.subscription().contains(topic) && 
consumer.subscription().contains(topic3));
+        assertTrue(consumer.assignment().size() == 2);
+        assertTrue(consumer.assignment().contains(tp0) && 
consumer.assignment().contains(t3p0));
+
+
+        // mock the offset commit response for to be revoked partitions
+        Map<TopicPartition, Long> partitionOffsets2 = new HashMap<>();
+        partitionOffsets2.put(tp0, 2L);
+        partitionOffsets2.put(t3p0, 100L);
+        commitReceived = prepareOffsetCommitResponse(client, coordinator, 
partitionOffsets2);
+
+        // unsubscribe
+        consumer.unsubscribe();
+
+        // verify that subscription and assignment are both cleared
+        assertTrue(consumer.subscription().isEmpty());
+        assertTrue(consumer.assignment().isEmpty());
+
+        // verify that the offset commits occurred as expected
+        assertTrue(commitReceived.get());
+
+        consumer.close();
+    }
+
+    /**
+     * Verify that when a consumer changes its topic subscription its assigned partitions
+     * do not immediately change, and the consumed offsets of its to-be-revoked partitions
+     * are not committed (when auto-commit is disabled).
+     * Upon unsubscribing from subscribed topics, the assigned partitions immediately
+     * change but if auto-commit is disabled the consumer offsets are not committed.
+     */
+    @Test
+    public void testSubscriptionChangesWithAutoCommitDisabled() {
+        int rebalanceTimeoutMs = 60000;
+        int sessionTimeoutMs = 30000;
+        int heartbeatIntervalMs = 3000;
+        int autoCommitIntervalMs = 1000;
+
+        Time time = new MockTime();
+        MockClient client = new MockClient(time);
+        Map<String, Integer> tpCounts = new HashMap<>();
+        tpCounts.put(topic, 1);
+        tpCounts.put(topic2, 1);
+        Cluster cluster = TestUtils.singletonCluster(tpCounts);
+        Node node = cluster.nodes().get(0);
+        client.setNode(node);
+        Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+        metadata.update(cluster, time.milliseconds());
+        PartitionAssignor assignor = new RangeAssignor();
+
+        // the boolean argument disables auto-commit for this consumer
+        final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
+                rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, false, autoCommitIntervalMs);
+
+        // initial subscription
+        consumer.subscribe(Arrays.asList(topic), getConsumerRebalanceListener(consumer));
+
+        // verify that subscription has changed but assignment is still unchanged
+        assertEquals(Collections.singleton(topic), consumer.subscription());
+        assertTrue(consumer.assignment().isEmpty());
+
+        // mock rebalance responses (passing null makes the helper also mock the coordinator lookup)
+        prepareRebalance(client, node, assignor, Arrays.asList(tp0), null);
+
+        consumer.poll(0);
+
+        // verify that subscription is still the same, and now assignment has caught up
+        assertEquals(Collections.singleton(topic), consumer.subscription());
+        assertEquals(Collections.singleton(tp0), consumer.assignment());
+
+        consumer.poll(0);
+
+        // subscription change
+        consumer.subscribe(Arrays.asList(topic2), getConsumerRebalanceListener(consumer));
+
+        // verify that subscription has changed but assignment is still unchanged
+        assertEquals(Collections.singleton(topic2), consumer.subscription());
+        assertEquals(Collections.singleton(tp0), consumer.assignment());
+
+        // the auto commit is disabled, so no offset commit request should be sent
+        for (ClientRequest req : client.requests())
+            assertTrue(req.request().header().apiKey() != ApiKeys.OFFSET_COMMIT.id);
+
+        // subscription change
+        consumer.unsubscribe();
+
+        // verify that subscription and assignment are both updated
+        assertTrue(consumer.subscription().isEmpty());
+        assertTrue(consumer.assignment().isEmpty());
+
+        // the auto commit is disabled, so no offset commit request should be sent
+        for (ClientRequest req : client.requests())
+            assertTrue(req.request().header().apiKey() != ApiKeys.OFFSET_COMMIT.id);
+
+        consumer.close();
+    }
+
+    /**
+     * Verify that a manual assignment change takes effect immediately and that, with
+     * auto-commit enabled, the consumed offset of the previously assigned partition is
+     * committed when the assignment is replaced.
+     */
+    @Test
+    public void testManualAssignmentChangeWithAutoCommitEnabled() {
+        int rebalanceTimeoutMs = 60000;
+        int sessionTimeoutMs = 30000;
+        int heartbeatIntervalMs = 3000;
+        int autoCommitIntervalMs = 1000;
+
+        Time time = new MockTime();
+        MockClient client = new MockClient(time);
+        Map<String, Integer> tpCounts = new HashMap<>();
+        tpCounts.put(topic, 1);
+        tpCounts.put(topic2, 1);
+        Cluster cluster = TestUtils.singletonCluster(tpCounts);
+        Node node = cluster.nodes().get(0);
+        client.setNode(node);
+        Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+        metadata.update(cluster, time.milliseconds());
+        PartitionAssignor assignor = new RangeAssignor();
+
+        // the boolean argument enables auto-commit for this consumer
+        final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
+                rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs);
+
+        // lookup coordinator
+        client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node).toStruct(), node);
+        Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
+
+        // manual assignment
+        consumer.assign(Arrays.asList(tp0));
+        consumer.seekToBeginning(Arrays.asList(tp0));
+
+        // fetch the committed offset for tp0
+        client.prepareResponseFrom(
+                offsetResponse(Collections.singletonMap(tp0, 0L), Errors.NONE.code()),
+                coordinator);
+        assertEquals(0, consumer.committed(tp0).offset());
+
+        // verify that assignment immediately changes
+        assertEquals(Collections.singleton(tp0), consumer.assignment());
+
+        // from here on there shouldn't be any further need to look up the coordinator or fetch
+        // committed offsets; we just look up the starting position and send the record fetch.
+        client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 10L), Errors.NONE.code()));
+        client.prepareResponse(fetchResponse(tp0, 10L, 1));
+
+        ConsumerRecords<String, String> records = consumer.poll(0);
+        assertEquals(1, records.count());
+        assertEquals(11L, consumer.position(tp0));
+
+        // mock the offset commit response for the partition that is about to be replaced
+        AtomicBoolean commitReceived = prepareOffsetCommitResponse(client, coordinator, tp0, 11);
+
+        // new manual assignment
+        consumer.assign(Arrays.asList(t2p0));
+
+        // verify that assignment immediately changes
+        assertEquals(Collections.singleton(t2p0), consumer.assignment());
+        // verify that the offset commits occurred as expected
+        assertTrue(commitReceived.get());
+
+        consumer.close();
+    }
+
+    /**
+     * Verify that a manual assignment change takes effect immediately and that, with
+     * auto-commit disabled, replacing the assignment does not send an offset commit request.
+     */
+    @Test
+    public void testManualAssignmentChangeWithAutoCommitDisabled() {
+        int rebalanceTimeoutMs = 60000;
+        int sessionTimeoutMs = 30000;
+        int heartbeatIntervalMs = 3000;
+        int autoCommitIntervalMs = 1000;
+
+        Time time = new MockTime();
+        MockClient client = new MockClient(time);
+        Map<String, Integer> tpCounts = new HashMap<>();
+        tpCounts.put(topic, 1);
+        tpCounts.put(topic2, 1);
+        Cluster cluster = TestUtils.singletonCluster(tpCounts);
+        Node node = cluster.nodes().get(0);
+        client.setNode(node);
+        Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+        metadata.update(cluster, time.milliseconds());
+        PartitionAssignor assignor = new RangeAssignor();
+
+        // the boolean argument disables auto-commit for this consumer
+        final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
+                rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, false, autoCommitIntervalMs);
+
+        // lookup coordinator
+        client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node).toStruct(), node);
+        Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
+
+        // manual assignment
+        consumer.assign(Arrays.asList(tp0));
+        consumer.seekToBeginning(Arrays.asList(tp0));
+
+        // fetch the committed offset for tp0
+        client.prepareResponseFrom(
+                offsetResponse(Collections.singletonMap(tp0, 0L), Errors.NONE.code()),
+                coordinator);
+        assertEquals(0, consumer.committed(tp0).offset());
+
+        // verify that assignment immediately changes
+        assertEquals(Collections.singleton(tp0), consumer.assignment());
+
+        // from here on there shouldn't be any further need to look up the coordinator or fetch
+        // committed offsets; we just look up the starting position and send the record fetch.
+        client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 10L), Errors.NONE.code()));
+        client.prepareResponse(fetchResponse(tp0, 10L, 1));
+
+        ConsumerRecords<String, String> records = consumer.poll(0);
+        assertEquals(1, records.count());
+        assertEquals(11L, consumer.position(tp0));
+
+        // new manual assignment
+        consumer.assign(Arrays.asList(t2p0));
+
+        // verify that assignment immediately changes
+        assertEquals(Collections.singleton(t2p0), consumer.assignment());
+
+        // the auto commit is disabled, so no offset commit request should be sent
+        for (ClientRequest req : client.requests())
+            assertTrue(req.request().header().apiKey() != ApiKeys.OFFSET_COMMIT.id);
+
+        consumer.close();
+    }
+
+    private ConsumerRebalanceListener getConsumerRebalanceListener(final 
KafkaConsumer<String, String> consumer) {
+        return new ConsumerRebalanceListener() {
             @Override
             public void onPartitionsRevoked(Collection<TopicPartition> 
partitions) {
 
@@ -655,39 +907,68 @@ public class KafkaConsumerTest {
                 for (TopicPartition partition : partitions)
                     consumer.seek(partition, 0);
             }
-        });
-
-        // lookup coordinator
-        client.prepareResponseFrom(new 
GroupCoordinatorResponse(Errors.NONE.code(), node).toStruct(), node);
+        };
+    }
 
-        Node coordinator = new Node(Integer.MAX_VALUE - node.id(), 
node.host(), node.port());
+    /**
+     * Queue the mocked responses for one group rebalance on the given client: a join-group
+     * (follower) response followed by a sync-group response carrying {@code partitions},
+     * both sent from the coordinator node.
+     *
+     * @param client      mock client on which the responses are queued
+     * @param node        broker node used to answer the coordinator lookup and to derive the coordinator node
+     * @param assignor    assignor passed to the join-group response builder
+     * @param partitions  partitions placed in the sync-group response
+     * @param coordinator coordinator node to respond from; when {@code null} a coordinator lookup
+     *                    response is queued first and a coordinator node is derived from {@code node}
+     * @return the coordinator node the responses were queued for
+     */
+    private Node prepareRebalance(MockClient client, Node node, 
PartitionAssignor assignor, List<TopicPartition> partitions, Node coordinator) {
+        if (coordinator == null) {
+            // lookup coordinator
+            client.prepareResponseFrom(new 
GroupCoordinatorResponse(Errors.NONE.code(), node).toStruct(), node);
+            coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), 
node.port());
+        }
 
         // join group
         client.prepareResponseFrom(joinGroupFollowerResponse(assignor, 1, 
"memberId", "leaderId", Errors.NONE.code()), coordinator);
 
         // sync group
-        client.prepareResponseFrom(syncGroupResponse(Arrays.asList(partition), 
Errors.NONE.code()), coordinator);
+        client.prepareResponseFrom(syncGroupResponse(partitions, 
Errors.NONE.code()), coordinator);
 
-        consumer.poll(0);
+        return coordinator;
+    }
 
-        // respond to the outstanding fetch so that we have data available on 
the next poll
-        client.respondFrom(fetchResponse(partition, 0, 5), node);
-        client.poll(0, time.milliseconds());
+    /**
+     * Queue an error-free heartbeat response from the coordinator.
+     *
+     * @param client      mock client on which the response is queued
+     * @param coordinator node the response is sent from
+     * @return a flag that starts out {@code false} and is set to {@code true} when the
+     *         matcher guarding the queued response is invoked
+     */
+    private AtomicBoolean prepareHeartbeatResponse(MockClient client, Node coordinator) {
+        final AtomicBoolean received = new AtomicBoolean(false);
+        MockClient.RequestMatcher acceptAndFlag = new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(ClientRequest request) {
+                // every request is accepted; the request itself is not inspected
+                received.set(true);
+                return true;
+            }
+        };
+        Struct heartbeatOk = new HeartbeatResponse(Errors.NONE.code()).toStruct();
+        client.prepareResponseFrom(acceptAndFlag, heartbeatOk, coordinator);
+        return received;
+    }
 
-        consumer.wakeup();
+    /**
+     * Queue an error-free offset commit response from the coordinator, guarded by a matcher
+     * that only accepts a commit request carrying exactly the expected offsets.
+     *
+     * @param client           mock client on which the response is queued
+     * @param coordinator      node the response is sent from
+     * @param partitionOffsets offsets the commit request is expected to contain, per partition
+     * @return a flag that starts out {@code false} and becomes {@code true} only once a request
+     *         containing every expected partition at its expected offset has been matched
+     */
+    private AtomicBoolean prepareOffsetCommitResponse(MockClient client, Node coordinator,
+                                                      final Map<TopicPartition, Long> partitionOffsets) {
+        // start from "not received" so callers asserting on the flag cannot pass vacuously
+        final AtomicBoolean commitReceived = new AtomicBoolean(false);
+        Map<TopicPartition, Short> response = new HashMap<>();
+        for (TopicPartition partition : partitionOffsets.keySet())
+            response.put(partition, Errors.NONE.code());
 
-        try {
-            consumer.poll(0);
-            fail();
-        } catch (WakeupException e) {
-        }
+        client.prepareResponseFrom(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(ClientRequest request) {
+                OffsetCommitRequest commitRequest = new OffsetCommitRequest(request.request().body());
+                for (Map.Entry<TopicPartition, Long> partitionOffset : partitionOffsets.entrySet()) {
+                    OffsetCommitRequest.PartitionData partitionData =
+                            commitRequest.offsetData().get(partitionOffset.getKey());
+                    // a missing partition or an unexpected offset means this is not the commit we expect
+                    if (partitionData == null || partitionData.offset != partitionOffset.getValue())
+                        return false;
+                }
+                // every expected offset was present in the request
+                commitReceived.set(true);
+                return true;
+            }
+        }, offsetCommitResponse(response), coordinator);
+        return commitReceived;
+    }
 
-        // make sure the position hasn't been updated
-        assertEquals(0, consumer.position(partition));
+    /**
+     * Single-partition convenience overload: wraps the partition and its expected offset
+     * in a one-entry map and delegates to the map-based variant.
+     */
+    private AtomicBoolean prepareOffsetCommitResponse(MockClient client, Node coordinator,
+                                                      final TopicPartition partition, final long offset) {
+        Map<TopicPartition, Long> expectedOffsets = Collections.singletonMap(partition, offset);
+        return prepareOffsetCommitResponse(client, coordinator, expectedOffsets);
+    }
 
-        // the next poll should return the completed fetch
-        ConsumerRecords<String, String> records = consumer.poll(0);
-        assertEquals(5, records.count());
+    /**
+     * Build an offset commit response from the given per-partition error codes
+     * and return it in struct form.
+     */
+    private Struct offsetCommitResponse(Map<TopicPartition, Short> responseData) {
+        return new OffsetCommitResponse(responseData).toStruct();
     }
 
     private Struct joinGroupFollowerResponse(PartitionAssignor assignor, int 
generationId, String memberId, String leaderId, short error) {
@@ -717,16 +998,27 @@ public class KafkaConsumerTest {
         return new ListOffsetResponse(partitionData).toStruct();
     }
 
-    private Struct fetchResponse(TopicPartition tp, long fetchOffset, int 
count) {
-        MemoryRecords records = 
MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
-        for (int i = 0; i < count; i++)
-            records.append(fetchOffset + i, 0L, ("key-" + i).getBytes(), 
("value-" + i).getBytes());
-        records.close();
-        FetchResponse response = new FetchResponse(Collections.singletonMap(
-                tp, new FetchResponse.PartitionData(Errors.NONE.code(), 5, 
records.buffer())), 0);
+    /**
+     * Build a fetch response struct holding, for every entry of {@code fetches}, a record set
+     * of {@code count} records whose offsets start at the entry's {@code offset}.
+     *
+     * @param fetches starting offset and record count to generate, per partition
+     * @return the fetch response in struct form
+     */
+    private Struct fetchResponse(Map<TopicPartition, FetchInfo> fetches) {
+        Map<TopicPartition, PartitionData> perPartition = new HashMap<>();
+        for (Map.Entry<TopicPartition, FetchInfo> entry : fetches.entrySet()) {
+            FetchInfo info = entry.getValue();
+            MemoryRecords batch = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
+            // record i is appended at offset (info.offset + i) with "key-i" / "value-i" payloads
+            for (int i = 0; i < info.count; i++)
+                batch.append(info.offset + i, 0L, ("key-" + i).getBytes(), ("value-" + i).getBytes());
+            batch.close();
+            perPartition.put(entry.getKey(),
+                    new FetchResponse.PartitionData(Errors.NONE.code(), 0, batch.buffer()));
+        }
+        FetchResponse response = new FetchResponse(perPartition, 0);
         return response.toStruct();
     }
 
+    /**
+     * Single-partition convenience overload: builds a fetch response with {@code count}
+     * records for {@code partition}, starting at {@code fetchOffset}.
+     */
+    private Struct fetchResponse(TopicPartition partition, long fetchOffset, int count) {
+        return fetchResponse(Collections.singletonMap(partition, new FetchInfo(fetchOffset, count)));
+    }
+
     private KafkaConsumer<String, String> newConsumer(Time time,
                                                       KafkaClient client,
                                                       Metadata metadata,
@@ -734,6 +1026,7 @@ public class KafkaConsumerTest {
                                                       int rebalanceTimeoutMs,
                                                       int sessionTimeoutMs,
                                                       int heartbeatIntervalMs,
+                                                      boolean 
autoCommitEnabled,
                                                       int 
autoCommitIntervalMs) {
         // create a consumer with mocked time and mocked network
 
@@ -742,7 +1035,6 @@ public class KafkaConsumerTest {
         String metricGroupPrefix = "consumer";
         long retryBackoffMs = 100;
         long requestTimeoutMs = 30000;
-        boolean autoCommitEnabled = true;
         boolean excludeInternalTopics = true;
         int minBytes = 1;
         int maxWaitMs = 500;
@@ -808,10 +1100,17 @@ public class KafkaConsumerTest {
                 metrics,
                 subscriptions,
                 metadata,
-                autoCommitEnabled,
-                autoCommitIntervalMs,
-                heartbeatIntervalMs,
                 retryBackoffMs,
                 requestTimeoutMs);
     }
+
+    /**
+     * Immutable description of the records to generate for one partition in a mocked
+     * fetch response: the offset of the first record and the number of records.
+     */
+    private static class FetchInfo {
+        /** Offset assigned to the first generated record. */
+        final long offset;
+        /** Number of records to generate. */
+        final int count;
+
+        FetchInfo(long offset, int count) {
+            this.offset = offset;
+            this.count = count;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/54767bbb/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index 783f0e6..a950cad 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -39,8 +39,9 @@ public class SubscriptionStateTest {
     private final SubscriptionState state = new 
SubscriptionState(OffsetResetStrategy.EARLIEST);
     private final String topic = "test";
     private final String topic1 = "test1";
-    private final TopicPartition tp0 = new TopicPartition("test", 0);
-    private final TopicPartition tp1 = new TopicPartition("test", 1);
+    private final TopicPartition tp0 = new TopicPartition(topic, 0);
+    private final TopicPartition tp1 = new TopicPartition(topic, 1);
+    private final TopicPartition t1p0 = new TopicPartition(topic1, 0);
     private final MockRebalanceListener rebalanceListener = new 
MockRebalanceListener();
 
     @Test
@@ -60,6 +61,62 @@ public class SubscriptionStateTest {
     }
 
     @Test
+    public void partitionAssignmentChangeOnTopicSubscription() {
+        // Steps the state through user assignment, unsubscribe, topic subscription and
+        // assignFromSubscribed, asserting after each step what assignedPartitions() returns:
+        // assignFromUser / assignFromSubscribed / unsubscribe change it at once, while
+        // subscribe on its own leaves it as it was.
+        state.assignFromUser(new HashSet<>(Arrays.asList(tp0, tp1)));
+        // assigned partitions should immediately change
+        assertEquals(2, state.assignedPartitions().size());
+        assertTrue(state.assignedPartitions().contains(tp0));
+        assertTrue(state.assignedPartitions().contains(tp1));
+
+        state.unsubscribe();
+        // assigned partitions should immediately change
+        assertTrue(state.assignedPartitions().isEmpty());
+
+        state.subscribe(singleton(topic1), rebalanceListener);
+        // assigned partitions should remain unchanged
+        assertTrue(state.assignedPartitions().isEmpty());
+
+        state.assignFromSubscribed(asList(t1p0));
+        // assigned partitions should immediately change
+        assertEquals(singleton(t1p0), state.assignedPartitions());
+
+        state.subscribe(singleton(topic), rebalanceListener);
+        // assigned partitions should remain unchanged
+        assertEquals(singleton(t1p0), state.assignedPartitions());
+
+        state.unsubscribe();
+        // assigned partitions should immediately change
+        assertTrue(state.assignedPartitions().isEmpty());
+    }
+
+    @Test
+    public void partitionAssignmentChangeOnPatternSubscription() {
+        // Steps the state through pattern subscription, subscribeFromPattern and
+        // assignFromSubscribed, asserting after each step what assignedPartitions() returns:
+        // only assignFromSubscribed and unsubscribe change it; subscribing to a pattern or
+        // updating the matched topics leaves it as it was.
+        state.subscribe(Pattern.compile(".*"), rebalanceListener);
+        // assigned partitions should remain unchanged
+        assertTrue(state.assignedPartitions().isEmpty());
+
+        state.subscribeFromPattern(new HashSet<>(Arrays.asList(topic, 
topic1)));
+        // assigned partitions should remain unchanged
+        assertTrue(state.assignedPartitions().isEmpty());
+
+        state.assignFromSubscribed(asList(tp1));
+        // assigned partitions should immediately change
+        assertEquals(singleton(tp1), state.assignedPartitions());
+
+        state.subscribe(Pattern.compile(".*t"), rebalanceListener);
+        // assigned partitions should remain unchanged
+        assertEquals(singleton(tp1), state.assignedPartitions());
+
+        state.subscribeFromPattern(singleton(topic));
+        // assigned partitions should remain unchanged
+        assertEquals(singleton(tp1), state.assignedPartitions());
+
+        state.unsubscribe();
+        // assigned partitions should immediately change
+        assertTrue(state.assignedPartitions().isEmpty());
+    }
+
+    @Test
     public void partitionReset() {
         state.assignFromUser(singleton(tp0));
         state.seek(tp0, 5);
@@ -217,5 +274,5 @@ public class SubscriptionStateTest {
         }
 
     }
-    
+
 }

Reply via email to