This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 69d86a1  KAFKA-8179: add public ConsumerPartitionAssignor interface 
(#7108)
69d86a1 is described below

commit 69d86a197f86ad4c6f1636b5ab4678907e30a4c0
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Thu Jul 25 13:02:09 2019 -0700

    KAFKA-8179: add public ConsumerPartitionAssignor interface (#7108)
    
    Main changes of this PR:
    
    * Deprecate old consumer.internal.PartitionAssignor and add public 
consumer.ConsumerPartitionAssignor with all OOTB assignors migrated to new 
interface
    * Refactor assignor's assignment/subscription related classes to make the 
API easier to evolve
    * Remove version number from classes as it is only needed for 
serialization/deserialization
    * Other previously-discussed cleanup included in this PR:
    
    * Remove Assignment.error added in pt 1
    * Remove ConsumerCoordinator#adjustAssignment added in pt 2
    
    Reviewers: Boyang Chen <[email protected]>, Jason Gustafson 
<[email protected]>, Guozhang Wang <[email protected]>
---
 .../kafka/clients/admin/KafkaAdminClient.java      |   4 +-
 .../kafka/clients/consumer/ConsumerConfig.java     |   2 +-
 .../clients/consumer/ConsumerGroupMetadata.java    |  50 +++++
 ...ssignor.java => ConsumerPartitionAssignor.java} | 210 ++++++++-------------
 .../kafka/clients/consumer/KafkaConsumer.java      |   7 +-
 .../kafka/clients/consumer/StickyAssignor.java     |  11 +-
 .../internals/AbstractPartitionAssignor.java       |  18 +-
 .../consumer/internals/ConsumerCoordinator.java    |  86 +++------
 .../consumer/internals/ConsumerProtocol.java       |  95 ++++------
 .../consumer/internals/PartitionAssignor.java      | 142 +-------------
 .../consumer/internals/SubscriptionState.java      |   8 +
 .../kafka/clients/admin/KafkaAdminClientTest.java  |   6 +-
 .../kafka/clients/consumer/KafkaConsumerTest.java  |  69 ++++---
 .../kafka/clients/consumer/RangeAssignorTest.java  |   2 +-
 .../clients/consumer/RoundRobinAssignorTest.java   |   2 +-
 .../kafka/clients/consumer/StickyAssignorTest.java |   2 +-
 .../internals/ConsumerCoordinatorTest.java         |  31 +--
 .../consumer/internals/ConsumerProtocolTest.java   |  45 +----
 .../group/GroupMetadataManagerTest.scala           |   2 +-
 .../internals/StreamsPartitionAssignor.java        |  20 +-
 .../internals/StreamsPartitionAssignorTest.java    | 126 +++++++------
 .../kafka/streams/tests/StreamsUpgradeTest.java    |  21 ++-
 22 files changed, 363 insertions(+), 596 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 349fc2a..f2ee21e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -29,9 +29,9 @@ import 
org.apache.kafka.clients.admin.DeleteAclsResult.FilterResult;
 import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults;
 import 
org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo;
 import org.apache.kafka.clients.admin.internals.AdminMetadataManager;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
-import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.ElectionType;
@@ -2724,7 +2724,7 @@ public class KafkaAdminClient extends AdminClient {
                     for (DescribedGroupMember groupMember : members) {
                         Set<TopicPartition> partitions = 
Collections.emptySet();
                         if (groupMember.memberAssignment().length > 0) {
-                            final PartitionAssignor.Assignment assignment = 
ConsumerProtocol.
+                            final ConsumerPartitionAssignor.Assignment 
assignment = ConsumerProtocol.
                                 
deserializeAssignment(ByteBuffer.wrap(groupMember.memberAssignment()));
                             partitions = new 
HashSet<>(assignment.partitions());
                         }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 7eb34d4..8a18bd5 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -102,7 +102,7 @@ public class ConsumerConfig extends AbstractConfig {
      * <code>partition.assignment.strategy</code>
      */
     public static final String PARTITION_ASSIGNMENT_STRATEGY_CONFIG = 
"partition.assignment.strategy";
-    private static final String PARTITION_ASSIGNMENT_STRATEGY_DOC = "The class 
name of the partition assignment strategy that the client will use to 
distribute partition ownership amongst consumer instances when group management 
is used";
+    private static final String PARTITION_ASSIGNMENT_STRATEGY_DOC = "The class 
name or class type of the assignor implementing the partition assignment 
strategy that the client will use to distribute partition ownership amongst 
consumer instances when group management is used. A custom assignor that 
implements ConsumerPartitionAssignor can be plugged in";
 
     /**
      * <code>auto.offset.reset</code>
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerGroupMetadata.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerGroupMetadata.java
new file mode 100644
index 0000000..e17894b
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerGroupMetadata.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import java.util.Optional;
+
+public class ConsumerGroupMetadata {
+    private String groupId;
+    private int generationId;
+    private String memberId;
+    Optional<String> groupInstanceId;
+
+    public ConsumerGroupMetadata(String groupId, int generationId, String 
memberId, Optional<String> groupInstanceId) {
+        this.groupId = groupId;
+        this.generationId = generationId;
+        this.memberId = memberId;
+        this.groupInstanceId = groupInstanceId;
+    }
+
+    public String groupId() {
+        return groupId;
+    }
+
+    public int generationId() {
+        return generationId;
+    }
+
+    public String memberId() {
+        return memberId;
+    }
+
+    public Optional<String> groupInstanceId() {
+        return groupInstanceId;
+    }
+
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
similarity index 52%
copy from 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java
copy to 
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
index c26f684..72d5d6e 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
@@ -14,69 +14,57 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.clients.consumer.internals;
-
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.protocol.types.SchemaException;
+package org.apache.kafka.clients.consumer;
 
 import java.nio.ByteBuffer;
+import java.util.Optional;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
-
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerProtocol.CONSUMER_PROTOCOL_V0;
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerProtocol.CONSUMER_PROTOCOL_V1;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.TopicPartition;
 
 /**
  * This interface is used to define custom partition assignment for use in
  * {@link org.apache.kafka.clients.consumer.KafkaConsumer}. Members of the 
consumer group subscribe
  * to the topics they are interested in and forward their subscriptions to a 
Kafka broker serving
  * as the group coordinator. The coordinator selects one member to perform the 
group assignment and
- * propagates the subscriptions of all members to it. Then {@link 
#assign(Cluster, Map)} is called
+ * propagates the subscriptions of all members to it. Then {@link 
#assign(Cluster, GroupSubscription)} is called
  * to perform the assignment and the results are forwarded back to each 
respective members
  *
  * In some cases, it is useful to forward additional metadata to the assignor 
in order to make
- * assignment decisions. For this, you can override {@link #subscription(Set)} 
and provide custom
+ * assignment decisions. For this, you can override {@link 
#subscriptionUserData(Set)} and provide custom
  * userData in the returned Subscription. For example, to have a rack-aware 
assignor, an implementation
  * can use this user data to forward the rackId belonging to each member.
  */
-public interface PartitionAssignor {
+public interface ConsumerPartitionAssignor {
 
     /**
-     * Return a serializable object representing the local member's 
subscription. This can include
-     * additional information as well (e.g. local host/rack information) which 
can be leveraged in
-     * {@link #assign(Cluster, Map)}.
-     * @param topics Topics subscribed to through {@link 
org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(java.util.Collection)}
-     *               and variants
-     * @return Non-null subscription with optional user data
+     * Return serialized data that will be included in the {@link 
Subscription} sent to the leader
+     * and can be leveraged in {@link #assign(Cluster, GroupSubscription)} 
(e.g. local host/rack information)
+     *
+     * @return optional join subscription user data
      */
-    Subscription subscription(Set<String> topics);
+    default ByteBuffer subscriptionUserData(Set<String> topics) {
+        return null;
+    }
 
     /**
      * Perform the group assignment given the member subscriptions and current 
cluster metadata.
      * @param metadata Current topic/broker metadata known by consumer
-     * @param subscriptions Subscriptions from all members provided through 
{@link #subscription(Set)}
+     * @param subscriptions Subscriptions from all members including metadata 
provided through {@link #subscriptionUserData(Set)}
      * @return A map from the members to their respective assignment. This 
should have one entry
-     *         for all members who in the input subscription map.
-     */
-    Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> 
subscriptions);
-
-    /**
-     * Callback which is invoked when a group member receives its assignment 
from the leader.
-     * @param assignment The local member's assignment as provided by the 
leader in {@link #assign(Cluster, Map)}
+     *         for each member in the input subscription map.
      */
-    void onAssignment(Assignment assignment);
+    GroupAssignment assign(Cluster metadata, GroupSubscription subscriptions);
 
     /**
      * Callback which is invoked when a group member receives its assignment 
from the leader.
-     * @param assignment The local member's assignment as provided by the 
leader in {@link #assign(Cluster, Map)}
-     * @param generation The consumer group generation associated with this 
partition assignment (optional)
+     * @param assignment The local member's assignment as provided by the 
leader in {@link #assign(Cluster, GroupSubscription)}
+     * @param metadata Additional metadata on the consumer (optional)
      */
-    default void onAssignment(Assignment assignment, int generation) {
-        onAssignment(assignment);
+    default void onAssignment(Assignment assignment, ConsumerGroupMetadata 
metadata) {
     }
 
     /**
@@ -96,92 +84,45 @@ public interface PartitionAssignor {
     }
 
     /**
-     * Unique name for this assignor (e.g. "range" or "roundrobin" or "sticky")
+     * Unique name for this assignor (e.g. "range" or "roundrobin" or 
"sticky"). Note, this is not required
+     * to be the same as the class name specified in {@link 
ConsumerConfig#PARTITION_ASSIGNMENT_STRATEGY_CONFIG}
      * @return non-null unique name
      */
     String name();
 
-    enum RebalanceProtocol {
-        EAGER((byte) 0), COOPERATIVE((byte) 1);
-
-        private final byte id;
-
-        RebalanceProtocol(byte id) {
-            this.id = id;
-        }
-
-        public byte id() {
-            return id;
-        }
-
-        public static RebalanceProtocol forId(byte id) {
-            switch (id) {
-                case 0:
-                    return EAGER;
-                case 1:
-                    return COOPERATIVE;
-                default:
-                    throw new IllegalArgumentException("Unknown rebalance 
protocol id: " + id);
-            }
-        }
-    }
-
-    class Subscription {
-        private final Short version;
+    final class Subscription {
         private final List<String> topics;
         private final ByteBuffer userData;
         private final List<TopicPartition> ownedPartitions;
         private Optional<String> groupInstanceId;
 
-        Subscription(Short version,
-                     List<String> topics,
-                     ByteBuffer userData,
-                     List<TopicPartition> ownedPartitions) {
-            this.version = version;
+        public Subscription(List<String> topics, ByteBuffer userData, 
List<TopicPartition> ownedPartitions) {
             this.topics = topics;
             this.userData = userData;
             this.ownedPartitions = ownedPartitions;
             this.groupInstanceId = Optional.empty();
-
-            if (version < CONSUMER_PROTOCOL_V0)
-                throw new SchemaException("Unsupported subscription version: " 
+ version);
-
-            if (version < CONSUMER_PROTOCOL_V1 && !ownedPartitions.isEmpty())
-                throw new IllegalArgumentException("Subscription version 
smaller than 1 should not have owned partitions");
-        }
-
-        Subscription(Short version, List<String> topics, ByteBuffer userData) {
-            this(version, topics, userData, Collections.emptyList());
-        }
-
-        public Subscription(List<String> topics, ByteBuffer userData, 
List<TopicPartition> ownedPartitions) {
-            this(CONSUMER_PROTOCOL_V1, topics, userData, ownedPartitions);
         }
 
         public Subscription(List<String> topics, ByteBuffer userData) {
-            this(CONSUMER_PROTOCOL_V1, topics, userData);
+            this(topics, userData, Collections.emptyList());
         }
 
         public Subscription(List<String> topics) {
-            this(topics, ByteBuffer.wrap(new byte[0]));
-        }
-
-        Short version() {
-            return version;
+            this(topics, null, Collections.emptyList());
         }
 
         public List<String> topics() {
             return topics;
         }
 
-        public List<TopicPartition> ownedPartitions() {
-            return ownedPartitions;
-        }
-
         public ByteBuffer userData() {
             return userData;
         }
 
+        public List<TopicPartition> ownedPartitions() {
+            return ownedPartitions;
+        }
+
         public void setGroupInstanceId(Optional<String> groupInstanceId) {
             this.groupInstanceId = groupInstanceId;
         }
@@ -189,79 +130,76 @@ public interface PartitionAssignor {
         public Optional<String> groupInstanceId() {
             return groupInstanceId;
         }
-
-        @Override
-        public String toString() {
-            return "Subscription(" +
-                    "version=" + version +
-                    ", topics=" + topics +
-                    ", ownedPartitions=" + ownedPartitions +
-                    ", group.instance.id=" + groupInstanceId + ")";
-        }
     }
 
-    class Assignment {
-        private final Short version;
+    final class Assignment {
         private List<TopicPartition> partitions;
-        private final ByteBuffer userData;
-        private ConsumerProtocol.AssignmentError error;
+        private ByteBuffer userData;
 
-        Assignment(Short version, List<TopicPartition> partitions, ByteBuffer 
userData, ConsumerProtocol.AssignmentError error) {
-            this.version = version;
+        public Assignment(List<TopicPartition> partitions, ByteBuffer 
userData) {
             this.partitions = partitions;
             this.userData = userData;
-            this.error = error;
-
-            if (version < CONSUMER_PROTOCOL_V0)
-                throw new SchemaException("Unsupported subscription version: " 
+ version);
-
-            if (version < CONSUMER_PROTOCOL_V1 && error != 
ConsumerProtocol.AssignmentError.NONE)
-                throw new IllegalArgumentException("Assignment version smaller 
than 1 should not have error code.");
         }
 
-        Assignment(Short version, List<TopicPartition> partitions, ByteBuffer 
userData) {
-            this(version, partitions, userData, 
ConsumerProtocol.AssignmentError.NONE);
+        public Assignment(List<TopicPartition> partitions) {
+            this(partitions, null);
         }
 
-        public Assignment(List<TopicPartition> partitions, ByteBuffer 
userData) {
-            this(CONSUMER_PROTOCOL_V1, partitions, userData);
+        public List<TopicPartition> partitions() {
+            return partitions;
         }
 
-        public Assignment(List<TopicPartition> partitions) {
-            this(partitions, ByteBuffer.wrap(new byte[0]));
+        public ByteBuffer userData() {
+            return userData;
         }
+    }
 
-        Short version() {
-            return version;
+    final class GroupSubscription {
+        private final Map<String, Subscription> subscriptions;
+
+        public GroupSubscription(Map<String, Subscription> subscriptions) {
+            this.subscriptions = subscriptions;
         }
 
-        public List<TopicPartition> partitions() {
-            return partitions;
+        public Map<String, Subscription> groupSubscription() {
+            return subscriptions;
         }
+    }
+
+    final class GroupAssignment {
+        private final Map<String, Assignment> assignments;
 
-        public ConsumerProtocol.AssignmentError error() {
-            return error;
+        public GroupAssignment(Map<String, Assignment> assignments) {
+            this.assignments = assignments;
         }
 
-        public void updatePartitions(List<TopicPartition> partitions) {
-            this.partitions = partitions;
+        public Map<String, Assignment> groupAssignment() {
+            return assignments;
         }
+    }
+
+    enum RebalanceProtocol {
+        EAGER((byte) 0), COOPERATIVE((byte) 1);
+
+        private final byte id;
 
-        public void setError(ConsumerProtocol.AssignmentError error) {
-            this.error = error;
+        RebalanceProtocol(byte id) {
+            this.id = id;
         }
 
-        public ByteBuffer userData() {
-            return userData;
+        public byte id() {
+            return id;
         }
 
-        @Override
-        public String toString() {
-            return "Assignment(" +
-                    "version=" + version +
-                    ", partitions=" + partitions +
-                    ", error=" + error +
-                    ')';
+        public static RebalanceProtocol forId(byte id) {
+            switch (id) {
+                case 0:
+                    return EAGER;
+                case 1:
+                    return COOPERATIVE;
+                default:
+                    throw new IllegalArgumentException("Unknown rebalance 
protocol id: " + id);
+            }
         }
     }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index fa5cc99..30944b3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -29,7 +29,6 @@ import 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
 import org.apache.kafka.clients.consumer.internals.Fetcher;
 import org.apache.kafka.clients.consumer.internals.FetcherMetricsRegistry;
 import 
org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
@@ -581,7 +580,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     private final long requestTimeoutMs;
     private final int defaultApiTimeoutMs;
     private volatile boolean closed = false;
-    private List<PartitionAssignor> assignors;
+    private List<ConsumerPartitionAssignor> assignors;
 
     // currentThread holds the threadId of the current thread accessing 
KafkaConsumer
     // and is used to prevent multi-threaded access
@@ -768,7 +767,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     heartbeatIntervalMs); //Will avoid blocking an extended 
period of time to prevent heartbeat thread starvation
             this.assignors = config.getConfiguredInstances(
                     ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
-                    PartitionAssignor.class);
+                    ConsumerPartitionAssignor.class);
 
             // no coordinator will be constructed for the default (null) group 
id
             this.coordinator = groupId == null ? null :
@@ -833,7 +832,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                   long retryBackoffMs,
                   long requestTimeoutMs,
                   int defaultApiTimeoutMs,
-                  List<PartitionAssignor> assignors,
+                  List<ConsumerPartitionAssignor> assignors,
                   String groupId) {
         this.log = logContext.logger(getClass());
         this.clientId = clientId;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
index 3c7d010..3311cd8 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
@@ -365,18 +365,17 @@ public class StickyAssignor extends 
AbstractPartitionAssignor {
     }
 
     @Override
-    public void onAssignment(Assignment assignment, int generation) {
+    public void onAssignment(Assignment assignment, ConsumerGroupMetadata 
metadata) {
         memberAssignment = assignment.partitions();
-        this.generation = generation;
+        this.generation = metadata.generationId();
     }
 
     @Override
-    public Subscription subscription(Set<String> topics) {
+    public ByteBuffer subscriptionUserData(Set<String> topics) {
         if (memberAssignment == null)
-            return new Subscription(new ArrayList<>(topics));
+            return null;
 
-        return new Subscription(new ArrayList<>(topics),
-                serializeTopicPartitionAssignment(new 
ConsumerUserData(memberAssignment, Optional.of(generation))));
+        return serializeTopicPartitionAssignment(new 
ConsumerUserData(memberAssignment, Optional.of(generation)));
     }
 
     @Override
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
index 2487daa..3b966b0 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
@@ -33,7 +34,7 @@ import java.util.Set;
  * Abstract assignor implementation which does some common grunt work (in 
particular collecting
  * partition counts which are always needed in assignors).
  */
-public abstract class AbstractPartitionAssignor implements PartitionAssignor {
+public abstract class AbstractPartitionAssignor implements 
ConsumerPartitionAssignor {
     private static final Logger log = 
LoggerFactory.getLogger(AbstractPartitionAssignor.class);
 
     /**
@@ -47,12 +48,8 @@ public abstract class AbstractPartitionAssignor implements 
PartitionAssignor {
                                                              Map<String, 
Subscription> subscriptions);
 
     @Override
-    public Subscription subscription(Set<String> topics) {
-        return new Subscription(new ArrayList<>(topics));
-    }
-
-    @Override
-    public Map<String, Assignment> assign(Cluster metadata, Map<String, 
Subscription> subscriptions) {
+    public GroupAssignment assign(Cluster metadata, GroupSubscription 
groupSubscriptions) {
+        Map<String, Subscription> subscriptions = 
groupSubscriptions.groupSubscription();
         Set<String> allSubscribedTopics = new HashSet<>();
         for (Map.Entry<String, Subscription> subscriptionEntry : 
subscriptions.entrySet())
             allSubscribedTopics.addAll(subscriptionEntry.getValue().topics());
@@ -72,12 +69,7 @@ public abstract class AbstractPartitionAssignor implements 
PartitionAssignor {
         Map<String, Assignment> assignments = new HashMap<>();
         for (Map.Entry<String, List<TopicPartition>> assignmentEntry : 
rawAssignments.entrySet())
             assignments.put(assignmentEntry.getKey(), new 
Assignment(assignmentEntry.getValue()));
-        return assignments;
-    }
-
-    @Override
-    public void onAssignment(Assignment assignment) {
-        // this assignor maintains no internal state, so nothing to do
+        return new GroupAssignment(assignments);
     }
 
     protected static <K, V> void put(Map<K, List<V>> map, K key, V value) {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 4866986..a28119d 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -18,13 +18,16 @@ package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.GroupRebalanceConfig;
 import org.apache.kafka.clients.consumer.CommitFailedException;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import 
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription;
+import 
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
-import 
org.apache.kafka.clients.consumer.internals.PartitionAssignor.Assignment;
-import 
org.apache.kafka.clients.consumer.internals.PartitionAssignor.RebalanceProtocol;
-import 
org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
+import 
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
@@ -78,7 +81,7 @@ import java.util.stream.Collectors;
 public final class ConsumerCoordinator extends AbstractCoordinator {
     private final GroupRebalanceConfig rebalanceConfig;
     private final Logger log;
-    private final List<PartitionAssignor> assignors;
+    private final List<ConsumerPartitionAssignor> assignors;
     private final ConsumerMetadata metadata;
     private final ConsumerCoordinatorMetrics sensors;
     private final SubscriptionState subscriptions;
@@ -128,7 +131,7 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
     public ConsumerCoordinator(GroupRebalanceConfig rebalanceConfig,
                                LogContext logContext,
                                ConsumerNetworkClient client,
-                               List<PartitionAssignor> assignors,
+                               List<ConsumerPartitionAssignor> assignors,
                                ConsumerMetadata metadata,
                                SubscriptionState subscriptions,
                                Metrics metrics,
@@ -170,13 +173,13 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
         if (!assignors.isEmpty()) {
             List<RebalanceProtocol> supportedProtocols = new 
ArrayList<>(assignors.get(0).supportedProtocols());
 
-            for (PartitionAssignor assignor : assignors) {
+            for (ConsumerPartitionAssignor assignor : assignors) {
                 supportedProtocols.retainAll(assignor.supportedProtocols());
             }
 
             if (supportedProtocols.isEmpty()) {
                 throw new IllegalArgumentException("Specified assignors " +
-                    
assignors.stream().map(PartitionAssignor::name).collect(Collectors.toSet()) +
+                    
assignors.stream().map(ConsumerPartitionAssignor::name).collect(Collectors.toSet())
 +
                     " do not have commonly supported rebalance protocol");
             }
 
@@ -201,8 +204,10 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
         this.joinedSubscription = subscriptions.subscription();
         JoinGroupRequestData.JoinGroupRequestProtocolCollection protocolSet = 
new JoinGroupRequestData.JoinGroupRequestProtocolCollection();
 
-        for (PartitionAssignor assignor : assignors) {
-            Subscription subscription = 
assignor.subscription(joinedSubscription);
+        for (ConsumerPartitionAssignor assignor : assignors) {
+            Subscription subscription = new Subscription(new 
ArrayList<>(joinedSubscription),
+                                                         
assignor.subscriptionUserData(joinedSubscription),
+                                                         
subscriptions.assignedPartitionsList());
             ByteBuffer metadata = 
ConsumerProtocol.serializeSubscription(subscription);
 
             protocolSet.add(new JoinGroupRequestData.JoinGroupRequestProtocol()
@@ -220,8 +225,8 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
             metadata.requestUpdateForNewTopics();
     }
 
-    private PartitionAssignor lookupAssignor(String name) {
-        for (PartitionAssignor assignor : this.assignors) {
+    private ConsumerPartitionAssignor lookupAssignor(String name) {
+        for (ConsumerPartitionAssignor assignor : this.assignors) {
             if (assignor.name().equals(name))
                 return assignor;
         }
@@ -261,7 +266,7 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
         if (!isLeader)
             assignmentSnapshot = null;
 
-        PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
+        ConsumerPartitionAssignor assignor = 
lookupAssignor(assignmentStrategy);
         if (assignor == null)
             throw new IllegalStateException("Coordinator selected invalid 
assignment protocol: " + assignmentStrategy);
 
@@ -285,7 +290,8 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
         maybeUpdateJoinedSubscription(assignedPartitions);
 
         // give the assignor a chance to update internal state based on the 
received assignment
-        assignor.onAssignment(assignment, generation);
+        ConsumerGroupMetadata metadata = new 
ConsumerGroupMetadata(rebalanceConfig.groupId, generation, memberId, 
rebalanceConfig.groupInstanceId);
+        assignor.onAssignment(assignment, metadata);
 
         // reschedule the auto commit starting from now
         if (autoCommitEnabled)
@@ -314,10 +320,6 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
             case COOPERATIVE:
                 assignAndRevoke(listener, assignedPartitions, ownedPartitions);
 
-                if (assignment.error() == ConsumerProtocol.AssignmentError.NEED_REJOIN) {
-                    requestRejoin();
-                }
-
                 break;
         }
 
@@ -470,20 +472,17 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
     protected Map<String, ByteBuffer> performAssignment(String leaderId,
                                                         String 
assignmentStrategy,
                                                         
List<JoinGroupResponseData.JoinGroupResponseMember> allSubscriptions) {
-        PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
+        ConsumerPartitionAssignor assignor = 
lookupAssignor(assignmentStrategy);
         if (assignor == null)
             throw new IllegalStateException("Coordinator selected invalid 
assignment protocol: " + assignmentStrategy);
 
         Set<String> allSubscribedTopics = new HashSet<>();
         Map<String, Subscription> subscriptions = new HashMap<>();
-        // collect all the owned partitions
-        Map<TopicPartition, String> ownedPartitions = new HashMap<>();
         for (JoinGroupResponseData.JoinGroupResponseMember memberSubscription 
: allSubscriptions) {
             Subscription subscription = 
ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(memberSubscription.metadata()));
             
subscription.setGroupInstanceId(Optional.ofNullable(memberSubscription.groupInstanceId()));
             subscriptions.put(memberSubscription.memberId(), subscription);
             allSubscribedTopics.addAll(subscription.topics());
-            ownedPartitions.putAll(subscription.ownedPartitions().stream().collect(Collectors.toMap(item -> item, item -> memberSubscription.memberId())));
         }
 
         // the leader will begin watching for changes to any of the topics the 
group is interested in,
@@ -494,16 +493,7 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
 
         log.debug("Performing assignment using strategy {} with subscriptions 
{}", assignor.name(), subscriptions);
 
-        Map<String, Assignment> assignments = 
assignor.assign(metadata.fetch(), subscriptions);
-
-        switch (protocol) {
-            case EAGER:
-                break;
-
-            case COOPERATIVE:
-                adjustAssignment(ownedPartitions, assignments);
-                break;
-        }
+        Map<String, Assignment> assignments = 
assignor.assign(metadata.fetch(), new 
GroupSubscription(subscriptions)).groupAssignment();
 
         // user-customized assignor may have created some topics that are not 
in the subscription list
         // and assign their partitions to the members; in this case we would 
like to update the leader's
@@ -547,40 +537,6 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
         return groupAssignment;
     }
 
-    private void adjustAssignment(final Map<TopicPartition, String> 
ownedPartitions,
-                                  final Map<String, Assignment> assignments) {
-        boolean revocationsNeeded = false;
-        Set<TopicPartition> assignedPartitions = new HashSet<>();
-        for (final Map.Entry<String, Assignment> entry : 
assignments.entrySet()) {
-            final Assignment assignment = entry.getValue();
-            assignedPartitions.addAll(assignment.partitions());
-
-            // update the assignment if the partition is owned by another 
different owner
-            List<TopicPartition> updatedPartitions = 
assignment.partitions().stream()
-                .filter(tp -> ownedPartitions.containsKey(tp) && 
!entry.getKey().equals(ownedPartitions.get(tp)))
-                .collect(Collectors.toList());
-            if (!updatedPartitions.equals(assignment.partitions())) {
-                assignment.updatePartitions(updatedPartitions);
-                revocationsNeeded = true;
-            }
-        }
-
-        // for all owned but not assigned partitions, blindly add them to 
assignment
-        for (final Map.Entry<TopicPartition, String> entry : 
ownedPartitions.entrySet()) {
-            final TopicPartition tp = entry.getKey();
-            if (!assignedPartitions.contains(tp)) {
-                assignments.get(entry.getValue()).partitions().add(tp);
-            }
-        }
-
-        // if revocations are triggered, tell everyone to re-join immediately.
-        if (revocationsNeeded) {
-            for (final Assignment assignment : assignments.values()) {
-                
assignment.setError(ConsumerProtocol.AssignmentError.NEED_REJOIN);
-            }
-        }
-    }
-
     @Override
     protected void onJoinPrepare(int generation, String memberId) {
         // commit offsets prior to rebalance if auto-commit enabled
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
index d05d5b0..e852e62 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
@@ -16,6 +16,9 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import java.util.Collections;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
+import 
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.types.ArrayOf;
 import org.apache.kafka.common.protocol.types.Field;
@@ -49,7 +52,6 @@ import java.util.Map;
  *     Topic            => String
  *     Partitions       => [int32]
  *   UserData           => Bytes
- *   ErrorCode          => [int16]
  * </pre>
  *
  * Version 0 format:
@@ -85,11 +87,11 @@ public class ConsumerProtocol {
     public static final String TOPIC_PARTITIONS_KEY_NAME = "topic_partitions";
     public static final String USER_DATA_KEY_NAME = "user_data";
 
-    public static final Field.Int16 ERROR_CODE = new Field.Int16("error_code", 
"Assignment error code");
-
     public static final short CONSUMER_PROTOCOL_V0 = 0;
     public static final short CONSUMER_PROTOCOL_V1 = 1;
 
+    public static final short CONSUMER_PROTOCL_LATEST_VERSION = 
CONSUMER_PROTOCOL_V1;
+
     public static final Schema CONSUMER_PROTOCOL_HEADER_SCHEMA = new Schema(
             new Field(VERSION_KEY_NAME, Type.INT16));
     private static final Struct CONSUMER_PROTOCOL_HEADER_V0 = new 
Struct(CONSUMER_PROTOCOL_HEADER_SCHEMA)
@@ -116,36 +118,9 @@ public class ConsumerProtocol {
 
     public static final Schema ASSIGNMENT_V1 = new Schema(
         new Field(TOPIC_PARTITIONS_KEY_NAME, new ArrayOf(TOPIC_ASSIGNMENT_V0)),
-        new Field(USER_DATA_KEY_NAME, Type.NULLABLE_BYTES),
-        ERROR_CODE);
-
-    public enum AssignmentError {
-        NONE(0),
-        NEED_REJOIN(1);
+        new Field(USER_DATA_KEY_NAME, Type.NULLABLE_BYTES));
 
-        private final short code;
-
-        AssignmentError(final int code) {
-            this.code = (short) code;
-        }
-
-        public short code() {
-            return code;
-        }
-
-        public static AssignmentError fromCode(final short code) {
-            switch (code) {
-                case 0:
-                    return NONE;
-                case 1:
-                    return NEED_REJOIN;
-                default:
-                    throw new IllegalArgumentException("Unknown error code: " + code);
-            }
-        }
-    }
-
-    public static ByteBuffer 
serializeSubscriptionV0(PartitionAssignor.Subscription subscription) {
+    public static ByteBuffer serializeSubscriptionV0(Subscription 
subscription) {
         Struct struct = new Struct(SUBSCRIPTION_V0);
         struct.set(USER_DATA_KEY_NAME, subscription.userData());
         struct.set(TOPICS_KEY_NAME, subscription.topics().toArray());
@@ -157,7 +132,7 @@ public class ConsumerProtocol {
         return buffer;
     }
 
-    public static ByteBuffer 
serializeSubscriptionV1(PartitionAssignor.Subscription subscription) {
+    public static ByteBuffer serializeSubscriptionV1(Subscription 
subscription) {
         Struct struct = new Struct(SUBSCRIPTION_V1);
         struct.set(USER_DATA_KEY_NAME, subscription.userData());
         struct.set(TOPICS_KEY_NAME, subscription.topics().toArray());
@@ -178,8 +153,12 @@ public class ConsumerProtocol {
         return buffer;
     }
 
-    public static ByteBuffer 
serializeSubscription(PartitionAssignor.Subscription subscription) {
-        switch (subscription.version()) {
+    public static ByteBuffer serializeSubscription(Subscription subscription) {
+        return serializeSubscription(subscription, 
CONSUMER_PROTOCL_LATEST_VERSION);
+    }
+
+    public static ByteBuffer serializeSubscription(Subscription subscription, 
short version) {
+        switch (version) {
             case CONSUMER_PROTOCOL_V0:
                 return serializeSubscriptionV0(subscription);
 
@@ -192,17 +171,17 @@ public class ConsumerProtocol {
         }
     }
 
-    public static PartitionAssignor.Subscription 
deserializeSubscriptionV0(ByteBuffer buffer) {
+    public static Subscription deserializeSubscriptionV0(ByteBuffer buffer) {
         Struct struct = SUBSCRIPTION_V0.read(buffer);
         ByteBuffer userData = struct.getBytes(USER_DATA_KEY_NAME);
         List<String> topics = new ArrayList<>();
         for (Object topicObj : struct.getArray(TOPICS_KEY_NAME))
             topics.add((String) topicObj);
 
-        return new PartitionAssignor.Subscription(CONSUMER_PROTOCOL_V0, 
topics, userData);
+        return new Subscription(topics, userData, Collections.emptyList());
     }
 
-    public static PartitionAssignor.Subscription 
deserializeSubscriptionV1(ByteBuffer buffer) {
+    public static Subscription deserializeSubscriptionV1(ByteBuffer buffer) {
         Struct struct = SUBSCRIPTION_V1.read(buffer);
         ByteBuffer userData = struct.getBytes(USER_DATA_KEY_NAME);
         List<String> topics = new ArrayList<>();
@@ -218,10 +197,10 @@ public class ConsumerProtocol {
             }
         }
 
-        return new PartitionAssignor.Subscription(CONSUMER_PROTOCOL_V1, 
topics, userData, ownedPartitions);
+        return new Subscription(topics, userData, ownedPartitions);
     }
 
-    public static PartitionAssignor.Subscription 
deserializeSubscription(ByteBuffer buffer) {
+    public static Subscription deserializeSubscription(ByteBuffer buffer) {
         Struct header = CONSUMER_PROTOCOL_HEADER_SCHEMA.read(buffer);
         Short version = header.getShort(VERSION_KEY_NAME);
 
@@ -241,7 +220,7 @@ public class ConsumerProtocol {
         }
     }
 
-    public static ByteBuffer 
serializeAssignmentV0(PartitionAssignor.Assignment assignment) {
+    public static ByteBuffer serializeAssignmentV0(Assignment assignment) {
         Struct struct = new Struct(ASSIGNMENT_V0);
         struct.set(USER_DATA_KEY_NAME, assignment.userData());
         List<Struct> topicAssignments = new ArrayList<>();
@@ -261,7 +240,7 @@ public class ConsumerProtocol {
         return buffer;
     }
 
-    public static ByteBuffer 
serializeAssignmentV1(PartitionAssignor.Assignment assignment) {
+    public static ByteBuffer serializeAssignmentV1(Assignment assignment) {
         Struct struct = new Struct(ASSIGNMENT_V1);
         struct.set(USER_DATA_KEY_NAME, assignment.userData());
         List<Struct> topicAssignments = new ArrayList<>();
@@ -273,7 +252,6 @@ public class ConsumerProtocol {
             topicAssignments.add(topicAssignment);
         }
         struct.set(TOPIC_PARTITIONS_KEY_NAME, topicAssignments.toArray());
-        struct.set(ERROR_CODE, assignment.error().code);
 
         ByteBuffer buffer = 
ByteBuffer.allocate(CONSUMER_PROTOCOL_HEADER_V1.sizeOf() + 
ASSIGNMENT_V1.sizeOf(struct));
         CONSUMER_PROTOCOL_HEADER_V1.writeTo(buffer);
@@ -282,8 +260,12 @@ public class ConsumerProtocol {
         return buffer;
     }
 
-    public static ByteBuffer serializeAssignment(PartitionAssignor.Assignment 
assignment) {
-        switch (assignment.version()) {
+    public static ByteBuffer serializeAssignment(Assignment assignment) {
+        return serializeAssignment(assignment, 
CONSUMER_PROTOCL_LATEST_VERSION);
+    }
+
+    public static ByteBuffer serializeAssignment(Assignment assignment, short 
version) {
+        switch (version) {
             case CONSUMER_PROTOCOL_V0:
                 return serializeAssignmentV0(assignment);
 
@@ -296,7 +278,7 @@ public class ConsumerProtocol {
         }
     }
 
-    public static PartitionAssignor.Assignment 
deserializeAssignmentV0(ByteBuffer buffer) {
+    public static Assignment deserializeAssignmentV0(ByteBuffer buffer) {
         Struct struct = ASSIGNMENT_V0.read(buffer);
         ByteBuffer userData = struct.getBytes(USER_DATA_KEY_NAME);
         List<TopicPartition> partitions = new ArrayList<>();
@@ -307,27 +289,14 @@ public class ConsumerProtocol {
                 partitions.add(new TopicPartition(topic, (Integer) 
partitionObj));
             }
         }
-        return new PartitionAssignor.Assignment(CONSUMER_PROTOCOL_V0, 
partitions, userData);
+        return new Assignment(partitions, userData);
     }
 
-    public static PartitionAssignor.Assignment 
deserializeAssignmentV1(ByteBuffer buffer) {
-        Struct struct = ASSIGNMENT_V1.read(buffer);
-        ByteBuffer userData = struct.getBytes(USER_DATA_KEY_NAME);
-        List<TopicPartition> partitions = new ArrayList<>();
-        for (Object structObj : struct.getArray(TOPIC_PARTITIONS_KEY_NAME)) {
-            Struct assignment = (Struct) structObj;
-            String topic = assignment.getString(TOPIC_KEY_NAME);
-            for (Object partitionObj : 
assignment.getArray(PARTITIONS_KEY_NAME)) {
-                partitions.add(new TopicPartition(topic, (Integer) 
partitionObj));
-            }
-        }
-
-        AssignmentError error = 
AssignmentError.fromCode(struct.get(ERROR_CODE));
-
-        return new PartitionAssignor.Assignment(CONSUMER_PROTOCOL_V1, 
partitions, userData, error);
+    public static Assignment deserializeAssignmentV1(ByteBuffer buffer) {
+        return deserializeAssignmentV0(buffer);
     }
 
-    public static PartitionAssignor.Assignment 
deserializeAssignment(ByteBuffer buffer) {
+    public static Assignment deserializeAssignment(ByteBuffer buffer) {
         Struct header = CONSUMER_PROTOCOL_HEADER_SCHEMA.read(buffer);
         Short version = header.getShort(VERSION_KEY_NAME);
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java
index c26f684..b3f2ada 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java
@@ -18,18 +18,12 @@ package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.protocol.types.SchemaException;
 
 import java.nio.ByteBuffer;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 
-import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.CONSUMER_PROTOCOL_V0;
-import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.CONSUMER_PROTOCOL_V1;
-
 /**
  * This interface is used to define custom partition assignment for use in
  * {@link org.apache.kafka.clients.consumer.KafkaConsumer}. Members of the 
consumer group subscribe
@@ -43,6 +37,7 @@ import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.CONSU
  * userData in the returned Subscription. For example, to have a rack-aware 
assignor, an implementation
  * can use this user data to forward the rackId belonging to each member.
  */
+@Deprecated
 public interface PartitionAssignor {
 
     /**
@@ -79,21 +74,6 @@ public interface PartitionAssignor {
         onAssignment(assignment);
     }
 
-    /**
-     * Indicate which rebalance protocol this assignor works with;
-     * By default it should always work with {@link RebalanceProtocol#EAGER}.
-     */
-    default List<RebalanceProtocol> supportedProtocols() {
-        return Collections.singletonList(RebalanceProtocol.EAGER);
-    }
-
-    /**
-     * Return the version of the assignor which indicates how the user 
metadata encodings
-     * and the assignment algorithm gets evolved.
-     */
-    default short version() {
-        return (short) 0;
-    }
 
     /**
      * Unique name for this assignor (e.g. "range" or "roundrobin" or "sticky")
@@ -101,156 +81,52 @@ public interface PartitionAssignor {
      */
     String name();
 
-    enum RebalanceProtocol {
-        EAGER((byte) 0), COOPERATIVE((byte) 1);
-
-        private final byte id;
-
-        RebalanceProtocol(byte id) {
-            this.id = id;
-        }
-
-        public byte id() {
-            return id;
-        }
-
-        public static RebalanceProtocol forId(byte id) {
-            switch (id) {
-                case 0:
-                    return EAGER;
-                case 1:
-                    return COOPERATIVE;
-                default:
-                    throw new IllegalArgumentException("Unknown rebalance protocol id: " + id);
-            }
-        }
-    }
-
     class Subscription {
-        private final Short version;
         private final List<String> topics;
         private final ByteBuffer userData;
-        private final List<TopicPartition> ownedPartitions;
-        private Optional<String> groupInstanceId;
 
-        Subscription(Short version,
-                     List<String> topics,
-                     ByteBuffer userData,
-                     List<TopicPartition> ownedPartitions) {
-            this.version = version;
+        public Subscription(List<String> topics, ByteBuffer userData) {
             this.topics = topics;
             this.userData = userData;
-            this.ownedPartitions = ownedPartitions;
-            this.groupInstanceId = Optional.empty();
-
-            if (version < CONSUMER_PROTOCOL_V0)
-                throw new SchemaException("Unsupported subscription version: " + version);
-
-            if (version < CONSUMER_PROTOCOL_V1 && !ownedPartitions.isEmpty())
-                throw new IllegalArgumentException("Subscription version smaller than 1 should not have owned partitions");
-        }
-
-        Subscription(Short version, List<String> topics, ByteBuffer userData) {
-            this(version, topics, userData, Collections.emptyList());
-        }
-
-        public Subscription(List<String> topics, ByteBuffer userData, 
List<TopicPartition> ownedPartitions) {
-            this(CONSUMER_PROTOCOL_V1, topics, userData, ownedPartitions);
-        }
-
-        public Subscription(List<String> topics, ByteBuffer userData) {
-            this(CONSUMER_PROTOCOL_V1, topics, userData);
         }
 
         public Subscription(List<String> topics) {
             this(topics, ByteBuffer.wrap(new byte[0]));
         }
 
-        Short version() {
-            return version;
-        }
-
         public List<String> topics() {
             return topics;
         }
 
-        public List<TopicPartition> ownedPartitions() {
-            return ownedPartitions;
-        }
-
         public ByteBuffer userData() {
             return userData;
         }
 
-        public void setGroupInstanceId(Optional<String> groupInstanceId) {
-            this.groupInstanceId = groupInstanceId;
-        }
-
-        public Optional<String> groupInstanceId() {
-            return groupInstanceId;
-        }
-
         @Override
         public String toString() {
             return "Subscription(" +
-                    "version=" + version +
-                    ", topics=" + topics +
-                    ", ownedPartitions=" + ownedPartitions +
-                    ", group.instance.id=" + groupInstanceId + ")";
+                "topics=" + topics +
+                ')';
         }
     }
 
     class Assignment {
-        private final Short version;
-        private List<TopicPartition> partitions;
+        private final List<TopicPartition> partitions;
         private final ByteBuffer userData;
-        private ConsumerProtocol.AssignmentError error;
 
-        Assignment(Short version, List<TopicPartition> partitions, ByteBuffer 
userData, ConsumerProtocol.AssignmentError error) {
-            this.version = version;
+        public Assignment(List<TopicPartition> partitions, ByteBuffer 
userData) {
             this.partitions = partitions;
             this.userData = userData;
-            this.error = error;
-
-            if (version < CONSUMER_PROTOCOL_V0)
-                throw new SchemaException("Unsupported subscription version: " + version);
-
-            if (version < CONSUMER_PROTOCOL_V1 && error != ConsumerProtocol.AssignmentError.NONE)
-                throw new IllegalArgumentException("Assignment version smaller than 1 should not have error code.");
-        }
-
-        Assignment(Short version, List<TopicPartition> partitions, ByteBuffer 
userData) {
-            this(version, partitions, userData, 
ConsumerProtocol.AssignmentError.NONE);
-        }
-
-        public Assignment(List<TopicPartition> partitions, ByteBuffer 
userData) {
-            this(CONSUMER_PROTOCOL_V1, partitions, userData);
         }
 
         public Assignment(List<TopicPartition> partitions) {
             this(partitions, ByteBuffer.wrap(new byte[0]));
         }
 
-        Short version() {
-            return version;
-        }
-
         public List<TopicPartition> partitions() {
             return partitions;
         }
 
-        public ConsumerProtocol.AssignmentError error() {
-            return error;
-        }
-
-        public void updatePartitions(List<TopicPartition> partitions) {
-            this.partitions = partitions;
-        }
-
-        public void setError(ConsumerProtocol.AssignmentError error) {
-            this.error = error;
-        }
-
         public ByteBuffer userData() {
             return userData;
         }
@@ -258,10 +134,8 @@ public interface PartitionAssignor {
         @Override
         public String toString() {
             return "Assignment(" +
-                    "version=" + version +
-                    ", partitions=" + partitions +
-                    ", error=" + error +
-                    ')';
+                "partitions=" + partitions +
+                ')';
         }
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 3f1cf98..af834ce 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import java.util.ArrayList;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
@@ -388,6 +389,13 @@ public class SubscriptionState {
     }
 
     /**
+     * @return a modifiable copy of the currently assigned partitions as a list
+     */
+    public synchronized List<TopicPartition> assignedPartitionsList() {
+        return new ArrayList<>(this.assignment.partitionSet());
+    }
+
+    /**
      * Provides the number of assigned partitions in a thread safe manner.
      * @return the number of assigned partitions.
      */
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 711c8f9..769f58c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -21,9 +21,9 @@ import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
-import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.ElectionType;
 import org.apache.kafka.common.KafkaException;
@@ -1222,7 +1222,7 @@ public class KafkaAdminClientTest {
             topicPartitions.add(1, myTopicPartition1);
             topicPartitions.add(2, myTopicPartition2);
 
-            final ByteBuffer memberAssignment = 
ConsumerProtocol.serializeAssignment(new 
PartitionAssignor.Assignment(topicPartitions));
+            final ByteBuffer memberAssignment = 
ConsumerProtocol.serializeAssignment(new 
ConsumerPartitionAssignor.Assignment(topicPartitions));
             byte[] memberAssignmentBytes = new 
byte[memberAssignment.remaining()];
             memberAssignment.get(memberAssignmentBytes);
 
@@ -1282,7 +1282,7 @@ public class KafkaAdminClientTest {
             topicPartitions.add(1, myTopicPartition1);
             topicPartitions.add(2, myTopicPartition2);
 
-            final ByteBuffer memberAssignment = 
ConsumerProtocol.serializeAssignment(new 
PartitionAssignor.Assignment(topicPartitions));
+            final ByteBuffer memberAssignment = 
ConsumerProtocol.serializeAssignment(new 
ConsumerPartitionAssignor.Assignment(topicPartitions));
             byte[] memberAssignmentBytes = new 
byte[memberAssignment.remaining()];
             memberAssignment.get(memberAssignmentBytes);
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index c1adf19..1227c27 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -28,7 +28,6 @@ import 
org.apache.kafka.clients.consumer.internals.ConsumerMetrics;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
 import org.apache.kafka.clients.consumer.internals.Fetcher;
-import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
@@ -396,7 +395,7 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        PartitionAssignor assignor = new RoundRobinAssignor();
+        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, 
subscription, metadata, assignor, true, groupInstanceId);
 
@@ -430,7 +429,7 @@ public class KafkaConsumerTest {
 
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
-        PartitionAssignor assignor = new RoundRobinAssignor();
+        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, 
subscription, metadata, assignor, true, groupInstanceId);
         consumer.subscribe(singleton(topic), 
getConsumerRebalanceListener(consumer));
@@ -465,7 +464,7 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        final PartitionAssignor assignor = new RoundRobinAssignor();
+        final ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
 
         final KafkaConsumer<String, String> consumer = newConsumer(time, 
client, subscription, metadata, assignor, true, groupInstanceId);
         consumer.subscribe(singleton(topic), 
getConsumerRebalanceListener(consumer));
@@ -489,7 +488,7 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        final PartitionAssignor assignor = new RoundRobinAssignor();
+        final ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
 
         final KafkaConsumer<String, String> consumer = newConsumer(time, 
client, subscription, metadata, assignor, true, groupInstanceId);
         consumer.subscribe(singleton(topic), 
getConsumerRebalanceListener(consumer));
@@ -512,7 +511,7 @@ public class KafkaConsumerTest {
         MockClient client = new MockClient(time, metadata);
 
         initMetadata(client, Collections.singletonMap(topic, 1));
-        PartitionAssignor assignor = new RoundRobinAssignor();
+        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, 
subscription, metadata, assignor, true, groupInstanceId);
         consumer.assign(singleton(tp0));
@@ -587,7 +586,7 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        PartitionAssignor assignor = new RoundRobinAssignor();
+        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, 
subscription, metadata, assignor,
                 true, groupId, groupInstanceId);
@@ -611,7 +610,7 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        PartitionAssignor assignor = new RoundRobinAssignor();
+        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, 
subscription, metadata, assignor,
                 true, groupId, groupInstanceId);
@@ -636,7 +635,7 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        PartitionAssignor assignor = new RoundRobinAssignor();
+        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, 
subscription, metadata, assignor,
                 true, groupId, groupInstanceId);
@@ -663,7 +662,7 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        PartitionAssignor assignor = new RoundRobinAssignor();
+        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, 
subscription, metadata, assignor,
                 true, groupId, Optional.empty());
@@ -686,7 +685,7 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 2));
         Node node = metadata.fetch().nodes().get(0);
 
-        PartitionAssignor assignor = new RoundRobinAssignor();
+        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, 
subscription, metadata, assignor, true, groupInstanceId);
         consumer.assign(singletonList(tp0));
@@ -724,7 +723,7 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        PartitionAssignor assignor = new RoundRobinAssignor();
+        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, 
subscription, metadata, assignor, true, groupInstanceId);
         consumer.subscribe(singleton(topic), 
getConsumerRebalanceListener(consumer));
@@ -764,7 +763,7 @@ public class KafkaConsumerTest {
         initMetadata(client, partitionCounts);
         Node node = metadata.fetch().nodes().get(0);
 
-        PartitionAssignor assignor = new RoundRobinAssignor();
+        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, 
subscription, metadata, assignor, true, groupInstanceId);
         prepareRebalance(client, node, singleton(topic), assignor, 
singletonList(tp0), null);
@@ -782,7 +781,7 @@ public class KafkaConsumerTest {
 
     @Test
     public void testChangingRegexSubscription() {
-        PartitionAssignor assignor = new RoundRobinAssignor();
+        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
 
         String otherTopic = "other";
         TopicPartition otherTopicPartition = new TopicPartition(otherTopic, 0);
@@ -828,7 +827,7 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        PartitionAssignor assignor = new RoundRobinAssignor();
+        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, 
subscription, metadata, assignor, true, groupInstanceId);
         consumer.subscribe(singleton(topic), 
getConsumerRebalanceListener(consumer));
@@ -878,7 +877,7 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        final PartitionAssignor assignor = new RoundRobinAssignor();
+        final ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, 
subscription, metadata, assignor, false, groupInstanceId);
         consumer.subscribe(singleton(topic), 
getConsumerRebalanceListener(consumer));
@@ -908,7 +907,7 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        PartitionAssignor assignor = new RangeAssignor();
+        ConsumerPartitionAssignor assignor = new RangeAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, 
subscription, metadata, assignor, true, groupInstanceId);
         consumer.subscribe(singletonList(topic), 
getConsumerRebalanceListener(consumer));
@@ -948,7 +947,7 @@ public class KafkaConsumerTest {
         initMetadata(client, tpCounts);
         Node node = metadata.fetch().nodes().get(0);
 
-        PartitionAssignor assignor = new RangeAssignor();
+        ConsumerPartitionAssignor assignor = new RangeAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, 
subscription, metadata, assignor, true, groupInstanceId);
 
@@ -1062,7 +1061,7 @@ public class KafkaConsumerTest {
         initMetadata(client, tpCounts);
         Node node = metadata.fetch().nodes().get(0);
 
-        PartitionAssignor assignor = new RangeAssignor();
+        ConsumerPartitionAssignor assignor = new RangeAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, 
subscription, metadata, assignor, false, groupInstanceId);
 
@@ -1124,7 +1123,7 @@ public class KafkaConsumerTest {
         initMetadata(client, tpCounts);
         Node node = metadata.fetch().nodes().get(0);
 
-        PartitionAssignor assignor = new RangeAssignor();
+        ConsumerPartitionAssignor assignor = new RangeAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, 
subscription, metadata, assignor, true, groupInstanceId);
 
@@ -1180,7 +1179,7 @@ public class KafkaConsumerTest {
         initMetadata(client, tpCounts);
         Node node = metadata.fetch().nodes().get(0);
 
-        PartitionAssignor assignor = new RangeAssignor();
+        ConsumerPartitionAssignor assignor = new RangeAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, 
subscription, metadata, assignor, false, groupInstanceId);
 
@@ -1234,7 +1233,7 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 2));
         Node node = metadata.fetch().nodes().get(0);
 
-        PartitionAssignor assignor = new RangeAssignor();
+        ConsumerPartitionAssignor assignor = new RangeAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, 
subscription, metadata, assignor, true, groupInstanceId);
 
@@ -1429,7 +1428,7 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        PartitionAssignor assignor = new RoundRobinAssignor();
+        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, 
subscription, metadata, assignor, false, groupInstanceId);
         consumer.subscribe(singleton(topic), 
getConsumerRebalanceListener(consumer));
@@ -1457,7 +1456,7 @@ public class KafkaConsumerTest {
                 coordinator);
 
         // join group
-        final ByteBuffer byteBuffer = 
ConsumerProtocol.serializeSubscription(new 
PartitionAssignor.Subscription(singletonList(topic)));
+        final ByteBuffer byteBuffer = 
ConsumerProtocol.serializeSubscription(new 
ConsumerPartitionAssignor.Subscription(singletonList(topic)));
 
         // This member becomes the leader
         String memberId = "memberId";
@@ -1512,7 +1511,7 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        PartitionAssignor assignor = new RoundRobinAssignor();
+        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
 
         final KafkaConsumer<String, String> consumer = newConsumer(time, 
client, subscription, metadata, assignor, false, Optional.empty());
         consumer.subscribe(singleton(topic), 
getConsumerRebalanceListener(consumer));
@@ -1649,7 +1648,7 @@ public class KafkaConsumerTest {
         initMetadata(client, singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        PartitionAssignor assignor = new RangeAssignor();
+        ConsumerPartitionAssignor assignor = new RangeAssignor();
 
         client.createPendingAuthenticationError(node, 0);
         return newConsumer(time, client, subscription, metadata, assignor, 
false, groupInstanceId);
@@ -1675,7 +1674,7 @@ public class KafkaConsumerTest {
                                     subscription, new LogContext(), new 
ClusterResourceListeners());
     }
 
-    private Node prepareRebalance(MockClient client, Node node, final 
Set<String> subscribedTopics, PartitionAssignor assignor, List<TopicPartition> 
partitions, Node coordinator) {
+    private Node prepareRebalance(MockClient client, Node node, final 
Set<String> subscribedTopics, ConsumerPartitionAssignor assignor, 
List<TopicPartition> partitions, Node coordinator) {
         if (coordinator == null) {
             // lookup coordinator
             
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
node), node);
@@ -1692,7 +1691,7 @@ public class KafkaConsumerTest {
                 assertTrue(protocolIterator.hasNext());
 
                 ByteBuffer protocolMetadata = 
ByteBuffer.wrap(protocolIterator.next().metadata());
-                PartitionAssignor.Subscription subscription = 
ConsumerProtocol.deserializeSubscription(protocolMetadata);
+                ConsumerPartitionAssignor.Subscription subscription = 
ConsumerProtocol.deserializeSubscription(protocolMetadata);
                 return subscribedTopics.equals(new 
HashSet<>(subscription.topics()));
             }
         }, joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", 
Errors.NONE), coordinator);
@@ -1703,7 +1702,7 @@ public class KafkaConsumerTest {
         return coordinator;
     }
 
-    private Node prepareRebalance(MockClient client, Node node, 
PartitionAssignor assignor, List<TopicPartition> partitions, Node coordinator) {
+    private Node prepareRebalance(MockClient client, Node node, 
ConsumerPartitionAssignor assignor, List<TopicPartition> partitions, Node 
coordinator) {
         if (coordinator == null) {
             // lookup coordinator
             
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
node), node);
@@ -1764,7 +1763,7 @@ public class KafkaConsumerTest {
         return new OffsetCommitResponse(responseData);
     }
 
-    private JoinGroupResponse joinGroupFollowerResponse(PartitionAssignor 
assignor, int generationId, String memberId, String leaderId, Errors error) {
+    private JoinGroupResponse 
joinGroupFollowerResponse(ConsumerPartitionAssignor assignor, int generationId, 
String memberId, String leaderId, Errors error) {
         return new JoinGroupResponse(
                 new JoinGroupResponseData()
                         .setErrorCode(error.code())
@@ -1777,7 +1776,7 @@ public class KafkaConsumerTest {
     }
 
     private SyncGroupResponse syncGroupResponse(List<TopicPartition> 
partitions, Errors error) {
-        ByteBuffer buf = ConsumerProtocol.serializeAssignment(new 
PartitionAssignor.Assignment(partitions));
+        ByteBuffer buf = ConsumerProtocol.serializeAssignment(new 
ConsumerPartitionAssignor.Assignment(partitions));
         return new SyncGroupResponse(
                 new SyncGroupResponseData()
                         .setErrorCode(error.code())
@@ -1848,7 +1847,7 @@ public class KafkaConsumerTest {
                                                       KafkaClient client,
                                                       SubscriptionState 
subscription,
                                                       ConsumerMetadata 
metadata,
-                                                      PartitionAssignor 
assignor,
+                                                      
ConsumerPartitionAssignor assignor,
                                                       boolean 
autoCommitEnabled,
                                                       Optional<String> 
groupInstanceId) {
         return newConsumer(time, client, subscription, metadata, assignor, 
autoCommitEnabled, groupId, groupInstanceId);
@@ -1865,7 +1864,7 @@ public class KafkaConsumerTest {
                                                       KafkaClient client,
                                                       SubscriptionState 
subscription,
                                                       ConsumerMetadata 
metadata,
-                                                      PartitionAssignor 
assignor,
+                                                      
ConsumerPartitionAssignor assignor,
                                                       boolean 
autoCommitEnabled,
                                                       String groupId,
                                                       Optional<String> 
groupInstanceId) {
@@ -1885,7 +1884,7 @@ public class KafkaConsumerTest {
         Deserializer<String> keyDeserializer = new StringDeserializer();
         Deserializer<String> valueDeserializer = new StringDeserializer();
 
-        List<PartitionAssignor> assignors = singletonList(assignor);
+        List<ConsumerPartitionAssignor> assignors = singletonList(assignor);
         ConsumerInterceptors<String, String> interceptors = new 
ConsumerInterceptors<>(Collections.emptyList());
 
         Metrics metrics = new Metrics();
@@ -1985,7 +1984,7 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 1));
         Cluster cluster = metadata.fetch();
 
-        PartitionAssignor assignor = new RoundRobinAssignor();
+        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
 
         String invalidTopicName = "topic abc";  // Invalid topic name due to 
space
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java
index f08ca14..118e60a 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.clients.consumer;
 
-import 
org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription;
+import 
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
 import org.apache.kafka.common.TopicPartition;
 import org.junit.Test;
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java
index fa68406..02fb9ff 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java
@@ -17,7 +17,7 @@
 package org.apache.kafka.clients.consumer;
 
 import 
org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor.MemberInfo;
-import 
org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription;
+import 
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
 import org.apache.kafka.common.TopicPartition;
 import org.junit.Test;
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
index 89f0d37..6dd3062 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
@@ -30,7 +30,7 @@ import java.util.Random;
 import java.util.Set;
 
 import org.apache.kafka.clients.consumer.StickyAssignor.ConsumerUserData;
-import 
org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription;
+import 
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.CollectionUtils;
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index a0878fb..a81e73e 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.GroupRebalanceConfig;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.consumer.CommitFailedException;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
@@ -125,9 +126,9 @@ public class ConsumerCoordinatorTest {
     private final MockTime time = new MockTime();
     private GroupRebalanceConfig rebalanceConfig;
 
-    private final PartitionAssignor.RebalanceProtocol protocol;
+    private final ConsumerPartitionAssignor.RebalanceProtocol protocol;
     private final MockPartitionAssignor partitionAssignor;
-    private final List<PartitionAssignor> assignors;
+    private final List<ConsumerPartitionAssignor> assignors;
     private MockClient client;
     private MetadataResponse metadataResponse = 
TestUtils.metadataUpdateWith(1, new HashMap<String, Integer>() {
         {
@@ -144,7 +145,7 @@ public class ConsumerCoordinatorTest {
     private MockCommitCallback mockOffsetCommitCallback;
     private ConsumerCoordinator coordinator;
 
-    public ConsumerCoordinatorTest(final PartitionAssignor.RebalanceProtocol 
protocol) {
+    public ConsumerCoordinatorTest(final 
ConsumerPartitionAssignor.RebalanceProtocol protocol) {
         this.protocol = protocol;
         this.partitionAssignor = new 
MockPartitionAssignor(Collections.singletonList(protocol));
         this.assignors = Collections.singletonList(partitionAssignor);
@@ -153,7 +154,7 @@ public class ConsumerCoordinatorTest {
     @Parameterized.Parameters(name = "rebalance protocol = {0}")
     public static Collection<Object[]> data() {
         final List<Object[]> values = new ArrayList<>();
-        for (final PartitionAssignor.RebalanceProtocol protocol: 
PartitionAssignor.RebalanceProtocol.values()) {
+        for (final ConsumerPartitionAssignor.RebalanceProtocol protocol: 
ConsumerPartitionAssignor.RebalanceProtocol.values()) {
             values.add(new Object[]{protocol});
         }
         return values;
@@ -198,20 +199,20 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testSelectRebalanceProtcol() {
-        List<PartitionAssignor> assignors = new ArrayList<>();
-        assignors.add(new 
MockPartitionAssignor(Collections.singletonList(PartitionAssignor.RebalanceProtocol.EAGER)));
-        assignors.add(new 
MockPartitionAssignor(Collections.singletonList(PartitionAssignor.RebalanceProtocol.COOPERATIVE)));
+        List<ConsumerPartitionAssignor> assignors = new ArrayList<>();
+        assignors.add(new 
MockPartitionAssignor(Collections.singletonList(ConsumerPartitionAssignor.RebalanceProtocol.EAGER)));
+        assignors.add(new 
MockPartitionAssignor(Collections.singletonList(ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE)));
 
         // no commonly supported protocols
         assertThrows(IllegalArgumentException.class, () -> 
buildCoordinator(rebalanceConfig, new Metrics(), assignors, false));
 
         assignors.clear();
-        assignors.add(new 
MockPartitionAssignor(Arrays.asList(PartitionAssignor.RebalanceProtocol.EAGER, 
PartitionAssignor.RebalanceProtocol.COOPERATIVE)));
-        assignors.add(new 
MockPartitionAssignor(Arrays.asList(PartitionAssignor.RebalanceProtocol.EAGER, 
PartitionAssignor.RebalanceProtocol.COOPERATIVE)));
+        assignors.add(new 
MockPartitionAssignor(Arrays.asList(ConsumerPartitionAssignor.RebalanceProtocol.EAGER,
 ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE)));
+        assignors.add(new 
MockPartitionAssignor(Arrays.asList(ConsumerPartitionAssignor.RebalanceProtocol.EAGER,
 ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE)));
 
         // select higher indexed (more advanced) protocols
         try (ConsumerCoordinator coordinator = 
buildCoordinator(rebalanceConfig, new Metrics(), assignors, false)) {
-            assertEquals(PartitionAssignor.RebalanceProtocol.COOPERATIVE, 
coordinator.getProtocol());
+            
assertEquals(ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE, 
coordinator.getProtocol());
         }
     }
 
@@ -553,7 +554,7 @@ public class ConsumerCoordinatorTest {
         final int addCount = 1;
 
         // with eager protocol we will call revoke on the old assignment as 
well
-        if (protocol == PartitionAssignor.RebalanceProtocol.EAGER) {
+        if (protocol == ConsumerPartitionAssignor.RebalanceProtocol.EAGER) {
             revokeCount += 1;
         }
 
@@ -670,7 +671,7 @@ public class ConsumerCoordinatorTest {
                 JoinGroupRequestData.JoinGroupRequestProtocol protocolMetadata 
= protocolIterator.next();
 
                 ByteBuffer metadata = 
ByteBuffer.wrap(protocolMetadata.metadata());
-                PartitionAssignor.Subscription subscription = 
ConsumerProtocol.deserializeSubscription(metadata);
+                ConsumerPartitionAssignor.Subscription subscription = 
ConsumerProtocol.deserializeSubscription(metadata);
                 metadata.rewind();
                 return subscription.topics().containsAll(updatedSubscription);
             }
@@ -2326,7 +2327,7 @@ public class ConsumerCoordinatorTest {
 
     private ConsumerCoordinator buildCoordinator(final GroupRebalanceConfig 
rebalanceConfig,
                                                  final Metrics metrics,
-                                                 final List<PartitionAssignor> 
assignors,
+                                                 final 
List<ConsumerPartitionAssignor> assignors,
                                                  final boolean 
autoCommitEnabled) {
         return new ConsumerCoordinator(
                 rebalanceConfig,
@@ -2385,7 +2386,7 @@ public class ConsumerCoordinatorTest {
                                                       Errors error) {
         List<JoinGroupResponseData.JoinGroupResponseMember> metadata = new 
ArrayList<>();
         for (Map.Entry<String, List<String>> subscriptionEntry : 
subscriptions.entrySet()) {
-            PartitionAssignor.Subscription subscription = new 
PartitionAssignor.Subscription(subscriptionEntry.getValue());
+            ConsumerPartitionAssignor.Subscription subscription = new 
ConsumerPartitionAssignor.Subscription(subscriptionEntry.getValue());
             ByteBuffer buf = 
ConsumerProtocol.serializeSubscription(subscription);
             metadata.add(new JoinGroupResponseData.JoinGroupResponseMember()
                     .setMemberId(subscriptionEntry.getKey())
@@ -2416,7 +2417,7 @@ public class ConsumerCoordinatorTest {
     }
 
     private SyncGroupResponse syncGroupResponse(List<TopicPartition> 
partitions, Errors error) {
-        ByteBuffer buf = ConsumerProtocol.serializeAssignment(new 
PartitionAssignor.Assignment(partitions));
+        ByteBuffer buf = ConsumerProtocol.serializeAssignment(new 
ConsumerPartitionAssignor.Assignment(partitions));
         return new SyncGroupResponse(
                 new SyncGroupResponseData()
                         .setErrorCode(error.code())
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
index 9e601b0..3cf7be5 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
@@ -16,8 +16,8 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
-import 
org.apache.kafka.clients.consumer.internals.PartitionAssignor.Assignment;
-import 
org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
+import 
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.types.ArrayOf;
 import org.apache.kafka.common.protocol.types.Field;
@@ -39,7 +39,6 @@ import static 
org.apache.kafka.clients.consumer.internals.ConsumerProtocol.TOPIC
 import static 
org.apache.kafka.clients.consumer.internals.ConsumerProtocol.TOPIC_PARTITIONS_KEY_NAME;
 import static 
org.apache.kafka.clients.consumer.internals.ConsumerProtocol.USER_DATA_KEY_NAME;
 import static 
org.apache.kafka.clients.consumer.internals.ConsumerProtocol.VERSION_KEY_NAME;
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
 import static org.apache.kafka.test.TestUtils.toSet;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -54,7 +53,7 @@ public class ConsumerProtocolTest {
 
     @Test
     public void serializeDeserializeMetadata() {
-        Subscription subscription = new Subscription(Arrays.asList("foo", 
"bar"));
+        Subscription subscription = new Subscription(Arrays.asList("foo", 
"bar"), ByteBuffer.wrap(new byte[0]));
         ByteBuffer buffer = 
ConsumerProtocol.serializeSubscription(subscription);
         Subscription parsedSubscription = 
ConsumerProtocol.deserializeSubscription(buffer);
         assertEquals(subscription.topics(), parsedSubscription.topics());
@@ -64,7 +63,7 @@ public class ConsumerProtocolTest {
 
     @Test
     public void serializeDeserializeMetadataAndGroupInstanceId() {
-        Subscription subscription = new Subscription(Arrays.asList("foo", 
"bar"));
+        Subscription subscription = new Subscription(Arrays.asList("foo", 
"bar"), ByteBuffer.wrap(new byte[0]));
         ByteBuffer buffer = 
ConsumerProtocol.serializeSubscription(subscription);
 
         Subscription parsedSubscription = 
ConsumerProtocol.deserializeSubscription(buffer);
@@ -85,8 +84,8 @@ public class ConsumerProtocolTest {
 
     @Test
     public void deserializeOldSubscriptionVersion() {
-        Subscription subscription = new Subscription((short) 0, 
Arrays.asList("foo", "bar"), null);
-        ByteBuffer buffer = 
ConsumerProtocol.serializeSubscription(subscription);
+        Subscription subscription = new Subscription(Arrays.asList("foo", 
"bar"), null);
+        ByteBuffer buffer = 
ConsumerProtocol.serializeSubscriptionV0(subscription);
         Subscription parsedSubscription = 
ConsumerProtocol.deserializeSubscription(buffer);
         assertEquals(parsedSubscription.topics(), parsedSubscription.topics());
         assertNull(parsedSubscription.userData());
@@ -95,7 +94,7 @@ public class ConsumerProtocolTest {
 
     @Test
     public void deserializeNewSubscriptionWithOldVersion() {
-        Subscription subscription = new Subscription((short) 1, 
Arrays.asList("foo", "bar"), null, Collections.singletonList(tp2));
+        Subscription subscription = new Subscription(Arrays.asList("foo", 
"bar"), null, Collections.singletonList(tp2));
         ByteBuffer buffer = 
ConsumerProtocol.serializeSubscription(subscription);
         // ignore the version assuming it is the old byte code, as it will 
blindly deserialize as V0
         Struct header = CONSUMER_PROTOCOL_HEADER_SCHEMA.read(buffer);
@@ -145,7 +144,7 @@ public class ConsumerProtocolTest {
     @Test
     public void serializeDeserializeAssignment() {
         List<TopicPartition> partitions = Arrays.asList(tp1, tp2);
-        ByteBuffer buffer = ConsumerProtocol.serializeAssignment(new 
Assignment(partitions));
+        ByteBuffer buffer = ConsumerProtocol.serializeAssignment(new 
Assignment(partitions, ByteBuffer.wrap(new byte[0])));
         Assignment parsedAssignment = 
ConsumerProtocol.deserializeAssignment(buffer);
         assertEquals(toSet(partitions), toSet(parsedAssignment.partitions()));
         assertEquals(0, parsedAssignment.userData().limit());
@@ -161,29 +160,6 @@ public class ConsumerProtocolTest {
     }
 
     @Test
-    public void deserializeOldAssignmentVersion() {
-        List<TopicPartition> partitions = Arrays.asList(tp1, tp2);
-        ByteBuffer buffer = ConsumerProtocol.serializeAssignment(new 
Assignment((short) 0, partitions, null));
-        Assignment parsedAssignment = 
ConsumerProtocol.deserializeAssignment(buffer);
-        assertEquals(toSet(partitions), toSet(parsedAssignment.partitions()));
-        assertNull(parsedAssignment.userData());
-        assertEquals(ConsumerProtocol.AssignmentError.NONE, 
parsedAssignment.error());
-    }
-
-    @Test
-    public void deserializeNewAssignmentWithOldVersion() {
-        List<TopicPartition> partitions = Collections.singletonList(tp1);
-        ByteBuffer buffer = ConsumerProtocol.serializeAssignment(new 
Assignment((short) 1, partitions, null, 
ConsumerProtocol.AssignmentError.NEED_REJOIN));
-        // ignore the version assuming it is the old byte code, as it will 
blindly deserialize as 0
-        Struct header = CONSUMER_PROTOCOL_HEADER_SCHEMA.read(buffer);
-        header.getShort(VERSION_KEY_NAME);
-        Assignment parsedAssignment = 
ConsumerProtocol.deserializeAssignmentV0(buffer);
-        assertEquals(toSet(partitions), toSet(parsedAssignment.partitions()));
-        assertNull(parsedAssignment.userData());
-        assertEquals(ConsumerProtocol.AssignmentError.NONE, 
parsedAssignment.error());
-    }
-
-    @Test
     public void deserializeFutureAssignmentVersion() {
         // verify that a new version which adds a field is still parseable
         short version = 100;
@@ -191,7 +167,6 @@ public class ConsumerProtocolTest {
         Schema assignmentSchemaV100 = new Schema(
                 new Field(TOPIC_PARTITIONS_KEY_NAME, new 
ArrayOf(TOPIC_ASSIGNMENT_V0)),
                 new Field(USER_DATA_KEY_NAME, Type.BYTES),
-                ERROR_CODE,
                 new Field("foo", Type.STRING));
 
         Struct assignmentV100 = new Struct(assignmentSchemaV100);
@@ -200,7 +175,6 @@ public class ConsumerProtocolTest {
                         .set(ConsumerProtocol.TOPIC_KEY_NAME, tp1.topic())
                         .set(ConsumerProtocol.PARTITIONS_KEY_NAME, new 
Object[]{tp1.partition()})});
         assignmentV100.set(USER_DATA_KEY_NAME, ByteBuffer.wrap(new byte[0]));
-        assignmentV100.set(ERROR_CODE.name, 
ConsumerProtocol.AssignmentError.NEED_REJOIN.code());
         assignmentV100.set("foo", "bar");
 
         Struct headerV100 = new Struct(CONSUMER_PROTOCOL_HEADER_SCHEMA);
@@ -212,8 +186,7 @@ public class ConsumerProtocolTest {
 
         buffer.flip();
 
-        PartitionAssignor.Assignment assignment = 
ConsumerProtocol.deserializeAssignment(buffer);
+        Assignment assignment = ConsumerProtocol.deserializeAssignment(buffer);
         assertEquals(toSet(Collections.singletonList(tp1)), 
toSet(assignment.partitions()));
-        assertEquals(ConsumerProtocol.AssignmentError.NEED_REJOIN, 
assignment.error());
     }
 }
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 6761b0c..4e06716 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -31,8 +31,8 @@ import kafka.server.{FetchDataInfo, KafkaConfig, 
LogOffsetMetadata, ReplicaManag
 import kafka.server.HostedPartition
 import kafka.utils.{KafkaScheduler, MockTime, TestUtils}
 import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
-import 
org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.protocol.Errors
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 714bb0e..fa5f511 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -16,8 +16,10 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import java.nio.ByteBuffer;
 import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.KafkaException;
@@ -56,7 +58,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import static org.apache.kafka.common.utils.Utils.getHost;
 import static org.apache.kafka.common.utils.Utils.getPort;
 
-public class StreamsPartitionAssignor implements PartitionAssignor, 
Configurable {
+public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, 
Configurable {
 
     final static int UNKNOWN = -1;
     private final static int VERSION_ONE = 1;
@@ -309,7 +311,7 @@ public class StreamsPartitionAssignor implements 
PartitionAssignor, Configurable
     }
 
     @Override
-    public Subscription subscription(final Set<String> topics) {
+    public ByteBuffer subscriptionUserData(final Set<String> topics) {
         // Adds the following information to subscription
         // 1. Client UUID (a unique id assigned to an instance of KafkaStreams)
         // 2. Task ids of previously running tasks
@@ -327,7 +329,7 @@ public class StreamsPartitionAssignor implements 
PartitionAssignor, Configurable
 
         taskManager.updateSubscriptionsFromMetadata(topics);
 
-        return new Subscription(new ArrayList<>(topics), data.encode());
+        return data.encode();
     }
 
     private Map<String, Assignment> errorAssignment(final Map<UUID, 
ClientMetadata> clientsMetadata,
@@ -371,8 +373,8 @@ public class StreamsPartitionAssignor implements 
PartitionAssignor, Configurable
      * 3. within each client, tasks are assigned to consumer clients in 
round-robin manner.
      */
     @Override
-    public Map<String, Assignment> assign(final Cluster metadata,
-                                          final Map<String, Subscription> 
subscriptions) {
+    public GroupAssignment assign(final Cluster metadata, final 
GroupSubscription groupSubscriptions) {
+        final Map<String, Subscription> subscriptions = 
groupSubscriptions.groupSubscription();
         // construct the client metadata from the decoded subscription info
         final Map<UUID, ClientMetadata> clientMetadataMap = new HashMap<>();
         final Set<String> futureConsumers = new HashSet<>();
@@ -446,7 +448,7 @@ public class StreamsPartitionAssignor implements 
PartitionAssignor, Configurable
                     !metadata.topics().contains(topic)) {
                     log.error("Missing source topic {} durign assignment. 
Returning error {}.",
                               topic, 
Error.INCOMPLETE_SOURCE_TOPIC_METADATA.name());
-                    return errorAssignment(clientMetadataMap, topic, 
Error.INCOMPLETE_SOURCE_TOPIC_METADATA.code);
+                    return new 
GroupAssignment(errorAssignment(clientMetadataMap, topic, 
Error.INCOMPLETE_SOURCE_TOPIC_METADATA.code));
                 }
             }
             for (final InternalTopicConfig topic: 
topicsInfo.repartitionSourceTopics.values()) {
@@ -644,7 +646,7 @@ public class StreamsPartitionAssignor implements 
PartitionAssignor, Configurable
             assignment = computeNewAssignment(clientMetadataMap, 
partitionsForTask, partitionsByHostState, minReceivedMetadataVersion);
         }
 
-        return assignment;
+        return new GroupAssignment(assignment);
     }
 
     private Map<String, Assignment> computeNewAssignment(final Map<UUID, 
ClientMetadata> clientsMetadata,
@@ -777,7 +779,7 @@ public class StreamsPartitionAssignor implements 
PartitionAssignor, Configurable
      * @throws TaskAssignmentException if there is no task id for one of the 
partitions specified
      */
     @Override
-    public void onAssignment(final Assignment assignment) {
+    public void onAssignment(final Assignment assignment, final 
ConsumerGroupMetadata metadata) {
         final List<TopicPartition> partitions = new 
ArrayList<>(assignment.partitions());
         partitions.sort(PARTITION_COMPARATOR);
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 5fa6653..616deaf 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -16,7 +16,8 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import 
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
@@ -146,7 +147,7 @@ public class StreamsPartitionAssignorTest {
         EasyMock.replay(taskManager);
     }
 
-    private Map<String, PartitionAssignor.Subscription> subscriptions;
+    private Map<String, ConsumerPartitionAssignor.Subscription> subscriptions;
 
     @Before
     public void setUp() {
@@ -200,7 +201,9 @@ public class StreamsPartitionAssignorTest {
         mockTaskManager(prevTasks, cachedTasks, processId, builder);
 
         configurePartitionAssignor(Collections.emptyMap());
-        final PartitionAssignor.Subscription subscription = 
partitionAssignor.subscription(Utils.mkSet("topic1", "topic2"));
+
+        final Set<String> topics = Utils.mkSet("topic1", "topic2");
+        final ConsumerPartitionAssignor.Subscription subscription = new 
ConsumerPartitionAssignor.Subscription(new ArrayList<>(topics), 
partitionAssignor.subscriptionUserData(topics));
 
         Collections.sort(subscription.topics());
         assertEquals(asList("topic1", "topic2"), subscription.topics());
@@ -236,16 +239,16 @@ public class StreamsPartitionAssignorTest {
         partitionAssignor.setInternalTopicManager(new 
MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
 
         subscriptions.put("consumer10",
-                new PartitionAssignor.Subscription(topics,
+                new ConsumerPartitionAssignor.Subscription(topics,
                         new SubscriptionInfo(uuid1, prevTasks10, 
standbyTasks10, userEndPoint).encode()));
         subscriptions.put("consumer11",
-                new PartitionAssignor.Subscription(topics,
+                new ConsumerPartitionAssignor.Subscription(topics,
                         new SubscriptionInfo(uuid1, prevTasks11, 
standbyTasks11, userEndPoint).encode()));
         subscriptions.put("consumer20",
-                new PartitionAssignor.Subscription(topics,
+                new ConsumerPartitionAssignor.Subscription(topics,
                         new SubscriptionInfo(uuid2, prevTasks20, 
standbyTasks20, userEndPoint).encode()));
 
-        final Map<String, PartitionAssignor.Assignment> assignments = 
partitionAssignor.assign(metadata, subscriptions);
+        final Map<String, ConsumerPartitionAssignor.Assignment> assignments = 
partitionAssignor.assign(metadata, new 
GroupSubscription(subscriptions)).groupAssignment();
 
         // check assigned partitions
         assertEquals(Utils.mkSet(Utils.mkSet(t1p0, t2p0), Utils.mkSet(t1p1, 
t2p1)),
@@ -320,13 +323,13 @@ public class StreamsPartitionAssignorTest {
         partitionAssignor.setInternalTopicManager(new 
MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
 
         subscriptions.put("consumer10",
-                          new PartitionAssignor.Subscription(topics,
+                          new ConsumerPartitionAssignor.Subscription(topics,
                                   new SubscriptionInfo(uuid1, new HashSet<>(), 
new HashSet<>(), userEndPoint).encode()));
         subscriptions.put("consumer11",
-                          new PartitionAssignor.Subscription(topics,
+                          new ConsumerPartitionAssignor.Subscription(topics,
                                   new SubscriptionInfo(uuid1, new HashSet<>(), 
new HashSet<>(), userEndPoint).encode()));
 
-        final Map<String, PartitionAssignor.Assignment> assignments = 
partitionAssignor.assign(localMetadata, subscriptions);
+        final Map<String, ConsumerPartitionAssignor.Assignment> assignments = 
partitionAssignor.assign(localMetadata, new 
GroupSubscription(subscriptions)).groupAssignment();
 
         // check assigned partitions
         assertEquals(Utils.mkSet(Utils.mkSet(t2p2, t1p0, t1p2, t2p0), 
Utils.mkSet(t1p1, t2p1, t1p3, t2p3)),
@@ -365,10 +368,10 @@ public class StreamsPartitionAssignorTest {
 
         // will throw exception if it fails
         subscriptions.put("consumer10",
-                new PartitionAssignor.Subscription(topics,
+                new ConsumerPartitionAssignor.Subscription(topics,
                         new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, 
userEndPoint).encode()
         ));
-        final Map<String, PartitionAssignor.Assignment> assignments = 
partitionAssignor.assign(metadata, subscriptions);
+        final Map<String, ConsumerPartitionAssignor.Assignment> assignments = 
partitionAssignor.assign(metadata, new 
GroupSubscription(subscriptions)).groupAssignment();
 
         // check assignment info
         final AssignmentInfo info10 = checkAssignment(Utils.mkSet("topic1"), 
assignments.get("consumer10"));
@@ -399,12 +402,12 @@ public class StreamsPartitionAssignorTest {
         configurePartitionAssignor(Collections.emptyMap());
 
         subscriptions.put("consumer10",
-                new PartitionAssignor.Subscription(topics,
+                new ConsumerPartitionAssignor.Subscription(topics,
                         new SubscriptionInfo(uuid1, prevTasks10, 
standbyTasks10, userEndPoint).encode()
                 ));
 
         // initially metadata is empty
-        Map<String, PartitionAssignor.Assignment> assignments = 
partitionAssignor.assign(emptyMetadata, subscriptions);
+        Map<String, ConsumerPartitionAssignor.Assignment> assignments = 
partitionAssignor.assign(emptyMetadata, new 
GroupSubscription(subscriptions)).groupAssignment();
 
         // check assigned partitions
         assertEquals(Collections.emptySet(),
@@ -417,7 +420,7 @@ public class StreamsPartitionAssignorTest {
         assertEquals(0, allActiveTasks.size());
 
         // then metadata gets populated
-        assignments = partitionAssignor.assign(metadata, subscriptions);
+        assignments = partitionAssignor.assign(metadata, new 
GroupSubscription(subscriptions)).groupAssignment();
         // check assigned partitions
         assertEquals(Utils.mkSet(Utils.mkSet(t1p0, t2p0, t1p0, t2p0, t1p1, 
t2p1, t1p2, t2p2)),
             Utils.mkSet(new 
HashSet<>(assignments.get("consumer10").partitions())));
@@ -455,16 +458,16 @@ public class StreamsPartitionAssignorTest {
         partitionAssignor.setInternalTopicManager(new 
MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
 
         subscriptions.put("consumer10",
-                new PartitionAssignor.Subscription(topics,
+                new ConsumerPartitionAssignor.Subscription(topics,
                         new SubscriptionInfo(uuid1, prevTasks10, emptyTasks, 
userEndPoint).encode()));
         subscriptions.put("consumer11",
-                new PartitionAssignor.Subscription(topics,
+                new ConsumerPartitionAssignor.Subscription(topics,
                         new SubscriptionInfo(uuid1, prevTasks11, emptyTasks, 
userEndPoint).encode()));
         subscriptions.put("consumer20",
-                new PartitionAssignor.Subscription(topics,
+                new ConsumerPartitionAssignor.Subscription(topics,
                         new SubscriptionInfo(uuid2, prevTasks20, emptyTasks, 
userEndPoint).encode()));
 
-        final Map<String, PartitionAssignor.Assignment> assignments = 
partitionAssignor.assign(metadata, subscriptions);
+        final Map<String, ConsumerPartitionAssignor.Assignment> assignments = 
partitionAssignor.assign(metadata, new 
GroupSubscription(subscriptions)).groupAssignment();
 
         // check assigned partitions: since there is no previous task for 
topic 3 it will be assigned randomly so we cannot check exact match
         // also note that previously assigned partitions / tasks may not stay 
on the previous host since we may assign the new task first and
@@ -521,16 +524,16 @@ public class StreamsPartitionAssignorTest {
         partitionAssignor.setInternalTopicManager(new 
MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
 
         subscriptions.put("consumer10",
-                new PartitionAssignor.Subscription(topics,
+                new ConsumerPartitionAssignor.Subscription(topics,
                         new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, 
userEndPoint).encode()));
         subscriptions.put("consumer11",
-                new PartitionAssignor.Subscription(topics,
+                new ConsumerPartitionAssignor.Subscription(topics,
                         new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, 
userEndPoint).encode()));
         subscriptions.put("consumer20",
-                new PartitionAssignor.Subscription(topics,
+                new ConsumerPartitionAssignor.Subscription(topics,
                         new SubscriptionInfo(uuid2, emptyTasks, emptyTasks, 
userEndPoint).encode()));
 
-        final Map<String, PartitionAssignor.Assignment> assignments = 
partitionAssignor.assign(metadata, subscriptions);
+        final Map<String, ConsumerPartitionAssignor.Assignment> assignments = 
partitionAssignor.assign(metadata, new 
GroupSubscription(subscriptions)).groupAssignment();
 
         // check assigned partition size: since there is no previous task and 
there are two sub-topologies the assignment is random so we cannot check exact 
match
         assertEquals(2, assignments.get("consumer10").partitions().size());
@@ -609,16 +612,16 @@ public class StreamsPartitionAssignorTest {
         partitionAssignor.setInternalTopicManager(new 
MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
 
         subscriptions.put("consumer10",
-                new PartitionAssignor.Subscription(topics,
+                new ConsumerPartitionAssignor.Subscription(topics,
                         new SubscriptionInfo(uuid1, prevTasks00, 
standbyTasks01, userEndPoint).encode()));
         subscriptions.put("consumer11",
-                new PartitionAssignor.Subscription(topics,
+                new ConsumerPartitionAssignor.Subscription(topics,
                         new SubscriptionInfo(uuid1, prevTasks01, 
standbyTasks02, userEndPoint).encode()));
         subscriptions.put("consumer20",
-                new PartitionAssignor.Subscription(topics,
+                new ConsumerPartitionAssignor.Subscription(topics,
                         new SubscriptionInfo(uuid2, prevTasks02, 
standbyTasks00, "any:9097").encode()));
 
-        final Map<String, PartitionAssignor.Assignment> assignments = 
partitionAssignor.assign(metadata, subscriptions);
+        final Map<String, ConsumerPartitionAssignor.Assignment> assignments = 
partitionAssignor.assign(metadata, new 
GroupSubscription(subscriptions)).groupAssignment();
 
         // the first consumer
         final AssignmentInfo info10 = checkAssignment(allTopics, 
assignments.get("consumer10"));
@@ -666,7 +669,7 @@ public class StreamsPartitionAssignorTest {
         standbyTasks.put(task2, Utils.mkSet(t3p2));
 
         final AssignmentInfo info = new AssignmentInfo(activeTaskList, 
standbyTasks, hostState);
-        final PartitionAssignor.Assignment assignment = new 
PartitionAssignor.Assignment(asList(t3p0, t3p3), info.encode());
+        final ConsumerPartitionAssignor.Assignment assignment = new 
ConsumerPartitionAssignor.Assignment(asList(t3p0, t3p3), info.encode());
 
         final Capture<Cluster> capturedCluster = EasyMock.newCapture();
         taskManager.setPartitionsByHostState(hostState);
@@ -677,7 +680,7 @@ public class StreamsPartitionAssignorTest {
         EasyMock.expectLastCall();
         EasyMock.replay(taskManager);
 
-        partitionAssignor.onAssignment(assignment);
+        partitionAssignor.onAssignment(assignment, null);
 
         EasyMock.verify(taskManager);
 
@@ -704,10 +707,10 @@ public class StreamsPartitionAssignorTest {
         partitionAssignor.setInternalTopicManager(internalTopicManager);
 
         subscriptions.put("consumer10",
-                new PartitionAssignor.Subscription(topics,
+                new ConsumerPartitionAssignor.Subscription(topics,
                         new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, 
userEndPoint).encode())
         );
-        partitionAssignor.assign(metadata, subscriptions);
+        partitionAssignor.assign(metadata, new 
GroupSubscription(subscriptions)).groupAssignment();
 
         // check prepared internal topics
         assertEquals(1, internalTopicManager.readyTopics.size());
@@ -738,10 +741,10 @@ public class StreamsPartitionAssignorTest {
         partitionAssignor.setInternalTopicManager(internalTopicManager);
 
         subscriptions.put("consumer10",
-                new PartitionAssignor.Subscription(topics,
+                new ConsumerPartitionAssignor.Subscription(topics,
                         new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, 
userEndPoint).encode())
         );
-        partitionAssignor.assign(metadata, subscriptions);
+        partitionAssignor.assign(metadata, new 
GroupSubscription(subscriptions)).groupAssignment();
 
         // check prepared internal topics
         assertEquals(2, internalTopicManager.readyTopics.size());
@@ -790,11 +793,11 @@ public class StreamsPartitionAssignorTest {
         partitionAssignor.setInternalTopicManager(mockInternalTopicManager);
 
         subscriptions.put(client,
-                new PartitionAssignor.Subscription(
+                new ConsumerPartitionAssignor.Subscription(
                         asList("topic1", "topic3"),
                         new SubscriptionInfo(uuid, emptyTasks, emptyTasks, 
userEndPoint).encode())
         );
-        final Map<String, PartitionAssignor.Assignment> assignment = 
partitionAssignor.assign(metadata, subscriptions);
+        final Map<String, ConsumerPartitionAssignor.Assignment> assignment = 
partitionAssignor.assign(metadata, new 
GroupSubscription(subscriptions)).groupAssignment();
 
         final Map<String, Integer> expectedCreatedInternalTopics = new 
HashMap<>();
         expectedCreatedInternalTopics.put(applicationId + 
"-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 4);
@@ -841,7 +844,8 @@ public class StreamsPartitionAssignorTest {
             uuid1,
             builder);
         
configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG,
 userEndPoint));
-        final PartitionAssignor.Subscription subscription = 
partitionAssignor.subscription(Utils.mkSet("input"));
+        final Set<String> topics = Utils.mkSet("input");
+        final ConsumerPartitionAssignor.Subscription subscription = new 
ConsumerPartitionAssignor.Subscription(new ArrayList<>(topics), 
partitionAssignor.subscriptionUserData(topics));
         final SubscriptionInfo subscriptionInfo = 
SubscriptionInfo.decode(subscription.userData());
         assertEquals("localhost:8080", subscriptionInfo.userEndPoint());
     }
@@ -863,11 +867,11 @@ public class StreamsPartitionAssignorTest {
         partitionAssignor.setInternalTopicManager(new 
MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
 
         subscriptions.put("consumer1",
-                new PartitionAssignor.Subscription(topics,
+                new ConsumerPartitionAssignor.Subscription(topics,
                         new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, 
userEndPoint).encode())
         );
-        final Map<String, PartitionAssignor.Assignment> assignments = 
partitionAssignor.assign(metadata, subscriptions);
-        final PartitionAssignor.Assignment consumerAssignment = 
assignments.get("consumer1");
+        final Map<String, ConsumerPartitionAssignor.Assignment> assignments = 
partitionAssignor.assign(metadata, new 
GroupSubscription(subscriptions)).groupAssignment();
+        final ConsumerPartitionAssignor.Assignment consumerAssignment = 
assignments.get("consumer1");
         final AssignmentInfo assignmentInfo = 
AssignmentInfo.decode(consumerAssignment.userData());
         final Set<TopicPartition> topicPartitions = 
assignmentInfo.partitionsByHost().get(new HostInfo("localhost", 8080));
         assertEquals(
@@ -961,11 +965,11 @@ public class StreamsPartitionAssignorTest {
         partitionAssignor.setInternalTopicManager(mockInternalTopicManager);
 
         subscriptions.put(client,
-                new PartitionAssignor.Subscription(
+                new ConsumerPartitionAssignor.Subscription(
                         Collections.singletonList("unknownTopic"),
                         new SubscriptionInfo(uuid, emptyTasks, emptyTasks, 
userEndPoint).encode())
         );
-        final Map<String, PartitionAssignor.Assignment> assignment = 
partitionAssignor.assign(metadata, subscriptions);
+        final Map<String, ConsumerPartitionAssignor.Assignment> assignment = 
partitionAssignor.assign(metadata, new 
GroupSubscription(subscriptions)).groupAssignment();
 
         assertThat(mockInternalTopicManager.readyTopics.isEmpty(), 
equalTo(true));
 
@@ -985,7 +989,7 @@ public class StreamsPartitionAssignorTest {
         EasyMock.expectLastCall();
         EasyMock.replay(taskManager);
 
-        partitionAssignor.onAssignment(createAssignment(hostState));
+        partitionAssignor.onAssignment(createAssignment(hostState), null);
 
         EasyMock.verify(taskManager);
     }
@@ -1015,18 +1019,18 @@ public class StreamsPartitionAssignorTest {
             mockClientSupplier.restoreConsumer));
 
         subscriptions.put("consumer1",
-                new PartitionAssignor.Subscription(
+                new ConsumerPartitionAssignor.Subscription(
                         Collections.singletonList("topic1"),
                         new SubscriptionInfo(uuid, emptyTasks, emptyTasks, 
userEndPoint).encode())
         );
         subscriptions.put("consumer2",
-                new PartitionAssignor.Subscription(
+                new ConsumerPartitionAssignor.Subscription(
                         Collections.singletonList("topic1"),
                         new SubscriptionInfo(UUID.randomUUID(), emptyTasks, 
emptyTasks, "other:9090").encode())
         );
         final Set<TopicPartition> allPartitions = Utils.mkSet(t1p0, t1p1, 
t1p2);
-        final Map<String, PartitionAssignor.Assignment> assign = 
partitionAssignor.assign(metadata, subscriptions);
-        final PartitionAssignor.Assignment consumer1Assignment = 
assign.get("consumer1");
+        final Map<String, ConsumerPartitionAssignor.Assignment> assign = 
partitionAssignor.assign(metadata, new 
GroupSubscription(subscriptions)).groupAssignment();
+        final ConsumerPartitionAssignor.Assignment consumer1Assignment = 
assign.get("consumer1");
         final AssignmentInfo assignmentInfo = 
AssignmentInfo.decode(consumer1Assignment.userData());
         final Set<TopicPartition> consumer1partitions = 
assignmentInfo.partitionsByHost().get(new HostInfo("localhost", 8080));
         final Set<TopicPartition> consumer2Partitions = 
assignmentInfo.partitionsByHost().get(new HostInfo("other", 9090));
@@ -1109,12 +1113,12 @@ public class StreamsPartitionAssignorTest {
     private void 
shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(final int 
smallestVersion,
                                                                                
      final int otherVersion) {
         subscriptions.put("consumer1",
-                new PartitionAssignor.Subscription(
+                new ConsumerPartitionAssignor.Subscription(
                         Collections.singletonList("topic1"),
                         new SubscriptionInfo(smallestVersion, 
UUID.randomUUID(), emptyTasks, emptyTasks, null).encode())
         );
         subscriptions.put("consumer2",
-                new PartitionAssignor.Subscription(
+                new ConsumerPartitionAssignor.Subscription(
                         Collections.singletonList("topic1"),
                         new SubscriptionInfo(otherVersion, UUID.randomUUID(), 
emptyTasks, emptyTasks, null).encode()
                 )
@@ -1126,7 +1130,7 @@ public class StreamsPartitionAssignorTest {
             UUID.randomUUID(),
             builder);
         partitionAssignor.configure(configProps());
-        final Map<String, PartitionAssignor.Assignment> assignment = 
partitionAssignor.assign(metadata, subscriptions);
+        final Map<String, ConsumerPartitionAssignor.Assignment> assignment = 
partitionAssignor.assign(metadata, new 
GroupSubscription(subscriptions)).groupAssignment();
 
         assertThat(assignment.size(), equalTo(2));
         
assertThat(AssignmentInfo.decode(assignment.get("consumer1").userData()).version(),
 equalTo(smallestVersion));
@@ -1142,7 +1146,8 @@ public class StreamsPartitionAssignorTest {
             builder);
         
configurePartitionAssignor(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG,
 StreamsConfig.UPGRADE_FROM_0100));
 
-        final PartitionAssignor.Subscription subscription = 
partitionAssignor.subscription(Utils.mkSet("topic1"));
+        final Set<String> topics = Utils.mkSet("topic1");
+        final ConsumerPartitionAssignor.Subscription subscription = new 
ConsumerPartitionAssignor.Subscription(new ArrayList<>(topics), 
partitionAssignor.subscriptionUserData(topics));
 
         assertThat(SubscriptionInfo.decode(subscription.userData()).version(), 
equalTo(1));
     }
@@ -1180,7 +1185,8 @@ public class StreamsPartitionAssignorTest {
             builder);
         
configurePartitionAssignor(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG,
 upgradeFromValue));
 
-        final PartitionAssignor.Subscription subscription = 
partitionAssignor.subscription(Utils.mkSet("topic1"));
+        final Set<String> topics = Utils.mkSet("topic1");
+        final ConsumerPartitionAssignor.Subscription subscription = new 
ConsumerPartitionAssignor.Subscription(new ArrayList<>(topics), 
partitionAssignor.subscriptionUserData(topics));
 
         assertThat(SubscriptionInfo.decode(subscription.userData()).version(), 
equalTo(2));
     }
@@ -1200,12 +1206,12 @@ public class StreamsPartitionAssignorTest {
         };
 
         subscriptions.put("consumer1",
-                new PartitionAssignor.Subscription(
+                new ConsumerPartitionAssignor.Subscription(
                         Collections.singletonList("topic1"),
                         new SubscriptionInfo(UUID.randomUUID(), activeTasks, 
standbyTasks, null).encode())
         );
         subscriptions.put("future-consumer",
-                new PartitionAssignor.Subscription(
+                new ConsumerPartitionAssignor.Subscription(
                         Collections.singletonList("topic1"),
                         encodeFutureSubscription())
         );
@@ -1216,7 +1222,7 @@ public class StreamsPartitionAssignorTest {
             UUID.randomUUID(),
             builder);
         partitionAssignor.configure(configProps());
-        final Map<String, PartitionAssignor.Assignment> assignment = 
partitionAssignor.assign(metadata, subscriptions);
+        final Map<String, ConsumerPartitionAssignor.Assignment> assignment = 
partitionAssignor.assign(metadata, new 
GroupSubscription(subscriptions)).groupAssignment();
 
         assertThat(assignment.size(), equalTo(2));
         assertThat(
@@ -1252,12 +1258,12 @@ public class StreamsPartitionAssignorTest {
 
     private void 
shouldThrowIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(final 
int oldVersion) {
         subscriptions.put("consumer1",
-                new PartitionAssignor.Subscription(
+                new ConsumerPartitionAssignor.Subscription(
                         Collections.singletonList("topic1"),
                         new SubscriptionInfo(oldVersion, UUID.randomUUID(), 
emptyTasks, emptyTasks, null).encode())
         );
         subscriptions.put("future-consumer",
-                new PartitionAssignor.Subscription(
+                new ConsumerPartitionAssignor.Subscription(
                         Collections.singletonList("topic1"),
                         encodeFutureSubscription())
         );
@@ -1270,24 +1276,24 @@ public class StreamsPartitionAssignorTest {
         partitionAssignor.configure(configProps());
 
         try {
-            partitionAssignor.assign(metadata, subscriptions);
+            partitionAssignor.assign(metadata, new 
GroupSubscription(subscriptions)).groupAssignment();
             fail("Should have thrown IllegalStateException");
         } catch (final IllegalStateException expected) {
             // pass
         }
     }
 
-    private PartitionAssignor.Assignment createAssignment(final Map<HostInfo, 
Set<TopicPartition>> firstHostState) {
+    private ConsumerPartitionAssignor.Assignment createAssignment(final 
Map<HostInfo, Set<TopicPartition>> firstHostState) {
         final AssignmentInfo info = new AssignmentInfo(Collections.emptyList(),
                                                        Collections.emptyMap(),
                                                        firstHostState);
 
-        return new PartitionAssignor.Assignment(
+        return new ConsumerPartitionAssignor.Assignment(
                 Collections.emptyList(), info.encode());
     }
 
     private AssignmentInfo checkAssignment(final Set<String> expectedTopics,
-                                           final PartitionAssignor.Assignment 
assignment) {
+                                           final 
ConsumerPartitionAssignor.Assignment assignment) {
 
         // This assumed 1) DefaultPartitionGrouper is used, and 2) there is an 
only one topic group.
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java 
b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index 27bee81..0b2d0b3 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -18,8 +18,9 @@ package org.apache.kafka.streams.tests;
 
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
@@ -113,7 +114,7 @@ public class StreamsUpgradeTest {
         }
 
         @Override
-        public Subscription subscription(final Set<String> topics) {
+        public ByteBuffer subscriptionUserData(final Set<String> topics) {
             // Adds the following information to subscription
             // 1. Client UUID (a unique id assigned to an instance of 
KafkaStreams)
             // 2. Task ids of previously running tasks
@@ -133,13 +134,13 @@ public class StreamsUpgradeTest {
 
             taskManager.updateSubscriptionsFromMetadata(topics);
 
-            return new Subscription(new ArrayList<>(topics), data.encode());
+            return data.encode();
         }
 
         @Override
-        public void onAssignment(final PartitionAssignor.Assignment 
assignment) {
+        public void onAssignment(final ConsumerPartitionAssignor.Assignment 
assignment, final ConsumerGroupMetadata metadata) {
             try {
-                super.onAssignment(assignment);
+                super.onAssignment(assignment, metadata);
                 return;
             } catch (final TaskAssignmentException cannotProcessFutureVersion) 
{
                 // continue
@@ -183,15 +184,15 @@ public class StreamsUpgradeTest {
         }
 
         @Override
-        public Map<String, Assignment> assign(final Cluster metadata,
-                                              final Map<String, Subscription> 
subscriptions) {
+        public GroupAssignment assign(final Cluster metadata, final 
GroupSubscription groupSubscription) {
+            final Map<String, Subscription> subscriptions = 
groupSubscription.groupSubscription();
             Map<String, Assignment> assignment = null;
 
             final Map<String, Subscription> downgradedSubscriptions = new 
HashMap<>();
             for (final Subscription subscription : subscriptions.values()) {
                 final SubscriptionInfo info = 
SubscriptionInfo.decode(subscription.userData());
                 if (info.version() < SubscriptionInfo.LATEST_SUPPORTED_VERSION 
+ 1) {
-                    assignment = super.assign(metadata, subscriptions);
+                    assignment = super.assign(metadata, new 
GroupSubscription(subscriptions)).groupAssignment();
                     break;
                 }
             }
@@ -219,7 +220,7 @@ public class StreamsUpgradeTest {
                                 info.userEndPoint())
                                 .encode()));
                 }
-                assignment = super.assign(metadata, downgradedSubscriptions);
+                assignment = super.assign(metadata, new 
GroupSubscription(downgradedSubscriptions)).groupAssignment();
                 bumpUsedVersion = true;
                 bumpSupportedVersion = true;
             }
@@ -238,7 +239,7 @@ public class StreamsUpgradeTest {
                             .encode()));
             }
 
-            return newAssignment;
+            return new GroupAssignment(newAssignment);
         }
     }
 

Reply via email to