This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 949617b0b2e KAFKA-17747: [7/N] Add consumer group integration test for 
rack aware assignment (#19856)
949617b0b2e is described below

commit 949617b0b2e237d713542ec76d52b733224bdea1
Author: PoAn Yang <[email protected]>
AuthorDate: Thu Jun 5 01:32:17 2025 +0800

    KAFKA-17747: [7/N] Add consumer group integration test for rack aware 
assignment (#19856)
    
    * Add `RackAwareAssignor`. It uses `racksForPartition` to check the rack
    id of a partition and assign it to a member which has the same rack id.
    * Add `ConsumerIntegrationTest#testRackAwareAssignment` to check
    `racksForPartition` works correctly.
    
    Reviewers: David Jacot <[email protected]>
    
    ---------
    
    Signed-off-by: PoAn Yang <[email protected]>
---
 build.gradle                                       |   1 +
 checkstyle/import-control.xml                      |   1 +
 .../clients/consumer/ConsumerIntegrationTest.java  | 119 +++++++++++++++++++++
 .../kafka/clients/consumer/RackAwareAssignor.java  | 104 ++++++++++++++++++
 .../apache/kafka/clients/GroupRebalanceConfig.java |   7 ++
 .../consumer/internals/ClassicKafkaConsumer.java   |   3 +-
 .../consumer/internals/ConsumerCoordinator.java    |   5 +-
 .../internals/ConsumerHeartbeatRequestManager.java |   7 ++
 .../internals/ConsumerMembershipManager.java       |  10 ++
 .../consumer/internals/RequestManagers.java        |   1 +
 .../kafka/clients/GroupRebalanceConfigTest.java    |  58 ++++++++++
 .../internals/AbstractCoordinatorTest.java         |   1 +
 .../internals/ConsumerCoordinatorTest.java         |  15 ++-
 .../ConsumerHeartbeatRequestManagerTest.java       |  28 +++++
 .../internals/ConsumerMembershipManagerTest.java   |  26 +++--
 .../clients/consumer/internals/HeartbeatTest.java  |   1 +
 .../WorkerCoordinatorIncrementalTest.java          |   1 +
 .../runtime/distributed/WorkerCoordinatorTest.java |   1 +
 18 files changed, 368 insertions(+), 21 deletions(-)

diff --git a/build.gradle b/build.gradle
index 968eda48e88..a3f1965f81a 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1995,6 +1995,7 @@ project(':clients:clients-integration-tests') {
     implementation project(':server-common')
     testImplementation project(':metadata')
     implementation project(':group-coordinator')
+    implementation project(':group-coordinator:group-coordinator-api')
     implementation project(':transaction-coordinator')
 
     testImplementation libs.junitJupiter
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 9fc1bcd7eff..b1ef62ca3a2 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -235,6 +235,7 @@
   <subpackage name="clients">
     <allow pkg="org.apache.kafka.common" />
     <allow pkg="org.apache.kafka.clients" exact-match="true"/>
+    <allow pkg="org.apache.kafka.clients.consumer" exact-match="true"/>
     <allow pkg="org.apache.kafka.test" />
     <allow class="org.apache.logging.log4j.Level" />
 
diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
index a56de229318..8a41c1a8beb 100644
--- 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
@@ -16,7 +16,12 @@
  */
 package org.apache.kafka.clients.consumer;
 
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewPartitionReassignment;
+import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.clients.admin.NewTopic;
 import 
org.apache.kafka.clients.consumer.internals.AbstractHeartbeatRequestManager;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.KafkaException;
@@ -31,12 +36,16 @@ import 
org.apache.kafka.common.test.api.ClusterConfigProperty;
 import org.apache.kafka.common.test.api.ClusterTest;
 import org.apache.kafka.common.test.api.ClusterTests;
 import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
 
 import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -216,6 +225,116 @@ public class ConsumerIntegrationTest {
         }
     }
 
+    @ClusterTest(
+        types = {Type.KRAFT},
+        brokers = 3,
+        serverProperties = {
+            @ClusterConfigProperty(id = 0, key = "broker.rack", value = 
"rack0"),
+            @ClusterConfigProperty(id = 1, key = "broker.rack", value = 
"rack1"),
+            @ClusterConfigProperty(id = 2, key = "broker.rack", value = 
"rack2"),
+            @ClusterConfigProperty(key = 
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, value = 
"org.apache.kafka.clients.consumer.RackAwareAssignor")
+        }
+    )
+    public void testRackAwareAssignment(ClusterInstance clusterInstance) 
throws ExecutionException, InterruptedException {
+        String topic = "test-topic";
+        try (Admin admin = clusterInstance.admin();
+             Producer<byte[], byte[]> producer = clusterInstance.producer();
+             Consumer<byte[], byte[]> consumer0 = 
clusterInstance.consumer(Map.of(
+                 ConsumerConfig.GROUP_ID_CONFIG, "group0",
+                 ConsumerConfig.CLIENT_RACK_CONFIG, "rack0",
+                 ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name()
+             ));
+             Consumer<byte[], byte[]> consumer1 = 
clusterInstance.consumer(Map.of(
+                 ConsumerConfig.GROUP_ID_CONFIG, "group0",
+                 ConsumerConfig.CLIENT_RACK_CONFIG, "rack1",
+                 ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name()
+             ));
+             Consumer<byte[], byte[]> consumer2 = 
clusterInstance.consumer(Map.of(
+                 ConsumerConfig.GROUP_ID_CONFIG, "group0",
+                 ConsumerConfig.CLIENT_RACK_CONFIG, "rack2",
+                 ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name()
+             ))
+        ) {
+            // Create a new topic with 1 partition on broker 0.
+            admin.createTopics(List.of(new NewTopic(topic, Map.of(0, 
List.of(0)))));
+            clusterInstance.waitForTopic(topic, 1);
+
+            producer.send(new ProducerRecord<>(topic, "key".getBytes(), 
"value".getBytes()));
+            producer.flush();
+
+            consumer0.subscribe(List.of(topic));
+            consumer1.subscribe(List.of(topic));
+            consumer2.subscribe(List.of(topic));
+
+            TestUtils.waitForCondition(() -> {
+                consumer0.poll(Duration.ofMillis(1000));
+                consumer1.poll(Duration.ofMillis(1000));
+                consumer2.poll(Duration.ofMillis(1000));
+                return consumer0.assignment().equals(Set.of(new 
TopicPartition(topic, 0))) &&
+                    consumer1.assignment().isEmpty() &&
+                    consumer2.assignment().isEmpty();
+            }, "Consumer 0 should be assigned to topic partition 0");
+
+            // Add a new partition 1 and 2 to broker 1.
+            admin.createPartitions(
+                Map.of(
+                    topic,
+                    NewPartitions.increaseTo(3, List.of(List.of(1), 
List.of(1)))
+                )
+            );
+            clusterInstance.waitForTopic(topic, 3);
+            TestUtils.waitForCondition(() -> {
+                consumer0.poll(Duration.ofMillis(1000));
+                consumer1.poll(Duration.ofMillis(1000));
+                consumer2.poll(Duration.ofMillis(1000));
+                return consumer0.assignment().equals(Set.of(new 
TopicPartition(topic, 0))) &&
+                    consumer1.assignment().equals(Set.of(new 
TopicPartition(topic, 1), new TopicPartition(topic, 2))) &&
+                    consumer2.assignment().isEmpty();
+            }, "Consumer 1 should be assigned to topic partition 1 and 2");
+
+            // Add a new partition 3, 4, and 5 to broker 2.
+            admin.createPartitions(
+                Map.of(
+                    topic,
+                    NewPartitions.increaseTo(6, List.of(List.of(2), 
List.of(2), List.of(2)))
+                )
+            );
+            clusterInstance.waitForTopic(topic, 6);
+            TestUtils.waitForCondition(() -> {
+                consumer0.poll(Duration.ofMillis(1000));
+                consumer1.poll(Duration.ofMillis(1000));
+                consumer2.poll(Duration.ofMillis(1000));
+                return consumer0.assignment().equals(Set.of(new 
TopicPartition(topic, 0))) &&
+                    consumer1.assignment().equals(Set.of(new 
TopicPartition(topic, 1), new TopicPartition(topic, 2))) &&
+                    consumer2.assignment().equals(Set.of(new 
TopicPartition(topic, 3), new TopicPartition(topic, 4), new 
TopicPartition(topic, 5)));
+            }, "Consumer 2 should be assigned to topic partition 3, 4, and 5");
+
+            // Change partitions to different brokers.
+            // partition 0 -> broker 2
+            // partition 1 -> broker 2
+            // partition 2 -> broker 2
+            // partition 3 -> broker 1
+            // partition 4 -> broker 1
+            // partition 5 -> broker 0
+            admin.alterPartitionReassignments(Map.of(
+                new TopicPartition(topic, 0), Optional.of(new 
NewPartitionReassignment(List.of(2))),
+                new TopicPartition(topic, 1), Optional.of(new 
NewPartitionReassignment(List.of(2))),
+                new TopicPartition(topic, 2), Optional.of(new 
NewPartitionReassignment(List.of(2))),
+                new TopicPartition(topic, 3), Optional.of(new 
NewPartitionReassignment(List.of(1))),
+                new TopicPartition(topic, 4), Optional.of(new 
NewPartitionReassignment(List.of(1))),
+                new TopicPartition(topic, 5), Optional.of(new 
NewPartitionReassignment(List.of(0)))
+            )).all().get();
+            TestUtils.waitForCondition(() -> {
+                consumer0.poll(Duration.ofMillis(1000));
+                consumer1.poll(Duration.ofMillis(1000));
+                consumer2.poll(Duration.ofMillis(1000));
+                return consumer0.assignment().equals(Set.of(new 
TopicPartition(topic, 5))) &&
+                    consumer1.assignment().equals(Set.of(new 
TopicPartition(topic, 3), new TopicPartition(topic, 4))) &&
+                    consumer2.assignment().equals(Set.of(new 
TopicPartition(topic, 0), new TopicPartition(topic, 1), new 
TopicPartition(topic, 2)));
+            }, "Consumer with topic partition mapping should be 0 -> 5 | 1 -> 
3, 4 | 2 -> 0, 1, 2");
+        }
+    }
+
     private void sendMsg(ClusterInstance clusterInstance, String topic, int 
sendMsgNum) {
         try (var producer = clusterInstance.producer(Map.of(
                 ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class,
diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/RackAwareAssignor.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/RackAwareAssignor.java
new file mode 100644
index 00000000000..e71e1f8a1a3
--- /dev/null
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/RackAwareAssignor.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.common.Uuid;
+import 
org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor;
+import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
+import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
+import 
org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException;
+import 
org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The RackAwareAssignor is a consumer group partition assignor that takes 
into account the rack
+ * information of the members when assigning partitions to them.
+ * It needs all brokers and members to have rack information available.
+ */
+public class RackAwareAssignor implements ConsumerGroupPartitionAssignor {
+    @Override
+    public String name() {
+        return "rack-aware-assignor";
+    }
+
+    @Override
+    public GroupAssignment assign(GroupSpec groupSpec, 
SubscribedTopicDescriber subscribedTopicDescriber) throws 
PartitionAssignorException {
+        Map<String, String> rackIdToMemberId = new HashMap<>();
+        List<String> memberIds = new ArrayList<>(groupSpec.memberIds());
+        for (String memberId : memberIds) {
+            if (groupSpec.memberSubscription(memberId).rackId().isEmpty()) {
+                throw new PartitionAssignorException("Member " + memberId + " 
does not have rack information available.");
+            }
+            rackIdToMemberId.put(
+                groupSpec.memberSubscription(memberId).rackId().get(),
+                memberId
+            );
+        }
+
+        Map<String, Map<Uuid, Set<Integer>>> assignments = new HashMap<>();
+        for (Uuid topicId : 
groupSpec.memberSubscription(memberIds.get(0)).subscribedTopicIds()) {
+            int numPartitions = 
subscribedTopicDescriber.numPartitions(topicId);
+            if (numPartitions == -1) {
+                throw new PartitionAssignorException("Member is subscribed to 
a non-existent topic");
+            }
+
+            for (int partitionId = 0; partitionId < numPartitions; 
partitionId++) {
+                Set<String> racks = 
subscribedTopicDescriber.racksForPartition(topicId, partitionId);
+                if (racks.isEmpty()) {
+                    throw new PartitionAssignorException("No racks available 
for partition " + partitionId + " of topic " + topicId);
+                }
+
+                String assignedRack = null;
+                for (String rack : racks) {
+                    String memberId = rackIdToMemberId.get(rack);
+                    if (memberId == null) {
+                        continue;
+                    }
+                    assignedRack = rack;
+                    break;
+                }
+
+                if (assignedRack == null) {
+                    throw new PartitionAssignorException("No member found for 
racks " + racks + " for partition " + partitionId + " of topic " + topicId);
+                }
+
+                Map<Uuid, Set<Integer>> assignment = 
assignments.computeIfAbsent(
+                    rackIdToMemberId.get(assignedRack),
+                    k -> new HashMap<>()
+                );
+                Set<Integer> partitions = assignment.computeIfAbsent(
+                    topicId,
+                    k -> new java.util.HashSet<>()
+                );
+                partitions.add(partitionId);
+            }
+        }
+
+        Map<String, MemberAssignment> memberAssignments = new HashMap<>();
+        for (Map.Entry<String, Map<Uuid, Set<Integer>>> entry : 
assignments.entrySet()) {
+            memberAssignments.put(entry.getKey(), new 
MemberAssignmentImpl(entry.getValue()));
+        }
+        return new GroupAssignment(memberAssignments);
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/GroupRebalanceConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/GroupRebalanceConfig.java
index 4aff7c8c0a8..fce243ebc64 100644
--- a/clients/src/main/java/org/apache/kafka/clients/GroupRebalanceConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/GroupRebalanceConfig.java
@@ -43,6 +43,7 @@ public class GroupRebalanceConfig {
     public final int heartbeatIntervalMs;
     public final String groupId;
     public final Optional<String> groupInstanceId;
+    public final Optional<String> rackId;
     public final long retryBackoffMs;
     public final long retryBackoffMaxMs;
     public final boolean leaveGroupOnClose;
@@ -53,8 +54,12 @@ public class GroupRebalanceConfig {
         // Consumer and Connect use different config names for defining 
rebalance timeout
         if ((protocolType == ProtocolType.CONSUMER) || (protocolType == 
ProtocolType.SHARE)) {
             this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+
+            String rackId = 
config.getString(CommonClientConfigs.CLIENT_RACK_CONFIG);
+            this.rackId = rackId == null || rackId.isEmpty() ? 
Optional.empty() : Optional.of(rackId);
         } else {
             this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.REBALANCE_TIMEOUT_MS_CONFIG);
+            this.rackId = Optional.empty();
         }
 
         this.heartbeatIntervalMs = 
config.getInt(CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG);
@@ -90,6 +95,7 @@ public class GroupRebalanceConfig {
                                 final int heartbeatIntervalMs,
                                 String groupId,
                                 Optional<String> groupInstanceId,
+                                String rackId,
                                 long retryBackoffMs,
                                 long retryBackoffMaxMs,
                                 boolean leaveGroupOnClose) {
@@ -98,6 +104,7 @@ public class GroupRebalanceConfig {
         this.heartbeatIntervalMs = heartbeatIntervalMs;
         this.groupId = groupId;
         this.groupInstanceId = groupInstanceId;
+        this.rackId = rackId == null || rackId.isEmpty() ? Optional.empty() : 
Optional.of(rackId);
         this.retryBackoffMs = retryBackoffMs;
         this.retryBackoffMaxMs = retryBackoffMaxMs;
         this.leaveGroupOnClose = leaveGroupOnClose;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
index e17ae8ae972..3a50ff037ab 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
@@ -230,7 +230,6 @@ public class ClassicKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                         
config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
                         this.interceptors,
                         
config.getBoolean(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED),
-                        config.getString(ConsumerConfig.CLIENT_RACK_CONFIG),
                         clientTelemetryReporter);
             }
             this.fetcher = new Fetcher<>(
@@ -330,6 +329,7 @@ public class ClassicKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                 heartbeatIntervalMs,
                 groupId.get(),
                 groupInstanceId,
+                rackId,
                 retryBackoffMs,
                 retryBackoffMaxMs,
                 true
@@ -348,7 +348,6 @@ public class ClassicKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                 autoCommitIntervalMs,
                 interceptors,
                 throwOnStableOffsetNotSupported,
-                rackId,
                 clientTelemetryReporter
             );
         } else {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 499d7363f6b..4956d64228d 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -178,7 +178,6 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
                                int autoCommitIntervalMs,
                                ConsumerInterceptors<?, ?> interceptors,
                                boolean throwOnFetchStableOffsetsUnsupported,
-                               String rackId,
                                Optional<ClientTelemetryReporter> 
clientTelemetryReporter) {
         this(rebalanceConfig,
             logContext,
@@ -193,7 +192,6 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
             autoCommitIntervalMs,
             interceptors,
             throwOnFetchStableOffsetsUnsupported,
-            rackId,
             clientTelemetryReporter,
             Optional.empty());
     }
@@ -214,7 +212,6 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
                                int autoCommitIntervalMs,
                                ConsumerInterceptors<?, ?> interceptors,
                                boolean throwOnFetchStableOffsetsUnsupported,
-                               String rackId,
                                Optional<ClientTelemetryReporter> 
clientTelemetryReporter,
                                Optional<Supplier<BaseHeartbeatThread>> 
heartbeatThreadSupplier) {
         super(rebalanceConfig,
@@ -228,7 +225,7 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
         this.rebalanceConfig = rebalanceConfig;
         this.log = logContext.logger(ConsumerCoordinator.class);
         this.metadata = metadata;
-        this.rackId = rackId == null || rackId.isEmpty() ? Optional.empty() : 
Optional.of(rackId);
+        this.rackId = rebalanceConfig.rackId;
         this.metadataSnapshot = new MetadataSnapshot(this.rackId, 
subscriptions, metadata.fetch(), metadata.updateVersion());
         this.subscriptions = subscriptions;
         this.defaultOffsetCommitCallback = new DefaultOffsetCommitCallback();
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManager.java
index 2845f4bc9ee..c5f95305a47 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManager.java
@@ -247,6 +247,7 @@ public class ConsumerHeartbeatRequestManager extends 
AbstractHeartbeatRequestMan
             sentFields.reset();
         }
 
+        @SuppressWarnings("NPathComplexity")
         public ConsumerGroupHeartbeatRequestData buildRequestData() {
             ConsumerGroupHeartbeatRequestData data = new 
ConsumerGroupHeartbeatRequestData();
 
@@ -306,6 +307,12 @@ public class ConsumerHeartbeatRequestManager extends 
AbstractHeartbeatRequestMan
                 sentFields.localAssignment = local;
             }
 
+            // RackId - sent when joining
+            String rackId = membershipManager.rackId().orElse(null);
+            if (sendAllFields) {
+                data.setRackId(rackId);
+            }
+
             return data;
         }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
index bf0f593d8ed..b6159770754 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
@@ -112,6 +112,8 @@ public class ConsumerMembershipManager extends 
AbstractMembershipManager<Consume
      */
     protected final Optional<String> groupInstanceId;
 
+    private final Optional<String> rackId;
+
     /**
      * Rebalance timeout. To be used as time limit for the commit request 
issued
      * when a new assignment is received, that is retried until it succeeds, 
fails with a
@@ -140,6 +142,7 @@ public class ConsumerMembershipManager extends 
AbstractMembershipManager<Consume
 
     public ConsumerMembershipManager(String groupId,
                                      Optional<String> groupInstanceId,
+                                     Optional<String> rackId,
                                      int rebalanceTimeoutMs,
                                      Optional<String> serverAssignor,
                                      SubscriptionState subscriptions,
@@ -152,6 +155,7 @@ public class ConsumerMembershipManager extends 
AbstractMembershipManager<Consume
                                      boolean autoCommitEnabled) {
         this(groupId,
             groupInstanceId,
+            rackId,
             rebalanceTimeoutMs,
             serverAssignor,
             subscriptions,
@@ -167,6 +171,7 @@ public class ConsumerMembershipManager extends 
AbstractMembershipManager<Consume
     // Visible for testing
     ConsumerMembershipManager(String groupId,
                               Optional<String> groupInstanceId,
+                              Optional<String> rackId,
                               int rebalanceTimeoutMs,
                               Optional<String> serverAssignor,
                               SubscriptionState subscriptions,
@@ -185,6 +190,7 @@ public class ConsumerMembershipManager extends 
AbstractMembershipManager<Consume
             metricsManager,
             autoCommitEnabled);
         this.groupInstanceId = groupInstanceId;
+        this.rackId = rackId;
         this.rebalanceTimeoutMs = rebalanceTimeoutMs;
         this.serverAssignor = serverAssignor;
         this.commitRequestManager = commitRequestManager;
@@ -199,6 +205,10 @@ public class ConsumerMembershipManager extends 
AbstractMembershipManager<Consume
         return groupInstanceId;
     }
 
+    public Optional<String> rackId() {
+        return rackId;
+    }
+
     /**
      * {@inheritDoc}
      */
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
index f341dc35a4a..32c76f8c732 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
@@ -248,6 +248,7 @@ public class RequestManagers implements Closeable {
                         membershipManager = new ConsumerMembershipManager(
                             groupRebalanceConfig.groupId,
                             groupRebalanceConfig.groupInstanceId,
+                            groupRebalanceConfig.rackId,
                             groupRebalanceConfig.rebalanceTimeoutMs,
                             serverAssignor,
                             subscriptions,
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/GroupRebalanceConfigTest.java 
b/clients/src/test/java/org/apache/kafka/clients/GroupRebalanceConfigTest.java
new file mode 100644
index 00000000000..bd9bc225205
--- /dev/null
+++ 
b/clients/src/test/java/org/apache/kafka/clients/GroupRebalanceConfigTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class GroupRebalanceConfigTest {
+
+    @ParameterizedTest
+    @EnumSource(value = GroupRebalanceConfig.ProtocolType.class, names = 
{"CONSUMER", "SHARE"})
+    void testRackIdIsEmptyIfNoDefined(GroupRebalanceConfig.ProtocolType 
protocolType) {
+        GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(
+            new ConsumerConfig(Map.of(
+                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringDeserializer",
+                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringDeserializer"
+            )),
+            protocolType
+        );
+        assertTrue(groupRebalanceConfig.rackId.isEmpty());
+    }
+
+    @ParameterizedTest
+    @EnumSource(value = GroupRebalanceConfig.ProtocolType.class, names = 
{"CONSUMER", "SHARE"})
+    void testRackIdIsNotEmptyIfDefined(GroupRebalanceConfig.ProtocolType 
protocolType) {
+        GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(
+            new ConsumerConfig(Map.of(
+                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringDeserializer",
+                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringDeserializer",
+                ConsumerConfig.CLIENT_RACK_CONFIG, "rack1"
+            )),
+            protocolType
+        );
+        assertTrue(groupRebalanceConfig.rackId.isPresent());
+        assertEquals("rack1", groupRebalanceConfig.rackId.get());
+    }
+}
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index df784da7937..6aa3095fada 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -166,6 +166,7 @@ public class AbstractCoordinatorTest {
                                                                         
HEARTBEAT_INTERVAL_MS,
                                                                         
GROUP_ID,
                                                                         
groupInstanceId,
+                                                                        null,
                                                                         
retryBackoffMs,
                                                                         
retryBackoffMaxMs,
                                                                         
leaveOnClose);
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 8d364b2ae30..683a25a3e1c 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -208,7 +208,7 @@ public abstract class ConsumerCoordinatorTest {
         this.rebalanceListener = new MockRebalanceListener();
         this.mockOffsetCommitCallback = new MockCommitCallback();
         this.partitionAssignor.clear();
-        this.rebalanceConfig = buildRebalanceConfig(Optional.empty());
+        this.rebalanceConfig = buildRebalanceConfig(Optional.empty(), null);
         this.coordinator = buildCoordinator(rebalanceConfig,
                                             metrics,
                                             assignors,
@@ -216,12 +216,13 @@ public abstract class ConsumerCoordinatorTest {
                                             subscriptions);
     }
 
-    private GroupRebalanceConfig buildRebalanceConfig(Optional<String> 
groupInstanceId) {
+    private GroupRebalanceConfig buildRebalanceConfig(Optional<String> 
groupInstanceId, String rackId) {
         return new GroupRebalanceConfig(sessionTimeoutMs,
                                         rebalanceTimeoutMs,
                                         heartbeatIntervalMs,
                                         groupId,
                                         groupInstanceId,
+                                        rackId,
                                         retryBackoffMs,
                                         retryBackoffMaxMs,
                                         groupInstanceId.isEmpty());
@@ -2974,7 +2975,7 @@ public abstract class ConsumerCoordinatorTest {
 
     @Test
     public void testCommitOffsetShouldNotSetInstanceIdIfMemberIdIsUnknown() {
-        rebalanceConfig = buildRebalanceConfig(groupInstanceId);
+        rebalanceConfig = buildRebalanceConfig(groupInstanceId, null);
         ConsumerCoordinator coordinator = buildCoordinator(
             rebalanceConfig,
             new Metrics(),
@@ -3699,7 +3700,6 @@ public abstract class ConsumerCoordinatorTest {
             autoCommitIntervalMs,
             null,
             true,
-            null,
             Optional.empty());
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
@@ -3750,7 +3750,7 @@ public abstract class ConsumerCoordinatorTest {
                                                                final boolean 
autoCommit,
                                                                final 
Optional<String> groupInstanceId,
                                                                final boolean 
shouldPoll) {
-        rebalanceConfig = buildRebalanceConfig(groupInstanceId);
+        rebalanceConfig = buildRebalanceConfig(groupInstanceId, null);
         ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
                                                            new Metrics(),
                                                            assignors,
@@ -3868,7 +3868,6 @@ public abstract class ConsumerCoordinatorTest {
                 autoCommitIntervalMs,
                 null,
                 false,
-                null,
                 Optional.empty());
     }
 
@@ -4112,9 +4111,10 @@ public abstract class ConsumerCoordinatorTest {
 
         metrics = new Metrics(time);
 
+        rebalanceConfig = 
buildRebalanceConfig(rebalanceConfig.groupInstanceId, rackId);
         coordinator = new ConsumerCoordinator(rebalanceConfig, new 
LogContext(), consumerClient,
                 Collections.singletonList(assignor), metadata, subscriptions,
-                metrics, consumerId + groupId, time, false, 
autoCommitIntervalMs, null, false, rackId, Optional.empty());
+                metrics, consumerId + groupId, time, false, 
autoCommitIntervalMs, null, false, Optional.empty());
     }
 
     private static MetadataResponse rackAwareMetadata(int numNodes,
@@ -4193,7 +4193,6 @@ public abstract class ConsumerCoordinatorTest {
             autoCommitIntervalMs,
             null,
             false,
-            null,
             Optional.empty(),
             Optional.of(() -> Mockito.mock(BaseHeartbeatThread.class)));
     }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
index 2bdc8819aec..9063ae5ab5b 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
@@ -1009,6 +1009,34 @@ public class ConsumerHeartbeatRequestManagerTest {
         assertNull(data.subscribedTopicRegex());
     }
 
+    @Test
+    public void testRackIdInHeartbeatLifecycle() {
+        heartbeatState = new HeartbeatState(subscriptions, membershipManager, 
DEFAULT_MAX_POLL_INTERVAL_MS);
+        createHeartbeatRequestStateWithZeroHeartbeatInterval();
+
+        // Initial heartbeat with rackId
+        mockJoiningMemberData(null);
+        when(membershipManager.rackId()).thenReturn(Optional.of("rack1"));
+        ConsumerGroupHeartbeatRequestData data = 
heartbeatState.buildRequestData();
+        assertEquals("rack1", data.rackId());
+
+        // RackId not included in HB if member state is not JOINING
+        when(membershipManager.state()).thenReturn(MemberState.STABLE);
+        data = heartbeatState.buildRequestData();
+        assertNull(data.rackId());
+
+        // RackId included in HB if member state changes to JOINING again
+        when(membershipManager.state()).thenReturn(MemberState.JOINING);
+        data = heartbeatState.buildRequestData();
+        assertEquals("rack1", data.rackId());
+
+        // Empty rackId not included in HB
+        when(membershipManager.rackId()).thenReturn(Optional.empty());
+        heartbeatState = new HeartbeatState(subscriptions, membershipManager, 
DEFAULT_MAX_POLL_INTERVAL_MS);
+        data = heartbeatState.buildRequestData();
+        assertNull(data.rackId());
+    }
+
     private void assertHeartbeat(ConsumerHeartbeatRequestManager hrm, int 
nextPollMs) {
         NetworkClientDelegate.PollResult pollResult = 
hrm.poll(time.milliseconds());
         assertEquals(1, pollResult.unsentRequests.size());
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
index b8557e37015..aa8c7bdb1dc 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
@@ -142,17 +142,20 @@ public class ConsumerMembershipManagerTest {
 
     private ConsumerMembershipManager createMembershipManager(String 
groupInstanceId) {
         ConsumerMembershipManager manager = spy(new ConsumerMembershipManager(
-            GROUP_ID, Optional.ofNullable(groupInstanceId), REBALANCE_TIMEOUT, 
Optional.empty(),
+            GROUP_ID, Optional.ofNullable(groupInstanceId), Optional.empty(), 
REBALANCE_TIMEOUT, Optional.empty(),
             subscriptionState, commitRequestManager, metadata, LOG_CONTEXT,
             backgroundEventHandler, time, rebalanceMetricsManager, true));
         assertMemberIdIsGenerated(manager.memberId());
         return manager;
     }
 
-    private ConsumerMembershipManager 
createMembershipManagerJoiningGroup(String groupInstanceId,
-                                                                      String 
serverAssignor) {
+    private ConsumerMembershipManager createMembershipManagerJoiningGroup(
+        String groupInstanceId,
+        String serverAssignor,
+        String rackId
+    ) {
         ConsumerMembershipManager manager = spy(new ConsumerMembershipManager(
-                GROUP_ID, Optional.ofNullable(groupInstanceId), 
REBALANCE_TIMEOUT,
+                GROUP_ID, Optional.ofNullable(groupInstanceId), 
Optional.ofNullable(rackId), REBALANCE_TIMEOUT,
                 Optional.ofNullable(serverAssignor), subscriptionState, 
commitRequestManager,
                 metadata, LOG_CONTEXT, backgroundEventHandler, time, 
rebalanceMetricsManager, true));
         assertMemberIdIsGenerated(manager.memberId());
@@ -165,10 +168,19 @@ public class ConsumerMembershipManagerTest {
         ConsumerMembershipManager membershipManager = 
createMembershipManagerJoiningGroup();
         assertEquals(Optional.empty(), membershipManager.serverAssignor());
 
-        membershipManager = createMembershipManagerJoiningGroup("instance1", 
"Uniform");
+        membershipManager = createMembershipManagerJoiningGroup("instance1", 
"Uniform", null);
         assertEquals(Optional.of("Uniform"), 
membershipManager.serverAssignor());
     }
 
+    @Test
+    public void testMembershipManagerRackId() {
+        ConsumerMembershipManager membershipManager = 
createMembershipManagerJoiningGroup();
+        assertEquals(Optional.empty(), membershipManager.rackId());
+
+        membershipManager = createMembershipManagerJoiningGroup(null, null, 
"rack1");
+        assertEquals(Optional.of("rack1"), membershipManager.rackId());
+    }
+
     @Test
     public void testMembershipManagerInitSupportsEmptyGroupInstanceId() {
         createMembershipManagerJoiningGroup();
@@ -231,7 +243,7 @@ public class ConsumerMembershipManagerTest {
     @Test
     public void testTransitionToFailedWhenTryingToJoin() {
         ConsumerMembershipManager membershipManager = new 
ConsumerMembershipManager(
-                GROUP_ID, Optional.empty(), REBALANCE_TIMEOUT, 
Optional.empty(),
+                GROUP_ID, Optional.empty(), Optional.empty(), 
REBALANCE_TIMEOUT, Optional.empty(),
                 subscriptionState, commitRequestManager, metadata, LOG_CONTEXT,
             backgroundEventHandler, time, rebalanceMetricsManager, true);
         assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
@@ -2737,7 +2749,7 @@ public class ConsumerMembershipManagerTest {
     }
 
     private ConsumerMembershipManager createMemberInStableState(String 
groupInstanceId) {
-        ConsumerMembershipManager membershipManager = 
createMembershipManagerJoiningGroup(groupInstanceId, null);
+        ConsumerMembershipManager membershipManager = 
createMembershipManagerJoiningGroup(groupInstanceId, null, null);
         ConsumerGroupHeartbeatResponse heartbeatResponse = 
createConsumerGroupHeartbeatResponse(new Assignment(), 
membershipManager.memberId());
         when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
         
when(subscriptionState.rebalanceListener()).thenReturn(Optional.empty());
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
index 49102da9766..de7937673c8 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
@@ -45,6 +45,7 @@ public class HeartbeatTest {
                                                                         
heartbeatIntervalMs,
                                                                         
"group_id",
                                                                         
Optional.empty(),
+                                                                        null,
                                                                         
retryBackoffMs,
                                                                         
retryBackoffMaxMs,
                                                                         true);
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java
index a8e7cd46552..b0408b12735 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java
@@ -149,6 +149,7 @@ public class WorkerCoordinatorIncrementalTest {
             heartbeatIntervalMs,
             groupId,
             Optional.empty(),
+            null,
             retryBackoffMs,
             retryBackoffMaxMs,
             true);
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index 4122578266a..9b6d5915225 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -139,6 +139,7 @@ public class WorkerCoordinatorTest {
                                                         heartbeatIntervalMs,
                                                         groupId,
                                                         Optional.empty(),
+                                                        null,
                                                         retryBackoffMs,
                                                         retryBackoffMaxMs,
                                                         true);


Reply via email to