This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 949617b0b2e KAFKA-17747: [7/N] Add consumer group integration test for
rack aware assignment (#19856)
949617b0b2e is described below
commit 949617b0b2e237d713542ec76d52b733224bdea1
Author: PoAn Yang <[email protected]>
AuthorDate: Thu Jun 5 01:32:17 2025 +0800
KAFKA-17747: [7/N] Add consumer group integration test for rack aware
assignment (#19856)
* Add `RackAwareAssignor`. It uses `racksForPartition` to look up the rack
ids of each partition and assigns the partition to a member that has a matching rack id.
* Add `ConsumerIntegrationTest#testRackAwareAssignment` to check that
`racksForPartition` works correctly.
Reviewers: David Jacot <[email protected]>
---------
Signed-off-by: PoAn Yang <[email protected]>
---
build.gradle | 1 +
checkstyle/import-control.xml | 1 +
.../clients/consumer/ConsumerIntegrationTest.java | 119 +++++++++++++++++++++
.../kafka/clients/consumer/RackAwareAssignor.java | 104 ++++++++++++++++++
.../apache/kafka/clients/GroupRebalanceConfig.java | 7 ++
.../consumer/internals/ClassicKafkaConsumer.java | 3 +-
.../consumer/internals/ConsumerCoordinator.java | 5 +-
.../internals/ConsumerHeartbeatRequestManager.java | 7 ++
.../internals/ConsumerMembershipManager.java | 10 ++
.../consumer/internals/RequestManagers.java | 1 +
.../kafka/clients/GroupRebalanceConfigTest.java | 58 ++++++++++
.../internals/AbstractCoordinatorTest.java | 1 +
.../internals/ConsumerCoordinatorTest.java | 15 ++-
.../ConsumerHeartbeatRequestManagerTest.java | 28 +++++
.../internals/ConsumerMembershipManagerTest.java | 26 +++--
.../clients/consumer/internals/HeartbeatTest.java | 1 +
.../WorkerCoordinatorIncrementalTest.java | 1 +
.../runtime/distributed/WorkerCoordinatorTest.java | 1 +
18 files changed, 368 insertions(+), 21 deletions(-)
diff --git a/build.gradle b/build.gradle
index 968eda48e88..a3f1965f81a 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1995,6 +1995,7 @@ project(':clients:clients-integration-tests') {
implementation project(':server-common')
testImplementation project(':metadata')
implementation project(':group-coordinator')
+ implementation project(':group-coordinator:group-coordinator-api')
implementation project(':transaction-coordinator')
testImplementation libs.junitJupiter
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 9fc1bcd7eff..b1ef62ca3a2 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -235,6 +235,7 @@
<subpackage name="clients">
<allow pkg="org.apache.kafka.common" />
<allow pkg="org.apache.kafka.clients" exact-match="true"/>
+ <allow pkg="org.apache.kafka.clients.consumer" exact-match="true"/>
<allow pkg="org.apache.kafka.test" />
<allow class="org.apache.logging.log4j.Level" />
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
index a56de229318..8a41c1a8beb 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
@@ -16,7 +16,12 @@
*/
package org.apache.kafka.clients.consumer;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewPartitionReassignment;
+import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.clients.admin.NewTopic;
import
org.apache.kafka.clients.consumer.internals.AbstractHeartbeatRequestManager;
+import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
@@ -31,12 +36,16 @@ import
org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTests;
import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -216,6 +225,116 @@ public class ConsumerIntegrationTest {
}
}
+ @ClusterTest( // KRaft cluster with three brokers, one per rack (rack0, rack1, rack2); the group coordinator is configured with the test-only RackAwareAssignor
+ types = {Type.KRAFT},
+ brokers = 3,
+ serverProperties = {
+ @ClusterConfigProperty(id = 0, key = "broker.rack", value =
"rack0"),
+ @ClusterConfigProperty(id = 1, key = "broker.rack", value =
"rack1"),
+ @ClusterConfigProperty(id = 2, key = "broker.rack", value =
"rack2"),
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, value =
"org.apache.kafka.clients.consumer.RackAwareAssignor")
+ }
+ )
+ public void testRackAwareAssignment(ClusterInstance clusterInstance)
throws ExecutionException, InterruptedException {
+ String topic = "test-topic";
+ try (Admin admin = clusterInstance.admin();
+ Producer<byte[], byte[]> producer = clusterInstance.producer();
+ Consumer<byte[], byte[]> consumer0 =
clusterInstance.consumer(Map.of( // consumerN joins group0 with client rack rackN and the CONSUMER group protocol
+ ConsumerConfig.GROUP_ID_CONFIG, "group0",
+ ConsumerConfig.CLIENT_RACK_CONFIG, "rack0",
+ ConsumerConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.CONSUMER.name()
+ ));
+ Consumer<byte[], byte[]> consumer1 =
clusterInstance.consumer(Map.of(
+ ConsumerConfig.GROUP_ID_CONFIG, "group0",
+ ConsumerConfig.CLIENT_RACK_CONFIG, "rack1",
+ ConsumerConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.CONSUMER.name()
+ ));
+ Consumer<byte[], byte[]> consumer2 =
clusterInstance.consumer(Map.of(
+ ConsumerConfig.GROUP_ID_CONFIG, "group0",
+ ConsumerConfig.CLIENT_RACK_CONFIG, "rack2",
+ ConsumerConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.CONSUMER.name()
+ ))
+ ) { // admin, producer and all three consumers are closed when this try block exits
+ // Create the topic with a single partition (0) whose only replica is on broker 0 (rack0).
+ admin.createTopics(List.of(new NewTopic(topic, Map.of(0,
List.of(0)))));
+ clusterInstance.waitForTopic(topic, 1);
+
+ producer.send(new ProducerRecord<>(topic, "key".getBytes(),
"value".getBytes()));
+ producer.flush();
+
+ consumer0.subscribe(List.of(topic));
+ consumer1.subscribe(List.of(topic));
+ consumer2.subscribe(List.of(topic));
+
+ TestUtils.waitForCondition(() -> { // keep polling all three consumers until the expected assignment is observed
+ consumer0.poll(Duration.ofMillis(1000));
+ consumer1.poll(Duration.ofMillis(1000));
+ consumer2.poll(Duration.ofMillis(1000));
+ return consumer0.assignment().equals(Set.of(new
TopicPartition(topic, 0))) &&
+ consumer1.assignment().isEmpty() &&
+ consumer2.assignment().isEmpty();
+ }, "Consumer 0 should be assigned to topic partition 0");
+
+ // Add partitions 1 and 2, both placed on broker 1 (rack1).
+ admin.createPartitions(
+ Map.of(
+ topic,
+ NewPartitions.increaseTo(3, List.of(List.of(1),
List.of(1)))
+ )
+ );
+ clusterInstance.waitForTopic(topic, 3);
+ TestUtils.waitForCondition(() -> {
+ consumer0.poll(Duration.ofMillis(1000));
+ consumer1.poll(Duration.ofMillis(1000));
+ consumer2.poll(Duration.ofMillis(1000));
+ return consumer0.assignment().equals(Set.of(new
TopicPartition(topic, 0))) &&
+ consumer1.assignment().equals(Set.of(new
TopicPartition(topic, 1), new TopicPartition(topic, 2))) &&
+ consumer2.assignment().isEmpty();
+ }, "Consumer 1 should be assigned to topic partition 1 and 2");
+
+ // Add partitions 3, 4, and 5, all placed on broker 2 (rack2).
+ admin.createPartitions(
+ Map.of(
+ topic,
+ NewPartitions.increaseTo(6, List.of(List.of(2),
List.of(2), List.of(2)))
+ )
+ );
+ clusterInstance.waitForTopic(topic, 6);
+ TestUtils.waitForCondition(() -> {
+ consumer0.poll(Duration.ofMillis(1000));
+ consumer1.poll(Duration.ofMillis(1000));
+ consumer2.poll(Duration.ofMillis(1000));
+ return consumer0.assignment().equals(Set.of(new
TopicPartition(topic, 0))) &&
+ consumer1.assignment().equals(Set.of(new
TopicPartition(topic, 1), new TopicPartition(topic, 2))) &&
+ consumer2.assignment().equals(Set.of(new
TopicPartition(topic, 3), new TopicPartition(topic, 4), new
TopicPartition(topic, 5)));
+ }, "Consumer 2 should be assigned to topic partition 3, 4, and 5");
+
+ // Reassign every partition to a different broker; the consumer assignment is expected to follow the new racks.
+ // partition 0 -> broker 2
+ // partition 1 -> broker 2
+ // partition 2 -> broker 2
+ // partition 3 -> broker 1
+ // partition 4 -> broker 1
+ // partition 5 -> broker 0
+ admin.alterPartitionReassignments(Map.of(
+ new TopicPartition(topic, 0), Optional.of(new
NewPartitionReassignment(List.of(2))),
+ new TopicPartition(topic, 1), Optional.of(new
NewPartitionReassignment(List.of(2))),
+ new TopicPartition(topic, 2), Optional.of(new
NewPartitionReassignment(List.of(2))),
+ new TopicPartition(topic, 3), Optional.of(new
NewPartitionReassignment(List.of(1))),
+ new TopicPartition(topic, 4), Optional.of(new
NewPartitionReassignment(List.of(1))),
+ new TopicPartition(topic, 5), Optional.of(new
NewPartitionReassignment(List.of(0)))
+ )).all().get(); // wait for the future of the admin request to complete before checking assignments
+ TestUtils.waitForCondition(() -> {
+ consumer0.poll(Duration.ofMillis(1000));
+ consumer1.poll(Duration.ofMillis(1000));
+ consumer2.poll(Duration.ofMillis(1000));
+ return consumer0.assignment().equals(Set.of(new
TopicPartition(topic, 5))) &&
+ consumer1.assignment().equals(Set.of(new
TopicPartition(topic, 3), new TopicPartition(topic, 4))) &&
+ consumer2.assignment().equals(Set.of(new
TopicPartition(topic, 0), new TopicPartition(topic, 1), new
TopicPartition(topic, 2)));
+ }, "Consumer with topic partition mapping should be 0 -> 5 | 1 ->
3, 4 | 2 -> 0, 1, 2");
+ }
+ }
+
private void sendMsg(ClusterInstance clusterInstance, String topic, int
sendMsgNum) {
try (var producer = clusterInstance.producer(Map.of(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class,
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/RackAwareAssignor.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/RackAwareAssignor.java
new file mode 100644
index 00000000000..e71e1f8a1a3
--- /dev/null
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/RackAwareAssignor.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.common.Uuid;
+import
org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor;
+import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
+import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
+import
org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException;
+import
org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The RackAwareAssignor is a consumer group partition assignor that takes into account the rack
+ * information of the members when assigning partitions to them.
+ * It needs all brokers and members to have rack information available.
+ *
+ * <p>Every partition of the subscribed topics is given to the member whose rack id matches one of
+ * the racks returned by {@code racksForPartition} for that partition. A
+ * {@link PartitionAssignorException} is thrown when a member has no rack id, when
+ * {@code numPartitions} reports -1 for a subscribed topic, when a partition has no racks, or when
+ * none of the racks of a partition has a member.
+ *
+ * <p>Behaviour worth knowing before reusing this class:
+ * <ul>
+ *   <li>Only one member is remembered per rack id; if several members report the same rack, the
+ *       one iterated last replaces the earlier ones in the rack-to-member map.</li>
+ *   <li>The topics to assign are read from the subscription of the first member only.</li>
+ *   <li>Members that receive no partition do not appear in the returned assignment map.</li>
+ * </ul>
+ */
+public class RackAwareAssignor implements ConsumerGroupPartitionAssignor {
+ @Override
+ public String name() {
+ return "rack-aware-assignor";
+ }
+
+ @Override
+ public GroupAssignment assign(GroupSpec groupSpec,
SubscribedTopicDescriber subscribedTopicDescriber) throws
PartitionAssignorException {
+ Map<String, String> rackIdToMemberId = new HashMap<>(); // rack id -> the single member kept for that rack
+ List<String> memberIds = new ArrayList<>(groupSpec.memberIds());
+ for (String memberId : memberIds) {
+ if (groupSpec.memberSubscription(memberId).rackId().isEmpty()) {
+ throw new PartitionAssignorException("Member " + memberId + "
does not have rack information available.");
+ }
+ rackIdToMemberId.put(
+ groupSpec.memberSubscription(memberId).rackId().get(),
+ memberId
+ );
+ }
+
+ Map<String, Map<Uuid, Set<Integer>>> assignments = new HashMap<>(); // member id -> topic id -> partition ids
+ for (Uuid topicId :
groupSpec.memberSubscription(memberIds.get(0)).subscribedTopicIds()) { // NOTE(review): assumes at least one member and identical subscriptions across members - confirm
+ int numPartitions =
subscribedTopicDescriber.numPartitions(topicId);
+ if (numPartitions == -1) { // -1 is handled as an unknown topic
+ throw new PartitionAssignorException("Member is subscribed to
a non-existent topic");
+ }
+
+ for (int partitionId = 0; partitionId < numPartitions;
partitionId++) {
+ Set<String> racks =
subscribedTopicDescriber.racksForPartition(topicId, partitionId);
+ if (racks.isEmpty()) {
+ throw new PartitionAssignorException("No racks available
for partition " + partitionId + " of topic " + topicId);
+ }
+
+ String assignedRack = null; // first rack of this partition that has a member; stays null if there is none
+ for (String rack : racks) {
+ String memberId = rackIdToMemberId.get(rack);
+ if (memberId == null) {
+ continue;
+ }
+ assignedRack = rack;
+ break;
+ }
+
+ if (assignedRack == null) {
+ throw new PartitionAssignorException("No member found for
racks " + racks + " for partition " + partitionId + " of topic " + topicId);
+ }
+
+ Map<Uuid, Set<Integer>> assignment =
assignments.computeIfAbsent(
+ rackIdToMemberId.get(assignedRack),
+ k -> new HashMap<>()
+ );
+ Set<Integer> partitions = assignment.computeIfAbsent(
+ topicId,
+ k -> new java.util.HashSet<>() // NOTE(review): fully qualified because java.util.HashSet is not among the imports of this file
+ );
+ partitions.add(partitionId);
+ }
+ }
+
+ Map<String, MemberAssignment> memberAssignments = new HashMap<>(); // wrap each per-member map into a MemberAssignmentImpl
+ for (Map.Entry<String, Map<Uuid, Set<Integer>>> entry :
assignments.entrySet()) {
+ memberAssignments.put(entry.getKey(), new
MemberAssignmentImpl(entry.getValue()));
+ }
+ return new GroupAssignment(memberAssignments);
+ }
+}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/GroupRebalanceConfig.java
b/clients/src/main/java/org/apache/kafka/clients/GroupRebalanceConfig.java
index 4aff7c8c0a8..fce243ebc64 100644
--- a/clients/src/main/java/org/apache/kafka/clients/GroupRebalanceConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/GroupRebalanceConfig.java
@@ -43,6 +43,7 @@ public class GroupRebalanceConfig {
public final int heartbeatIntervalMs;
public final String groupId;
public final Optional<String> groupInstanceId;
+ public final Optional<String> rackId;
public final long retryBackoffMs;
public final long retryBackoffMaxMs;
public final boolean leaveGroupOnClose;
@@ -53,8 +54,12 @@ public class GroupRebalanceConfig {
// Consumer and Connect use different config names for defining
rebalance timeout
if ((protocolType == ProtocolType.CONSUMER) || (protocolType ==
ProtocolType.SHARE)) {
this.rebalanceTimeoutMs =
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+
+ String rackId =
config.getString(CommonClientConfigs.CLIENT_RACK_CONFIG);
+ this.rackId = rackId == null || rackId.isEmpty() ?
Optional.empty() : Optional.of(rackId);
} else {
this.rebalanceTimeoutMs =
config.getInt(CommonClientConfigs.REBALANCE_TIMEOUT_MS_CONFIG);
+ this.rackId = Optional.empty();
}
this.heartbeatIntervalMs =
config.getInt(CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG);
@@ -90,6 +95,7 @@ public class GroupRebalanceConfig {
final int heartbeatIntervalMs,
String groupId,
Optional<String> groupInstanceId,
+ String rackId,
long retryBackoffMs,
long retryBackoffMaxMs,
boolean leaveGroupOnClose) {
@@ -98,6 +104,7 @@ public class GroupRebalanceConfig {
this.heartbeatIntervalMs = heartbeatIntervalMs;
this.groupId = groupId;
this.groupInstanceId = groupInstanceId;
+ this.rackId = rackId == null || rackId.isEmpty() ? Optional.empty() :
Optional.of(rackId);
this.retryBackoffMs = retryBackoffMs;
this.retryBackoffMaxMs = retryBackoffMaxMs;
this.leaveGroupOnClose = leaveGroupOnClose;
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
index e17ae8ae972..3a50ff037ab 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
@@ -230,7 +230,6 @@ public class ClassicKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
this.interceptors,
config.getBoolean(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED),
- config.getString(ConsumerConfig.CLIENT_RACK_CONFIG),
clientTelemetryReporter);
}
this.fetcher = new Fetcher<>(
@@ -330,6 +329,7 @@ public class ClassicKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
heartbeatIntervalMs,
groupId.get(),
groupInstanceId,
+ rackId,
retryBackoffMs,
retryBackoffMaxMs,
true
@@ -348,7 +348,6 @@ public class ClassicKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
autoCommitIntervalMs,
interceptors,
throwOnStableOffsetNotSupported,
- rackId,
clientTelemetryReporter
);
} else {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 499d7363f6b..4956d64228d 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -178,7 +178,6 @@ public final class ConsumerCoordinator extends
AbstractCoordinator {
int autoCommitIntervalMs,
ConsumerInterceptors<?, ?> interceptors,
boolean throwOnFetchStableOffsetsUnsupported,
- String rackId,
Optional<ClientTelemetryReporter>
clientTelemetryReporter) {
this(rebalanceConfig,
logContext,
@@ -193,7 +192,6 @@ public final class ConsumerCoordinator extends
AbstractCoordinator {
autoCommitIntervalMs,
interceptors,
throwOnFetchStableOffsetsUnsupported,
- rackId,
clientTelemetryReporter,
Optional.empty());
}
@@ -214,7 +212,6 @@ public final class ConsumerCoordinator extends
AbstractCoordinator {
int autoCommitIntervalMs,
ConsumerInterceptors<?, ?> interceptors,
boolean throwOnFetchStableOffsetsUnsupported,
- String rackId,
Optional<ClientTelemetryReporter>
clientTelemetryReporter,
Optional<Supplier<BaseHeartbeatThread>>
heartbeatThreadSupplier) {
super(rebalanceConfig,
@@ -228,7 +225,7 @@ public final class ConsumerCoordinator extends
AbstractCoordinator {
this.rebalanceConfig = rebalanceConfig;
this.log = logContext.logger(ConsumerCoordinator.class);
this.metadata = metadata;
- this.rackId = rackId == null || rackId.isEmpty() ? Optional.empty() :
Optional.of(rackId);
+ this.rackId = rebalanceConfig.rackId;
this.metadataSnapshot = new MetadataSnapshot(this.rackId,
subscriptions, metadata.fetch(), metadata.updateVersion());
this.subscriptions = subscriptions;
this.defaultOffsetCommitCallback = new DefaultOffsetCommitCallback();
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManager.java
index 2845f4bc9ee..c5f95305a47 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManager.java
@@ -247,6 +247,7 @@ public class ConsumerHeartbeatRequestManager extends
AbstractHeartbeatRequestMan
sentFields.reset();
}
+ @SuppressWarnings("NPathComplexity")
public ConsumerGroupHeartbeatRequestData buildRequestData() {
ConsumerGroupHeartbeatRequestData data = new
ConsumerGroupHeartbeatRequestData();
@@ -306,6 +307,12 @@ public class ConsumerHeartbeatRequestManager extends
AbstractHeartbeatRequestMan
sentFields.localAssignment = local;
}
+ // RackId - sent when joining
+ String rackId = membershipManager.rackId().orElse(null);
+ if (sendAllFields) {
+ data.setRackId(rackId);
+ }
+
return data;
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
index bf0f593d8ed..b6159770754 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
@@ -112,6 +112,8 @@ public class ConsumerMembershipManager extends
AbstractMembershipManager<Consume
*/
protected final Optional<String> groupInstanceId;
+ private final Optional<String> rackId;
+
/**
* Rebalance timeout. To be used as time limit for the commit request
issued
* when a new assignment is received, that is retried until it succeeds,
fails with a
@@ -140,6 +142,7 @@ public class ConsumerMembershipManager extends
AbstractMembershipManager<Consume
public ConsumerMembershipManager(String groupId,
Optional<String> groupInstanceId,
+ Optional<String> rackId,
int rebalanceTimeoutMs,
Optional<String> serverAssignor,
SubscriptionState subscriptions,
@@ -152,6 +155,7 @@ public class ConsumerMembershipManager extends
AbstractMembershipManager<Consume
boolean autoCommitEnabled) {
this(groupId,
groupInstanceId,
+ rackId,
rebalanceTimeoutMs,
serverAssignor,
subscriptions,
@@ -167,6 +171,7 @@ public class ConsumerMembershipManager extends
AbstractMembershipManager<Consume
// Visible for testing
ConsumerMembershipManager(String groupId,
Optional<String> groupInstanceId,
+ Optional<String> rackId,
int rebalanceTimeoutMs,
Optional<String> serverAssignor,
SubscriptionState subscriptions,
@@ -185,6 +190,7 @@ public class ConsumerMembershipManager extends
AbstractMembershipManager<Consume
metricsManager,
autoCommitEnabled);
this.groupInstanceId = groupInstanceId;
+ this.rackId = rackId;
this.rebalanceTimeoutMs = rebalanceTimeoutMs;
this.serverAssignor = serverAssignor;
this.commitRequestManager = commitRequestManager;
@@ -199,6 +205,10 @@ public class ConsumerMembershipManager extends
AbstractMembershipManager<Consume
return groupInstanceId;
}
+ public Optional<String> rackId() { // returns the rack id Optional that was handed to the constructor
+ return rackId;
+ }
+
/**
* {@inheritDoc}
*/
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
index f341dc35a4a..32c76f8c732 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
@@ -248,6 +248,7 @@ public class RequestManagers implements Closeable {
membershipManager = new ConsumerMembershipManager(
groupRebalanceConfig.groupId,
groupRebalanceConfig.groupInstanceId,
+ groupRebalanceConfig.rackId,
groupRebalanceConfig.rebalanceTimeoutMs,
serverAssignor,
subscriptions,
diff --git
a/clients/src/test/java/org/apache/kafka/clients/GroupRebalanceConfigTest.java
b/clients/src/test/java/org/apache/kafka/clients/GroupRebalanceConfigTest.java
new file mode 100644
index 00000000000..bd9bc225205
--- /dev/null
+++
b/clients/src/test/java/org/apache/kafka/clients/GroupRebalanceConfigTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class GroupRebalanceConfigTest { // checks how GroupRebalanceConfig exposes rackId for the CONSUMER and SHARE protocol types
+
+ @ParameterizedTest
+ @EnumSource(value = GroupRebalanceConfig.ProtocolType.class, names =
{"CONSUMER", "SHARE"})
+ void testRackIdIsEmptyIfNoDefined(GroupRebalanceConfig.ProtocolType
protocolType) { // NOTE(review): the method name probably meant IfNotDefined - confirm before renaming
+ GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(
+ new ConsumerConfig(Map.of(
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer",
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer"
+ )),
+ protocolType
+ );
+ assertTrue(groupRebalanceConfig.rackId.isEmpty()); // CLIENT_RACK_CONFIG was not set, so rackId is expected to be empty
+ }
+
+ @ParameterizedTest
+ @EnumSource(value = GroupRebalanceConfig.ProtocolType.class, names =
{"CONSUMER", "SHARE"})
+ void testRackIdIsNotEmptyIfDefined(GroupRebalanceConfig.ProtocolType
protocolType) {
+ GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(
+ new ConsumerConfig(Map.of(
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer",
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer",
+ ConsumerConfig.CLIENT_RACK_CONFIG, "rack1"
+ )),
+ protocolType
+ );
+ assertTrue(groupRebalanceConfig.rackId.isPresent()); // the configured client rack must be carried through unchanged
+ assertEquals("rack1", groupRebalanceConfig.rackId.get());
+ }
+}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index df784da7937..6aa3095fada 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -166,6 +166,7 @@ public class AbstractCoordinatorTest {
HEARTBEAT_INTERVAL_MS,
GROUP_ID,
groupInstanceId,
+ null,
retryBackoffMs,
retryBackoffMaxMs,
leaveOnClose);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 8d364b2ae30..683a25a3e1c 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -208,7 +208,7 @@ public abstract class ConsumerCoordinatorTest {
this.rebalanceListener = new MockRebalanceListener();
this.mockOffsetCommitCallback = new MockCommitCallback();
this.partitionAssignor.clear();
- this.rebalanceConfig = buildRebalanceConfig(Optional.empty());
+ this.rebalanceConfig = buildRebalanceConfig(Optional.empty(), null);
this.coordinator = buildCoordinator(rebalanceConfig,
metrics,
assignors,
@@ -216,12 +216,13 @@ public abstract class ConsumerCoordinatorTest {
subscriptions);
}
- private GroupRebalanceConfig buildRebalanceConfig(Optional<String>
groupInstanceId) {
+ private GroupRebalanceConfig buildRebalanceConfig(Optional<String>
groupInstanceId, String rackId) {
return new GroupRebalanceConfig(sessionTimeoutMs,
rebalanceTimeoutMs,
heartbeatIntervalMs,
groupId,
groupInstanceId,
+ rackId,
retryBackoffMs,
retryBackoffMaxMs,
groupInstanceId.isEmpty());
@@ -2974,7 +2975,7 @@ public abstract class ConsumerCoordinatorTest {
@Test
public void testCommitOffsetShouldNotSetInstanceIdIfMemberIdIsUnknown() {
- rebalanceConfig = buildRebalanceConfig(groupInstanceId);
+ rebalanceConfig = buildRebalanceConfig(groupInstanceId, null);
ConsumerCoordinator coordinator = buildCoordinator(
rebalanceConfig,
new Metrics(),
@@ -3699,7 +3700,6 @@ public abstract class ConsumerCoordinatorTest {
autoCommitIntervalMs,
null,
true,
- null,
Optional.empty());
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
@@ -3750,7 +3750,7 @@ public abstract class ConsumerCoordinatorTest {
final boolean
autoCommit,
final
Optional<String> groupInstanceId,
final boolean
shouldPoll) {
- rebalanceConfig = buildRebalanceConfig(groupInstanceId);
+ rebalanceConfig = buildRebalanceConfig(groupInstanceId, null);
ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
new Metrics(),
assignors,
@@ -3868,7 +3868,6 @@ public abstract class ConsumerCoordinatorTest {
autoCommitIntervalMs,
null,
false,
- null,
Optional.empty());
}
@@ -4112,9 +4111,10 @@ public abstract class ConsumerCoordinatorTest {
metrics = new Metrics(time);
+ rebalanceConfig =
buildRebalanceConfig(rebalanceConfig.groupInstanceId, rackId);
coordinator = new ConsumerCoordinator(rebalanceConfig, new
LogContext(), consumerClient,
Collections.singletonList(assignor), metadata, subscriptions,
- metrics, consumerId + groupId, time, false,
autoCommitIntervalMs, null, false, rackId, Optional.empty());
+ metrics, consumerId + groupId, time, false,
autoCommitIntervalMs, null, false, Optional.empty());
}
private static MetadataResponse rackAwareMetadata(int numNodes,
@@ -4193,7 +4193,6 @@ public abstract class ConsumerCoordinatorTest {
autoCommitIntervalMs,
null,
false,
- null,
Optional.empty(),
Optional.of(() -> Mockito.mock(BaseHeartbeatThread.class)));
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
index 2bdc8819aec..9063ae5ab5b 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
@@ -1009,6 +1009,34 @@ public class ConsumerHeartbeatRequestManagerTest {
assertNull(data.subscribedTopicRegex());
}
+ @Test
+ public void testRackIdInHeartbeatLifecycle() { // the rack id is expected in heartbeats built while JOINING, absent while STABLE, and absent when the member has no rack id
+ heartbeatState = new HeartbeatState(subscriptions, membershipManager,
DEFAULT_MAX_POLL_INTERVAL_MS);
+ createHeartbeatRequestStateWithZeroHeartbeatInterval();
+
+ // Initial heartbeat with rackId
+ mockJoiningMemberData(null);
+ when(membershipManager.rackId()).thenReturn(Optional.of("rack1"));
+ ConsumerGroupHeartbeatRequestData data =
heartbeatState.buildRequestData();
+ assertEquals("rack1", data.rackId());
+
+ // RackId not included in HB if member state is not JOINING
+ when(membershipManager.state()).thenReturn(MemberState.STABLE);
+ data = heartbeatState.buildRequestData();
+ assertNull(data.rackId());
+
+ // RackId included in HB if member state changes to JOINING again
+ when(membershipManager.state()).thenReturn(MemberState.JOINING);
+ data = heartbeatState.buildRequestData();
+ assertEquals("rack1", data.rackId());
+
+ // Empty rackId not included in HB
+ when(membershipManager.rackId()).thenReturn(Optional.empty());
+ heartbeatState = new HeartbeatState(subscriptions, membershipManager,
DEFAULT_MAX_POLL_INTERVAL_MS); // NOTE(review): presumably recreated to start from a clean sent-fields state - confirm
+ data = heartbeatState.buildRequestData();
+ assertNull(data.rackId());
+ }
+
private void assertHeartbeat(ConsumerHeartbeatRequestManager hrm, int
nextPollMs) {
NetworkClientDelegate.PollResult pollResult =
hrm.poll(time.milliseconds());
assertEquals(1, pollResult.unsentRequests.size());
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
index b8557e37015..aa8c7bdb1dc 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
@@ -142,17 +142,20 @@ public class ConsumerMembershipManagerTest {
private ConsumerMembershipManager createMembershipManager(String
groupInstanceId) {
ConsumerMembershipManager manager = spy(new ConsumerMembershipManager(
- GROUP_ID, Optional.ofNullable(groupInstanceId), REBALANCE_TIMEOUT,
Optional.empty(),
+ GROUP_ID, Optional.ofNullable(groupInstanceId), Optional.empty(),
REBALANCE_TIMEOUT, Optional.empty(),
subscriptionState, commitRequestManager, metadata, LOG_CONTEXT,
backgroundEventHandler, time, rebalanceMetricsManager, true));
assertMemberIdIsGenerated(manager.memberId());
return manager;
}
- private ConsumerMembershipManager
createMembershipManagerJoiningGroup(String groupInstanceId,
- String
serverAssignor) {
+ private ConsumerMembershipManager createMembershipManagerJoiningGroup(
+ String groupInstanceId,
+ String serverAssignor,
+ String rackId
+ ) {
ConsumerMembershipManager manager = spy(new ConsumerMembershipManager(
- GROUP_ID, Optional.ofNullable(groupInstanceId),
REBALANCE_TIMEOUT,
+ GROUP_ID, Optional.ofNullable(groupInstanceId),
Optional.ofNullable(rackId), REBALANCE_TIMEOUT,
Optional.ofNullable(serverAssignor), subscriptionState,
commitRequestManager,
metadata, LOG_CONTEXT, backgroundEventHandler, time,
rebalanceMetricsManager, true));
assertMemberIdIsGenerated(manager.memberId());
@@ -165,10 +168,19 @@ public class ConsumerMembershipManagerTest {
ConsumerMembershipManager membershipManager =
createMembershipManagerJoiningGroup();
assertEquals(Optional.empty(), membershipManager.serverAssignor());
- membershipManager = createMembershipManagerJoiningGroup("instance1",
"Uniform");
+ membershipManager = createMembershipManagerJoiningGroup("instance1",
"Uniform", null);
assertEquals(Optional.of("Uniform"),
membershipManager.serverAssignor());
}
+ @Test
+ public void testMembershipManagerRackId() {
+ ConsumerMembershipManager membershipManager =
createMembershipManagerJoiningGroup();
+ assertEquals(Optional.empty(), membershipManager.rackId());
+
+ membershipManager = createMembershipManagerJoiningGroup(null, null,
"rack1");
+ assertEquals(Optional.of("rack1"), membershipManager.rackId());
+ }
+
@Test
public void testMembershipManagerInitSupportsEmptyGroupInstanceId() {
createMembershipManagerJoiningGroup();
@@ -231,7 +243,7 @@ public class ConsumerMembershipManagerTest {
@Test
public void testTransitionToFailedWhenTryingToJoin() {
ConsumerMembershipManager membershipManager = new
ConsumerMembershipManager(
- GROUP_ID, Optional.empty(), REBALANCE_TIMEOUT,
Optional.empty(),
+ GROUP_ID, Optional.empty(), Optional.empty(),
REBALANCE_TIMEOUT, Optional.empty(),
subscriptionState, commitRequestManager, metadata, LOG_CONTEXT,
backgroundEventHandler, time, rebalanceMetricsManager, true);
assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
@@ -2737,7 +2749,7 @@ public class ConsumerMembershipManagerTest {
}
private ConsumerMembershipManager createMemberInStableState(String
groupInstanceId) {
- ConsumerMembershipManager membershipManager =
createMembershipManagerJoiningGroup(groupInstanceId, null);
+ ConsumerMembershipManager membershipManager =
createMembershipManagerJoiningGroup(groupInstanceId, null, null);
ConsumerGroupHeartbeatResponse heartbeatResponse =
createConsumerGroupHeartbeatResponse(new Assignment(),
membershipManager.memberId());
when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
when(subscriptionState.rebalanceListener()).thenReturn(Optional.empty());
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
index 49102da9766..de7937673c8 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
@@ -45,6 +45,7 @@ public class HeartbeatTest {
heartbeatIntervalMs,
"group_id",
Optional.empty(),
+ null,
retryBackoffMs,
retryBackoffMaxMs,
true);
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java
index a8e7cd46552..b0408b12735 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java
@@ -149,6 +149,7 @@ public class WorkerCoordinatorIncrementalTest {
heartbeatIntervalMs,
groupId,
Optional.empty(),
+ null,
retryBackoffMs,
retryBackoffMaxMs,
true);
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index 4122578266a..9b6d5915225 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -139,6 +139,7 @@ public class WorkerCoordinatorTest {
heartbeatIntervalMs,
groupId,
Optional.empty(),
+ null,
retryBackoffMs,
retryBackoffMaxMs,
true);