This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d23ce20bdfb KAFKA-14352: Rack-aware consumer partition assignment 
protocol changes (KIP-881) (#12954)
d23ce20bdfb is described below

commit d23ce20bdfbe5a9598523961cb7cf747ce4f52ef
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
AuthorDate: Wed Dec 7 11:41:21 2022 +0000

    KAFKA-14352: Rack-aware consumer partition assignment protocol changes 
(KIP-881) (#12954)
    
    Reviewers: David Jacot <dja...@confluent.io>
---
 checkstyle/suppressions.xml                        |  2 +-
 .../consumer/ConsumerPartitionAssignor.java        | 15 +++--
 .../kafka/clients/consumer/KafkaConsumer.java      |  3 +-
 .../consumer/internals/ConsumerCoordinator.java    |  8 ++-
 .../consumer/internals/ConsumerProtocol.java       |  5 +-
 .../common/message/ConsumerProtocolAssignment.json |  3 +-
 .../message/ConsumerProtocolSubscription.json      |  6 +-
 .../consumer/CooperativeStickyAssignorTest.java    |  8 +--
 .../kafka/clients/consumer/KafkaConsumerTest.java  |  3 +-
 .../kafka/clients/consumer/StickyAssignorTest.java |  8 +--
 .../internals/ConsumerCoordinatorTest.java         | 67 +++++++++++++++++++---
 .../consumer/internals/ConsumerProtocolTest.java   | 23 ++++++--
 .../kafka/api/PlaintextConsumerTest.scala          | 17 ++++++
 13 files changed, 136 insertions(+), 32 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index ac0accc17fa..cea6a193790 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -66,7 +66,7 @@
     <suppress checks="ParameterNumber"
               files="(NetworkClient|FieldSpec|KafkaRaftClient).java"/>
     <suppress checks="ParameterNumber"
-              files="KafkaConsumer.java"/>
+              files="(KafkaConsumer|ConsumerCoordinator).java"/>
     <suppress checks="ParameterNumber"
               files="Fetcher.java"/>
     <suppress checks="ParameterNumber"
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
index f544ce77e6b..0488c2b8c8a 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
@@ -103,27 +103,29 @@ public interface ConsumerPartitionAssignor {
         private final List<String> topics;
         private final ByteBuffer userData;
         private final List<TopicPartition> ownedPartitions;
+        private final Optional<String> rackId;
         private Optional<String> groupInstanceId;
         private final Optional<Integer> generationId;
 
-        public Subscription(List<String> topics, ByteBuffer userData, 
List<TopicPartition> ownedPartitions, int generationId) {
+        public Subscription(List<String> topics, ByteBuffer userData, 
List<TopicPartition> ownedPartitions, int generationId, Optional<String> 
rackId) {
             this.topics = topics;
             this.userData = userData;
             this.ownedPartitions = ownedPartitions;
             this.groupInstanceId = Optional.empty();
             this.generationId = generationId < 0 ? Optional.empty() : 
Optional.of(generationId);
+            this.rackId = rackId;
         }
 
         public Subscription(List<String> topics, ByteBuffer userData, 
List<TopicPartition> ownedPartitions) {
-            this(topics, userData, ownedPartitions, DEFAULT_GENERATION);
+            this(topics, userData, ownedPartitions, DEFAULT_GENERATION, 
Optional.empty());
         }
 
         public Subscription(List<String> topics, ByteBuffer userData) {
-            this(topics, userData, Collections.emptyList(), 
DEFAULT_GENERATION);
+            this(topics, userData, Collections.emptyList(), 
DEFAULT_GENERATION, Optional.empty());
         }
 
         public Subscription(List<String> topics) {
-            this(topics, null, Collections.emptyList(), DEFAULT_GENERATION);
+            this(topics, null, Collections.emptyList(), DEFAULT_GENERATION, 
Optional.empty());
         }
 
         public List<String> topics() {
@@ -138,6 +140,10 @@ public interface ConsumerPartitionAssignor {
             return ownedPartitions;
         }
 
+        public Optional<String> rackId() {
+            return rackId;
+        }
+
         public void setGroupInstanceId(Optional<String> groupInstanceId) {
             this.groupInstanceId = groupInstanceId;
         }
@@ -158,6 +164,7 @@ public interface ConsumerPartitionAssignor {
                 ", ownedPartitions=" + ownedPartitions +
                 ", groupInstanceId=" + 
groupInstanceId.map(String::toString).orElse("null") +
                 ", generationId=" + generationId.orElse(-1) +
+                ", rackId=" + (rackId.orElse("null")) +
                 ")";
         }
     }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index f07846945a7..cf85798f82b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -790,7 +790,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                         enableAutoCommit,
                         
config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
                         this.interceptors,
-                        
config.getBoolean(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
+                        
config.getBoolean(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED),
+                        config.getString(ConsumerConfig.CLIENT_RACK_CONFIG));
             }
             this.fetcher = new Fetcher<>(
                     logContext,
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 051962d4435..fec31fe80f8 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -117,6 +117,7 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
     private AtomicBoolean asyncCommitFenced;
     private ConsumerGroupMetadata groupMetadata;
     private final boolean throwOnFetchStableOffsetsUnsupported;
+    private final Optional<String> rackId;
 
     // hold onto request&future for committed offset requests to enable async 
calls.
     private PendingCommittedOffsetRequest pendingCommittedOffsetRequest = null;
@@ -162,7 +163,8 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
                                boolean autoCommitEnabled,
                                int autoCommitIntervalMs,
                                ConsumerInterceptors<?, ?> interceptors,
-                               boolean throwOnFetchStableOffsetsUnsupported) {
+                               boolean throwOnFetchStableOffsetsUnsupported,
+                               String rackId) {
         super(rebalanceConfig,
               logContext,
               client,
@@ -186,6 +188,7 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
         this.groupMetadata = new ConsumerGroupMetadata(rebalanceConfig.groupId,
             JoinGroupRequest.UNKNOWN_GENERATION_ID, 
JoinGroupRequest.UNKNOWN_MEMBER_ID, rebalanceConfig.groupInstanceId);
         this.throwOnFetchStableOffsetsUnsupported = 
throwOnFetchStableOffsetsUnsupported;
+        this.rackId = rackId == null || rackId.isEmpty() ? Optional.empty() : 
Optional.of(rackId);
 
         if (autoCommitEnabled)
             this.nextAutoCommitTimer = time.timer(autoCommitIntervalMs);
@@ -245,7 +248,8 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
             Subscription subscription = new Subscription(topics,
                                                          
assignor.subscriptionUserData(joinedSubscription),
                                                          
subscriptions.assignedPartitionsList(),
-                                                         
generation().generationId);
+                                                         
generation().generationId,
+                                                         rackId);
             ByteBuffer metadata = 
ConsumerProtocol.serializeSubscription(subscription);
 
             protocolSet.add(new JoinGroupRequestData.JoinGroupRequestProtocol()
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
index b237a3b0c21..79feb4eb57b 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
@@ -31,6 +31,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Optional;
 
 /**
  * ConsumerProtocol contains the schemas for consumer subscriptions and 
assignments for use with
@@ -89,6 +90,7 @@ public class ConsumerProtocol {
             }
             partition.partitions().add(tp.partition());
         }
+        subscription.rackId().ifPresent(data::setRackId);
 
         data.setGenerationId(subscription.generationId().orElse(-1));
         return MessageUtil.toVersionPrefixedByteBuffer(version, data);
@@ -112,7 +114,8 @@ public class ConsumerProtocol {
                 data.topics(),
                 data.userData() != null ? data.userData().duplicate() : null,
                 ownedPartitions,
-                data.generationId());
+                data.generationId(),
+                data.rackId() == null || data.rackId().isEmpty() ? 
Optional.empty() : Optional.of(data.rackId()));
         } catch (BufferUnderflowException e) {
             throw new SchemaException("Buffer underflow while parsing consumer 
protocol's subscription", e);
         }
diff --git 
a/clients/src/main/resources/common/message/ConsumerProtocolAssignment.json 
b/clients/src/main/resources/common/message/ConsumerProtocolAssignment.json
index 8b0c138d698..fe07aaeadff 100644
--- a/clients/src/main/resources/common/message/ConsumerProtocolAssignment.json
+++ b/clients/src/main/resources/common/message/ConsumerProtocolAssignment.json
@@ -23,7 +23,8 @@
   // that new versions cannot remove or reorder any of the existing fields.
   //
   // Version 2 is to support a new field "GenerationId" in 
ConsumerProtocolSubscription.
-  "validVersions": "0-2",
+  // Version 3 adds rack id to ConsumerProtocolSubscription.
+  "validVersions": "0-3",
   "flexibleVersions": "none",
   "fields": [
     { "name": "AssignedPartitions", "type": "[]TopicPartition", "versions": 
"0+",
diff --git 
a/clients/src/main/resources/common/message/ConsumerProtocolSubscription.json 
b/clients/src/main/resources/common/message/ConsumerProtocolSubscription.json
index 6997d56a04b..49801c65f77 100644
--- 
a/clients/src/main/resources/common/message/ConsumerProtocolSubscription.json
+++ 
b/clients/src/main/resources/common/message/ConsumerProtocolSubscription.json
@@ -24,7 +24,8 @@
 
   // Version 1 added the "OwnedPartitions" field to allow assigner know what 
partitions each member owned
   // Version 2 added a new field "GenerationId" to indicate if the member has 
out-of-date ownedPartitions.
-  "validVersions": "0-2",
+  // Version 3 adds rack id to enable rack-aware assignment.
+  "validVersions": "0-3",
   "flexibleVersions": "none",
   "fields": [
     { "name": "Topics", "type": "[]string", "versions": "0+" },
@@ -36,6 +37,7 @@
         { "name": "Partitions", "type": "[]int32", "versions": "1+"}
       ]
     },
-    { "name": "GenerationId", "type": "int32", "versions": "2+", "default": 
"-1", "ignorable": true }
+    { "name": "GenerationId", "type": "int32", "versions": "2+", "default": 
"-1", "ignorable": true },
+    { "name": "RackId", "type": "string", "versions": "3+", 
"nullableVersions": "3+", "default": "null", "ignorable": true }
   ]
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java
index 9284de50ab7..28a988b8d82 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java
@@ -54,12 +54,12 @@ public class CooperativeStickyAssignorTest extends 
AbstractStickyAssignorTest {
     @Override
     public Subscription buildSubscriptionV1(List<String> topics, 
List<TopicPartition> partitions, int generationId) {
         assignor.onAssignment(new 
ConsumerPartitionAssignor.Assignment(partitions), new 
ConsumerGroupMetadata(groupId, generationId, consumer1, Optional.empty()));
-        return new Subscription(topics, assignor.subscriptionUserData(new 
HashSet<>(topics)), partitions, DEFAULT_GENERATION);
+        return new Subscription(topics, assignor.subscriptionUserData(new 
HashSet<>(topics)), partitions, DEFAULT_GENERATION, Optional.empty());
     }
 
     @Override
     public Subscription buildSubscriptionV2Above(List<String> topics, 
List<TopicPartition> partitions, int generationId) {
-        return new Subscription(topics, assignor.subscriptionUserData(new 
HashSet<>(topics)), partitions, generationId);
+        return new Subscription(topics, assignor.subscriptionUserData(new 
HashSet<>(topics)), partitions, generationId, Optional.empty());
     }
 
     @Override
@@ -156,7 +156,7 @@ public class CooperativeStickyAssignorTest extends 
AbstractStickyAssignorTest {
 
         // subscription containing empty owned partitions and the same 
generation id, and non-empty owned partition in user data,
         // member data should honor the one in subscription since 
cooperativeStickyAssignor only supports ConsumerProtocolSubscription v1 and 
above
-        Subscription subscription = new Subscription(topics, 
generateUserData(topics, ownedPartitions, generationId), 
Collections.emptyList(), generationId);
+        Subscription subscription = new Subscription(topics, 
generateUserData(topics, ownedPartitions, generationId), 
Collections.emptyList(), generationId, Optional.empty());
 
         AbstractStickyAssignor.MemberData memberData = 
memberData(subscription);
         assertEquals(Collections.emptyList(), memberData.partitions, 
"subscription: " + subscription + " doesn't have expected owned partition");
@@ -170,7 +170,7 @@ public class CooperativeStickyAssignorTest extends 
AbstractStickyAssignorTest {
 
         // subscription containing empty owned partitions and a higher 
generation id, and non-empty owned partition in user data,
         // member data should honor the one in subscription since generation 
id is higher
-        Subscription subscription = new Subscription(topics, 
generateUserData(topics, ownedPartitions, generationId - 1), 
Collections.emptyList(), generationId);
+        Subscription subscription = new Subscription(topics, 
generateUserData(topics, ownedPartitions, generationId - 1), 
Collections.emptyList(), generationId, Optional.empty());
 
         AbstractStickyAssignor.MemberData memberData = 
memberData(subscription);
         assertEquals(Collections.emptyList(), memberData.partitions, 
"subscription: " + subscription + " doesn't have expected owned partition");
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 2c667dcb5d6..f08ac45ddaf 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -2614,7 +2614,8 @@ public class KafkaConsumerTest {
                 autoCommitEnabled,
                 autoCommitIntervalMs,
                 interceptors,
-                throwOnStableOffsetNotSupported);
+                throwOnStableOffsetNotSupported,
+                null);
         }
         Fetcher<String, String> fetcher = new Fetcher<>(
                 loggerFactory,
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
index b27aa0e4a74..9dbc085f76c 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
@@ -53,19 +53,19 @@ public class StickyAssignorTest extends 
AbstractStickyAssignorTest {
     @Override
     public Subscription buildSubscriptionV0(List<String> topics, 
List<TopicPartition> partitions, int generationId) {
         return new Subscription(topics, serializeTopicPartitionAssignment(new 
MemberData(partitions, Optional.of(generationId))),
-            Collections.emptyList(), DEFAULT_GENERATION);
+            Collections.emptyList(), DEFAULT_GENERATION, Optional.empty());
     }
 
     @Override
     public Subscription buildSubscriptionV1(List<String> topics, 
List<TopicPartition> partitions, int generationId) {
         return new Subscription(topics, serializeTopicPartitionAssignment(new 
MemberData(partitions, Optional.of(generationId))),
-            partitions, DEFAULT_GENERATION);
+            partitions, DEFAULT_GENERATION, Optional.empty());
     }
 
     @Override
     public Subscription buildSubscriptionV2Above(List<String> topics, 
List<TopicPartition> partitions, int generationId) {
         return new Subscription(topics, serializeTopicPartitionAssignment(new 
MemberData(partitions, Optional.of(generationId))),
-            partitions, generationId);
+            partitions, generationId, Optional.empty());
     }
 
     @Override
@@ -308,7 +308,7 @@ public class StickyAssignorTest extends 
AbstractStickyAssignorTest {
         List<TopicPartition> ownedPartitions = partitions(tp(topic1, 0), 
tp(topic2, 1));
         int generationIdInUserData = generationId - 1;
 
-        Subscription subscription = new Subscription(topics, 
generateUserData(topics, ownedPartitions, generationIdInUserData), 
Collections.emptyList(), generationId);
+        Subscription subscription = new Subscription(topics, 
generateUserData(topics, ownedPartitions, generationIdInUserData), 
Collections.emptyList(), generationId, Optional.empty());
         AbstractStickyAssignor.MemberData memberData = 
memberData(subscription);
         // in StickyAssignor with eager rebalance protocol, we'll always honor 
data in user data
         assertEquals(ownedPartitions, memberData.partitions, "subscription: " 
+ subscription + " doesn't have expected owned partition");
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 9dd44740a55..36bf0ea6825 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -117,6 +117,7 @@ import static java.util.Collections.singletonMap;
 import static 
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE;
 import static 
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol.EAGER;
 import static 
org.apache.kafka.clients.consumer.CooperativeStickyAssignor.COOPERATIVE_STICKY_ASSIGNOR_NAME;
+import static 
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.DEFAULT_GENERATION;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkSet;
@@ -1847,7 +1848,7 @@ public abstract class ConsumerCoordinatorTest {
         // note that `MockPartitionAssignor.prepare` is not called therefore 
calling `MockPartitionAssignor.assign`
         // will throw a IllegalStateException. this indirectly verifies that 
`assign` is correctly skipped.
         Map<String, List<String>> memberSubscriptions = 
singletonMap(consumerId, singletonList(topic1));
-        client.prepareResponse(joinGroupLeaderResponse(1, consumerId, 
memberSubscriptions, true, Errors.NONE));
+        client.prepareResponse(joinGroupLeaderResponse(1, consumerId, 
memberSubscriptions, true, Errors.NONE, Optional.empty()));
         client.prepareResponse(syncGroupResponse(singletonList(t1p), 
Errors.NONE));
 
         coordinator.poll(time.timer(Long.MAX_VALUE));
@@ -1879,7 +1880,7 @@ public abstract class ConsumerCoordinatorTest {
             mkEntry(consumerId, singletonList(topic1)),
             mkEntry(consumerId2, singletonList(topic2))
         );
-        client.prepareResponse(joinGroupLeaderResponse(1, consumerId, 
memberSubscriptions, true, Errors.NONE));
+        client.prepareResponse(joinGroupLeaderResponse(1, consumerId, 
memberSubscriptions, true, Errors.NONE, Optional.empty()));
         client.prepareResponse(syncGroupResponse(singletonList(t1p), 
Errors.NONE));
 
         coordinator.poll(time.timer(Long.MAX_VALUE));
@@ -3490,6 +3491,36 @@ public abstract class ConsumerCoordinatorTest {
         assertEquals(lost.isEmpty() ? null : lost, rebalanceListener.lost);
     }
 
+    @Test
+    public void testSubscriptionRackId() {
+        metrics.close();
+        coordinator.close(time.timer(0));
+
+        String rackId = "rack-a";
+        metrics = new Metrics(time);
+        RackAwareAssignor assignor = new RackAwareAssignor();
+
+        coordinator = new ConsumerCoordinator(rebalanceConfig, new 
LogContext(), consumerClient,
+                Collections.singletonList(assignor), metadata, subscriptions,
+                metrics, consumerId + groupId, time, false, 
autoCommitIntervalMs, null, false, rackId);
+
+        subscriptions.subscribe(singleton(topic1), rebalanceListener);
+        client.updateMetadata(metadataResponse);
+
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        Map<String, List<String>> memberSubscriptions = 
singletonMap(consumerId, singletonList(topic1));
+        assignor.prepare(singletonMap(consumerId, singletonList(t1p)));
+
+        client.prepareResponse(joinGroupLeaderResponse(1, consumerId, 
memberSubscriptions, false, Errors.NONE, Optional.of(rackId)));
+        client.prepareResponse(syncGroupResponse(singletonList(t1p), 
Errors.NONE));
+
+        coordinator.poll(time.timer(Long.MAX_VALUE));
+        assertEquals(singleton(t1p), 
coordinator.subscriptionState().assignedPartitions());
+        assertEquals(singleton(rackId), assignor.rackIds);
+    }
+
     @Test
     public void testThrowOnUnsupportedStableFlag() {
         supportStableFlag((short) 6, true);
@@ -3514,7 +3545,8 @@ public abstract class ConsumerCoordinatorTest {
             false,
             autoCommitIntervalMs,
             null,
-            true);
+            true,
+            null);
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         
client.setNodeApiVersions(NodeApiVersions.create(ApiKeys.OFFSET_FETCH.id, 
(short) 0, upperVersion));
@@ -3675,7 +3707,8 @@ public abstract class ConsumerCoordinatorTest {
                 autoCommitEnabled,
                 autoCommitIntervalMs,
                 null,
-                false);
+                false,
+                null);
     }
 
     private Collection<TopicPartition> getRevoked(final List<TopicPartition> 
owned,
@@ -3731,7 +3764,7 @@ public abstract class ConsumerCoordinatorTest {
         Map<String, List<String>> subscriptions,
         Errors error
     ) {
-        return joinGroupLeaderResponse(generationId, memberId, subscriptions, 
false, error);
+        return joinGroupLeaderResponse(generationId, memberId, subscriptions, 
false, error, Optional.empty());
     }
 
     private JoinGroupResponse joinGroupLeaderResponse(
@@ -3739,11 +3772,13 @@ public abstract class ConsumerCoordinatorTest {
         String memberId,
         Map<String, List<String>> subscriptions,
         boolean skipAssignment,
-        Errors error
+        Errors error,
+        Optional<String> rackId
     ) {
         List<JoinGroupResponseData.JoinGroupResponseMember> metadata = new 
ArrayList<>();
         for (Map.Entry<String, List<String>> subscriptionEntry : 
subscriptions.entrySet()) {
-            ConsumerPartitionAssignor.Subscription subscription = new 
ConsumerPartitionAssignor.Subscription(subscriptionEntry.getValue());
+            ConsumerPartitionAssignor.Subscription subscription = new 
ConsumerPartitionAssignor.Subscription(subscriptionEntry.getValue(),
+                    null, Collections.emptyList(), DEFAULT_GENERATION, rackId);
             ByteBuffer buf = 
ConsumerProtocol.serializeSubscription(subscription);
             metadata.add(new JoinGroupResponseData.JoinGroupResponseMember()
                     .setMemberId(subscriptionEntry.getKey())
@@ -3901,4 +3936,22 @@ public abstract class ConsumerCoordinatorTest {
             this.exception = exception;
         }
     }
+
+    private static class RackAwareAssignor extends MockPartitionAssignor {
+        private final Set<String> rackIds = new HashSet<>();
+
+        RackAwareAssignor() {
+            super(Arrays.asList(RebalanceProtocol.EAGER));
+        }
+
+        @Override
+        public Map<String, List<TopicPartition>> assign(Map<String, Integer> 
partitionsPerTopic, Map<String, Subscription> subscriptions) {
+            subscriptions.forEach((consumer, subscription) -> {
+                if (!subscription.rackId().isPresent())
+                    throw new IllegalStateException("Rack id not provided in 
subscription for " + consumer);
+                rackIds.add(subscription.rackId().get());
+            });
+            return super.assign(partitionsPerTopic, subscriptions);
+        }
+    }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
index 5106432ee53..0e97bfbb318 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
@@ -49,6 +49,7 @@ public class ConsumerProtocolTest {
     private final TopicPartition tp2 = new TopicPartition("bar", 2);
     private final Optional<String> groupInstanceId = 
Optional.of("instance.id");
     private final int generationId = 1;
+    private final Optional<String> rackId = Optional.of("rack-a");
 
     @Test
     public void serializeDeserializeSubscriptionAllVersions() {
@@ -56,7 +57,7 @@ public class ConsumerProtocolTest {
             new TopicPartition("foo", 0),
             new TopicPartition("bar", 0));
         Subscription subscription = new Subscription(Arrays.asList("foo", 
"bar"),
-            ByteBuffer.wrap("hello".getBytes()), ownedPartitions, 
generationId);
+            ByteBuffer.wrap("hello".getBytes()), ownedPartitions, 
generationId, rackId);
 
         for (short version = 
ConsumerProtocolSubscription.LOWEST_SUPPORTED_VERSION; version <= 
ConsumerProtocolSubscription.HIGHEST_SUPPORTED_VERSION; version++) {
             ByteBuffer buffer = 
ConsumerProtocol.serializeSubscription(subscription, version);
@@ -77,6 +78,12 @@ public class ConsumerProtocolTest {
             } else {
                 assertFalse(parsedSubscription.generationId().isPresent());
             }
+
+            if (version >= 3) {
+                assertEquals(rackId, parsedSubscription.rackId());
+            } else {
+                assertEquals(Optional.empty(), parsedSubscription.rackId());
+            }
         }
     }
 
@@ -89,6 +96,7 @@ public class ConsumerProtocolTest {
         assertEquals(0, parsedSubscription.userData().limit());
         assertFalse(parsedSubscription.groupInstanceId().isPresent());
         assertFalse(parsedSubscription.generationId().isPresent());
+        assertFalse(parsedSubscription.rackId().isPresent());
     }
 
     @Test
@@ -102,6 +110,7 @@ public class ConsumerProtocolTest {
         assertEquals(0, parsedSubscription.userData().limit());
         assertEquals(groupInstanceId, parsedSubscription.groupInstanceId());
         assertFalse(parsedSubscription.generationId().isPresent());
+        assertFalse(parsedSubscription.rackId().isPresent());
     }
 
     @Test
@@ -111,6 +120,7 @@ public class ConsumerProtocolTest {
         Subscription parsedSubscription = 
ConsumerProtocol.deserializeSubscription(buffer);
         assertEquals(toSet(subscription.topics()), 
toSet(parsedSubscription.topics()));
         assertNull(parsedSubscription.userData());
+        assertFalse(parsedSubscription.rackId().isPresent());
     }
 
     @Test
@@ -146,14 +156,15 @@ public class ConsumerProtocolTest {
         assertNull(parsedSubscription.userData());
         assertTrue(parsedSubscription.ownedPartitions().isEmpty());
         assertFalse(parsedSubscription.generationId().isPresent());
+        assertFalse(parsedSubscription.rackId().isPresent());
     }
 
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
-    public void deserializeNewSubscriptionWithOldVersion(boolean 
hasGenerationId) {
+    public void deserializeNewSubscriptionWithOldVersion(boolean 
hasGenerationIdAndRack) {
         Subscription subscription;
-        if (hasGenerationId) {
-            subscription = new Subscription(Arrays.asList("foo", "bar"), null, 
Collections.singletonList(tp2), generationId);
+        if (hasGenerationIdAndRack) {
+            subscription = new Subscription(Arrays.asList("foo", "bar"), null, 
Collections.singletonList(tp2), generationId, rackId);
         } else {
             subscription = new Subscription(Arrays.asList("foo", "bar"), null, 
Collections.singletonList(tp2));
         }
@@ -166,6 +177,7 @@ public class ConsumerProtocolTest {
         assertTrue(parsedSubscription.ownedPartitions().isEmpty());
         assertFalse(parsedSubscription.groupInstanceId().isPresent());
         assertFalse(parsedSubscription.generationId().isPresent());
+        assertFalse(parsedSubscription.rackId().isPresent());
     }
 
     @Test
@@ -178,6 +190,7 @@ public class ConsumerProtocolTest {
         assertEquals(Collections.singleton(tp2), 
toSet(subscription.ownedPartitions()));
         assertEquals(groupInstanceId, subscription.groupInstanceId());
         assertEquals(generationId, 
subscription.generationId().orElse(DEFAULT_GENERATION));
+        assertEquals(rackId, subscription.rackId());
     }
 
     @Test
@@ -253,6 +266,7 @@ public class ConsumerProtocolTest {
             new Field("owned_partitions", new ArrayOf(
                 ConsumerProtocolSubscription.TopicPartition.SCHEMA_1)),
             new Field("generation_id", Type.INT32),
+            new Field("rack_id", Type.STRING),
             new Field("bar", Type.STRING));
 
         Struct subscriptionV100 = new Struct(subscriptionSchemaV100);
@@ -263,6 +277,7 @@ public class ConsumerProtocolTest {
             .set("topic", tp2.topic())
             .set("partitions", new Object[]{tp2.partition()})});
         subscriptionV100.set("generation_id", generationId);
+        subscriptionV100.set("rack_id", rackId.orElse(null));
         subscriptionV100.set("bar", "bar");
 
         Struct headerV100 = new Struct(new Schema(new Field("version", 
Type.INT16)));
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 92670dba3b3..bfc280a1678 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -1902,5 +1902,22 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     consumer2.close()
   }
+
+  @Test
+  def testConsumerRackIdPropagatedToPartitionAssignor(): Unit = {
+    consumerConfig.setProperty(ConsumerConfig.CLIENT_RACK_CONFIG, "rack-a")
+    
consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
classOf[RackAwareAssignor].getName)
+    val consumer = createConsumer()
+    consumer.subscribe(Set(topic).asJava)
+    awaitAssignment(consumer, Set(tp, tp2))
+  }
+}
+
+class RackAwareAssignor extends RoundRobinAssignor {
+  override def assign(partitionsPerTopic: util.Map[String, Integer], 
subscriptions: util.Map[String, ConsumerPartitionAssignor.Subscription]): 
util.Map[String, util.List[TopicPartition]] = {
+    assertEquals(1, subscriptions.size())
+    assertEquals(Optional.of("rack-a"), 
subscriptions.values.asScala.head.rackId)
+    super.assign(partitionsPerTopic, subscriptions)
+  }
 }
 

Reply via email to