This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new d23ce20bdfb KAFKA-14352: Rack-aware consumer partition assignment protocol changes (KIP-881) (#12954) d23ce20bdfb is described below commit d23ce20bdfbe5a9598523961cb7cf747ce4f52ef Author: Rajini Sivaram <rajinisiva...@googlemail.com> AuthorDate: Wed Dec 7 11:41:21 2022 +0000 KAFKA-14352: Rack-aware consumer partition assignment protocol changes (KIP-881) (#12954) Reviewers: David Jacot <dja...@confluent.io> --- checkstyle/suppressions.xml | 2 +- .../consumer/ConsumerPartitionAssignor.java | 15 +++-- .../kafka/clients/consumer/KafkaConsumer.java | 3 +- .../consumer/internals/ConsumerCoordinator.java | 8 ++- .../consumer/internals/ConsumerProtocol.java | 5 +- .../common/message/ConsumerProtocolAssignment.json | 3 +- .../message/ConsumerProtocolSubscription.json | 6 +- .../consumer/CooperativeStickyAssignorTest.java | 8 +-- .../kafka/clients/consumer/KafkaConsumerTest.java | 3 +- .../kafka/clients/consumer/StickyAssignorTest.java | 8 +-- .../internals/ConsumerCoordinatorTest.java | 67 +++++++++++++++++++--- .../consumer/internals/ConsumerProtocolTest.java | 23 ++++++-- .../kafka/api/PlaintextConsumerTest.scala | 17 ++++++ 13 files changed, 136 insertions(+), 32 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index ac0accc17fa..cea6a193790 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -66,7 +66,7 @@ <suppress checks="ParameterNumber" files="(NetworkClient|FieldSpec|KafkaRaftClient).java"/> <suppress checks="ParameterNumber" - files="KafkaConsumer.java"/> + files="(KafkaConsumer|ConsumerCoordinator).java"/> <suppress checks="ParameterNumber" files="Fetcher.java"/> <suppress checks="ParameterNumber" diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java index f544ce77e6b..0488c2b8c8a 100644 --- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java @@ -103,27 +103,29 @@ public interface ConsumerPartitionAssignor { private final List<String> topics; private final ByteBuffer userData; private final List<TopicPartition> ownedPartitions; + private final Optional<String> rackId; private Optional<String> groupInstanceId; private final Optional<Integer> generationId; - public Subscription(List<String> topics, ByteBuffer userData, List<TopicPartition> ownedPartitions, int generationId) { + public Subscription(List<String> topics, ByteBuffer userData, List<TopicPartition> ownedPartitions, int generationId, Optional<String> rackId) { this.topics = topics; this.userData = userData; this.ownedPartitions = ownedPartitions; this.groupInstanceId = Optional.empty(); this.generationId = generationId < 0 ? Optional.empty() : Optional.of(generationId); + this.rackId = rackId; } public Subscription(List<String> topics, ByteBuffer userData, List<TopicPartition> ownedPartitions) { - this(topics, userData, ownedPartitions, DEFAULT_GENERATION); + this(topics, userData, ownedPartitions, DEFAULT_GENERATION, Optional.empty()); } public Subscription(List<String> topics, ByteBuffer userData) { - this(topics, userData, Collections.emptyList(), DEFAULT_GENERATION); + this(topics, userData, Collections.emptyList(), DEFAULT_GENERATION, Optional.empty()); } public Subscription(List<String> topics) { - this(topics, null, Collections.emptyList(), DEFAULT_GENERATION); + this(topics, null, Collections.emptyList(), DEFAULT_GENERATION, Optional.empty()); } public List<String> topics() { @@ -138,6 +140,10 @@ public interface ConsumerPartitionAssignor { return ownedPartitions; } + public Optional<String> rackId() { + return rackId; + } + public void setGroupInstanceId(Optional<String> groupInstanceId) { this.groupInstanceId = groupInstanceId; } @@ -158,6 +164,7 @@ public 
interface ConsumerPartitionAssignor { ", ownedPartitions=" + ownedPartitions + ", groupInstanceId=" + groupInstanceId.map(String::toString).orElse("null") + ", generationId=" + generationId.orElse(-1) + + ", rackId=" + (rackId.orElse("null")) + ")"; } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index f07846945a7..cf85798f82b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -790,7 +790,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { enableAutoCommit, config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG), this.interceptors, - config.getBoolean(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED)); + config.getBoolean(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED), + config.getString(ConsumerConfig.CLIENT_RACK_CONFIG)); } this.fetcher = new Fetcher<>( logContext, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 051962d4435..fec31fe80f8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -117,6 +117,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { private AtomicBoolean asyncCommitFenced; private ConsumerGroupMetadata groupMetadata; private final boolean throwOnFetchStableOffsetsUnsupported; + private final Optional<String> rackId; // hold onto request&future for committed offset requests to enable async calls. 
private PendingCommittedOffsetRequest pendingCommittedOffsetRequest = null; @@ -162,7 +163,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator { boolean autoCommitEnabled, int autoCommitIntervalMs, ConsumerInterceptors<?, ?> interceptors, - boolean throwOnFetchStableOffsetsUnsupported) { + boolean throwOnFetchStableOffsetsUnsupported, + String rackId) { super(rebalanceConfig, logContext, client, @@ -186,6 +188,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { this.groupMetadata = new ConsumerGroupMetadata(rebalanceConfig.groupId, JoinGroupRequest.UNKNOWN_GENERATION_ID, JoinGroupRequest.UNKNOWN_MEMBER_ID, rebalanceConfig.groupInstanceId); this.throwOnFetchStableOffsetsUnsupported = throwOnFetchStableOffsetsUnsupported; + this.rackId = rackId == null || rackId.isEmpty() ? Optional.empty() : Optional.of(rackId); if (autoCommitEnabled) this.nextAutoCommitTimer = time.timer(autoCommitIntervalMs); @@ -245,7 +248,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator { Subscription subscription = new Subscription(topics, assignor.subscriptionUserData(joinedSubscription), subscriptions.assignedPartitionsList(), - generation().generationId); + generation().generationId, + rackId); ByteBuffer metadata = ConsumerProtocol.serializeSubscription(subscription); protocolSet.add(new JoinGroupRequestData.JoinGroupRequestProtocol() diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java index b237a3b0c21..79feb4eb57b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java @@ -31,6 +31,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.Optional; /** * ConsumerProtocol 
contains the schemas for consumer subscriptions and assignments for use with @@ -89,6 +90,7 @@ public class ConsumerProtocol { } partition.partitions().add(tp.partition()); } + subscription.rackId().ifPresent(data::setRackId); data.setGenerationId(subscription.generationId().orElse(-1)); return MessageUtil.toVersionPrefixedByteBuffer(version, data); @@ -112,7 +114,8 @@ public class ConsumerProtocol { data.topics(), data.userData() != null ? data.userData().duplicate() : null, ownedPartitions, - data.generationId()); + data.generationId(), + data.rackId() == null || data.rackId().isEmpty() ? Optional.empty() : Optional.of(data.rackId())); } catch (BufferUnderflowException e) { throw new SchemaException("Buffer underflow while parsing consumer protocol's subscription", e); } diff --git a/clients/src/main/resources/common/message/ConsumerProtocolAssignment.json b/clients/src/main/resources/common/message/ConsumerProtocolAssignment.json index 8b0c138d698..fe07aaeadff 100644 --- a/clients/src/main/resources/common/message/ConsumerProtocolAssignment.json +++ b/clients/src/main/resources/common/message/ConsumerProtocolAssignment.json @@ -23,7 +23,8 @@ // that new versions cannot remove or reorder any of the existing fields. // // Version 2 is to support a new field "GenerationId" in ConsumerProtocolSubscription. - "validVersions": "0-2", + // Version 3 adds rack id to ConsumerProtocolSubscription. 
+ "validVersions": "0-3", "flexibleVersions": "none", "fields": [ { "name": "AssignedPartitions", "type": "[]TopicPartition", "versions": "0+", diff --git a/clients/src/main/resources/common/message/ConsumerProtocolSubscription.json b/clients/src/main/resources/common/message/ConsumerProtocolSubscription.json index 6997d56a04b..49801c65f77 100644 --- a/clients/src/main/resources/common/message/ConsumerProtocolSubscription.json +++ b/clients/src/main/resources/common/message/ConsumerProtocolSubscription.json @@ -24,7 +24,8 @@ // Version 1 added the "OwnedPartitions" field to allow assigner know what partitions each member owned // Version 2 added a new field "GenerationId" to indicate if the member has out-of-date ownedPartitions. - "validVersions": "0-2", + // Version 3 adds rack id to enable rack-aware assignment. + "validVersions": "0-3", "flexibleVersions": "none", "fields": [ { "name": "Topics", "type": "[]string", "versions": "0+" }, @@ -36,6 +37,7 @@ { "name": "Partitions", "type": "[]int32", "versions": "1+"} ] }, - { "name": "GenerationId", "type": "int32", "versions": "2+", "default": "-1", "ignorable": true } + { "name": "GenerationId", "type": "int32", "versions": "2+", "default": "-1", "ignorable": true }, + { "name": "RackId", "type": "string", "versions": "3+", "nullableVersions": "3+", "default": "null", "ignorable": true } ] } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java index 9284de50ab7..28a988b8d82 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java @@ -54,12 +54,12 @@ public class CooperativeStickyAssignorTest extends AbstractStickyAssignorTest { @Override public Subscription buildSubscriptionV1(List<String> topics, List<TopicPartition> partitions, 
int generationId) { assignor.onAssignment(new ConsumerPartitionAssignor.Assignment(partitions), new ConsumerGroupMetadata(groupId, generationId, consumer1, Optional.empty())); - return new Subscription(topics, assignor.subscriptionUserData(new HashSet<>(topics)), partitions, DEFAULT_GENERATION); + return new Subscription(topics, assignor.subscriptionUserData(new HashSet<>(topics)), partitions, DEFAULT_GENERATION, Optional.empty()); } @Override public Subscription buildSubscriptionV2Above(List<String> topics, List<TopicPartition> partitions, int generationId) { - return new Subscription(topics, assignor.subscriptionUserData(new HashSet<>(topics)), partitions, generationId); + return new Subscription(topics, assignor.subscriptionUserData(new HashSet<>(topics)), partitions, generationId, Optional.empty()); } @Override @@ -156,7 +156,7 @@ public class CooperativeStickyAssignorTest extends AbstractStickyAssignorTest { // subscription containing empty owned partitions and the same generation id, and non-empty owned partition in user data, // member data should honor the one in subscription since cooperativeStickyAssignor only supports ConsumerProtocolSubscription v1 and above - Subscription subscription = new Subscription(topics, generateUserData(topics, ownedPartitions, generationId), Collections.emptyList(), generationId); + Subscription subscription = new Subscription(topics, generateUserData(topics, ownedPartitions, generationId), Collections.emptyList(), generationId, Optional.empty()); AbstractStickyAssignor.MemberData memberData = memberData(subscription); assertEquals(Collections.emptyList(), memberData.partitions, "subscription: " + subscription + " doesn't have expected owned partition"); @@ -170,7 +170,7 @@ public class CooperativeStickyAssignorTest extends AbstractStickyAssignorTest { // subscription containing empty owned partitions and a higher generation id, and non-empty owned partition in user data, // member data should honor the one in subscription 
since generation id is higher - Subscription subscription = new Subscription(topics, generateUserData(topics, ownedPartitions, generationId - 1), Collections.emptyList(), generationId); + Subscription subscription = new Subscription(topics, generateUserData(topics, ownedPartitions, generationId - 1), Collections.emptyList(), generationId, Optional.empty()); AbstractStickyAssignor.MemberData memberData = memberData(subscription); assertEquals(Collections.emptyList(), memberData.partitions, "subscription: " + subscription + " doesn't have expected owned partition"); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 2c667dcb5d6..f08ac45ddaf 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -2614,7 +2614,8 @@ public class KafkaConsumerTest { autoCommitEnabled, autoCommitIntervalMs, interceptors, - throwOnStableOffsetNotSupported); + throwOnStableOffsetNotSupported, + null); } Fetcher<String, String> fetcher = new Fetcher<>( loggerFactory, diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java index b27aa0e4a74..9dbc085f76c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java @@ -53,19 +53,19 @@ public class StickyAssignorTest extends AbstractStickyAssignorTest { @Override public Subscription buildSubscriptionV0(List<String> topics, List<TopicPartition> partitions, int generationId) { return new Subscription(topics, serializeTopicPartitionAssignment(new MemberData(partitions, Optional.of(generationId))), - Collections.emptyList(), DEFAULT_GENERATION); + 
Collections.emptyList(), DEFAULT_GENERATION, Optional.empty()); } @Override public Subscription buildSubscriptionV1(List<String> topics, List<TopicPartition> partitions, int generationId) { return new Subscription(topics, serializeTopicPartitionAssignment(new MemberData(partitions, Optional.of(generationId))), - partitions, DEFAULT_GENERATION); + partitions, DEFAULT_GENERATION, Optional.empty()); } @Override public Subscription buildSubscriptionV2Above(List<String> topics, List<TopicPartition> partitions, int generationId) { return new Subscription(topics, serializeTopicPartitionAssignment(new MemberData(partitions, Optional.of(generationId))), - partitions, generationId); + partitions, generationId, Optional.empty()); } @Override @@ -308,7 +308,7 @@ public class StickyAssignorTest extends AbstractStickyAssignorTest { List<TopicPartition> ownedPartitions = partitions(tp(topic1, 0), tp(topic2, 1)); int generationIdInUserData = generationId - 1; - Subscription subscription = new Subscription(topics, generateUserData(topics, ownedPartitions, generationIdInUserData), Collections.emptyList(), generationId); + Subscription subscription = new Subscription(topics, generateUserData(topics, ownedPartitions, generationIdInUserData), Collections.emptyList(), generationId, Optional.empty()); AbstractStickyAssignor.MemberData memberData = memberData(subscription); // in StickyAssignor with eager rebalance protocol, we'll always honor data in user data assertEquals(ownedPartitions, memberData.partitions, "subscription: " + subscription + " doesn't have expected owned partition"); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 9dd44740a55..36bf0ea6825 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -117,6 +117,7 @@ import static java.util.Collections.singletonMap; import static org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE; import static org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol.EAGER; import static org.apache.kafka.clients.consumer.CooperativeStickyAssignor.COOPERATIVE_STICKY_ASSIGNOR_NAME; +import static org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.DEFAULT_GENERATION; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkSet; @@ -1847,7 +1848,7 @@ public abstract class ConsumerCoordinatorTest { // note that `MockPartitionAssignor.prepare` is not called therefore calling `MockPartitionAssignor.assign` // will throw a IllegalStateException. this indirectly verifies that `assign` is correctly skipped. 
Map<String, List<String>> memberSubscriptions = singletonMap(consumerId, singletonList(topic1)); - client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, true, Errors.NONE)); + client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, true, Errors.NONE, Optional.empty())); client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); coordinator.poll(time.timer(Long.MAX_VALUE)); @@ -1879,7 +1880,7 @@ public abstract class ConsumerCoordinatorTest { mkEntry(consumerId, singletonList(topic1)), mkEntry(consumerId2, singletonList(topic2)) ); - client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, true, Errors.NONE)); + client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, true, Errors.NONE, Optional.empty())); client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); coordinator.poll(time.timer(Long.MAX_VALUE)); @@ -3490,6 +3491,36 @@ public abstract class ConsumerCoordinatorTest { assertEquals(lost.isEmpty() ? 
null : lost, rebalanceListener.lost); } + @Test + public void testSubscriptionRackId() { + metrics.close(); + coordinator.close(time.timer(0)); + + String rackId = "rack-a"; + metrics = new Metrics(time); + RackAwareAssignor assignor = new RackAwareAssignor(); + + coordinator = new ConsumerCoordinator(rebalanceConfig, new LogContext(), consumerClient, + Collections.singletonList(assignor), metadata, subscriptions, + metrics, consumerId + groupId, time, false, autoCommitIntervalMs, null, false, rackId); + + subscriptions.subscribe(singleton(topic1), rebalanceListener); + client.updateMetadata(metadataResponse); + + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + + Map<String, List<String>> memberSubscriptions = singletonMap(consumerId, singletonList(topic1)); + assignor.prepare(singletonMap(consumerId, singletonList(t1p))); + + client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, false, Errors.NONE, Optional.of(rackId))); + client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); + + coordinator.poll(time.timer(Long.MAX_VALUE)); + assertEquals(singleton(t1p), coordinator.subscriptionState().assignedPartitions()); + assertEquals(singleton(rackId), assignor.rackIds); + } + @Test public void testThrowOnUnsupportedStableFlag() { supportStableFlag((short) 6, true); @@ -3514,7 +3545,8 @@ public abstract class ConsumerCoordinatorTest { false, autoCommitIntervalMs, null, - true); + true, + null); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); client.setNodeApiVersions(NodeApiVersions.create(ApiKeys.OFFSET_FETCH.id, (short) 0, upperVersion)); @@ -3675,7 +3707,8 @@ public abstract class ConsumerCoordinatorTest { autoCommitEnabled, autoCommitIntervalMs, null, - false); + false, + null); } private Collection<TopicPartition> getRevoked(final List<TopicPartition> owned, @@ -3731,7 +3764,7 @@ public abstract class 
ConsumerCoordinatorTest { Map<String, List<String>> subscriptions, Errors error ) { - return joinGroupLeaderResponse(generationId, memberId, subscriptions, false, error); + return joinGroupLeaderResponse(generationId, memberId, subscriptions, false, error, Optional.empty()); } private JoinGroupResponse joinGroupLeaderResponse( @@ -3739,11 +3772,13 @@ public abstract class ConsumerCoordinatorTest { String memberId, Map<String, List<String>> subscriptions, boolean skipAssignment, - Errors error + Errors error, + Optional<String> rackId ) { List<JoinGroupResponseData.JoinGroupResponseMember> metadata = new ArrayList<>(); for (Map.Entry<String, List<String>> subscriptionEntry : subscriptions.entrySet()) { - ConsumerPartitionAssignor.Subscription subscription = new ConsumerPartitionAssignor.Subscription(subscriptionEntry.getValue()); + ConsumerPartitionAssignor.Subscription subscription = new ConsumerPartitionAssignor.Subscription(subscriptionEntry.getValue(), + null, Collections.emptyList(), DEFAULT_GENERATION, rackId); ByteBuffer buf = ConsumerProtocol.serializeSubscription(subscription); metadata.add(new JoinGroupResponseData.JoinGroupResponseMember() .setMemberId(subscriptionEntry.getKey()) @@ -3901,4 +3936,22 @@ public abstract class ConsumerCoordinatorTest { this.exception = exception; } } + + private static class RackAwareAssignor extends MockPartitionAssignor { + private final Set<String> rackIds = new HashSet<>(); + + RackAwareAssignor() { + super(Arrays.asList(RebalanceProtocol.EAGER)); + } + + @Override + public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic, Map<String, Subscription> subscriptions) { + subscriptions.forEach((consumer, subscription) -> { + if (!subscription.rackId().isPresent()) + throw new IllegalStateException("Rack id not provided in subscription for " + consumer); + rackIds.add(subscription.rackId().get()); + }); + return super.assign(partitionsPerTopic, subscriptions); + } + } } diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java index 5106432ee53..0e97bfbb318 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java @@ -49,6 +49,7 @@ public class ConsumerProtocolTest { private final TopicPartition tp2 = new TopicPartition("bar", 2); private final Optional<String> groupInstanceId = Optional.of("instance.id"); private final int generationId = 1; + private final Optional<String> rackId = Optional.of("rack-a"); @Test public void serializeDeserializeSubscriptionAllVersions() { @@ -56,7 +57,7 @@ public class ConsumerProtocolTest { new TopicPartition("foo", 0), new TopicPartition("bar", 0)); Subscription subscription = new Subscription(Arrays.asList("foo", "bar"), - ByteBuffer.wrap("hello".getBytes()), ownedPartitions, generationId); + ByteBuffer.wrap("hello".getBytes()), ownedPartitions, generationId, rackId); for (short version = ConsumerProtocolSubscription.LOWEST_SUPPORTED_VERSION; version <= ConsumerProtocolSubscription.HIGHEST_SUPPORTED_VERSION; version++) { ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription, version); @@ -77,6 +78,12 @@ public class ConsumerProtocolTest { } else { assertFalse(parsedSubscription.generationId().isPresent()); } + + if (version >= 3) { + assertEquals(rackId, parsedSubscription.rackId()); + } else { + assertEquals(Optional.empty(), parsedSubscription.rackId()); + } } } @@ -89,6 +96,7 @@ public class ConsumerProtocolTest { assertEquals(0, parsedSubscription.userData().limit()); assertFalse(parsedSubscription.groupInstanceId().isPresent()); assertFalse(parsedSubscription.generationId().isPresent()); + assertFalse(parsedSubscription.rackId().isPresent()); } @Test @@ -102,6 +110,7 @@ public class ConsumerProtocolTest 
{ assertEquals(0, parsedSubscription.userData().limit()); assertEquals(groupInstanceId, parsedSubscription.groupInstanceId()); assertFalse(parsedSubscription.generationId().isPresent()); + assertFalse(parsedSubscription.rackId().isPresent()); } @Test @@ -111,6 +120,7 @@ public class ConsumerProtocolTest { Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer); assertEquals(toSet(subscription.topics()), toSet(parsedSubscription.topics())); assertNull(parsedSubscription.userData()); + assertFalse(parsedSubscription.rackId().isPresent()); } @Test @@ -146,14 +156,15 @@ public class ConsumerProtocolTest { assertNull(parsedSubscription.userData()); assertTrue(parsedSubscription.ownedPartitions().isEmpty()); assertFalse(parsedSubscription.generationId().isPresent()); + assertFalse(parsedSubscription.rackId().isPresent()); } @ParameterizedTest @ValueSource(booleans = {true, false}) - public void deserializeNewSubscriptionWithOldVersion(boolean hasGenerationId) { + public void deserializeNewSubscriptionWithOldVersion(boolean hasGenerationIdAndRack) { Subscription subscription; - if (hasGenerationId) { - subscription = new Subscription(Arrays.asList("foo", "bar"), null, Collections.singletonList(tp2), generationId); + if (hasGenerationIdAndRack) { + subscription = new Subscription(Arrays.asList("foo", "bar"), null, Collections.singletonList(tp2), generationId, rackId); } else { subscription = new Subscription(Arrays.asList("foo", "bar"), null, Collections.singletonList(tp2)); } @@ -166,6 +177,7 @@ public class ConsumerProtocolTest { assertTrue(parsedSubscription.ownedPartitions().isEmpty()); assertFalse(parsedSubscription.groupInstanceId().isPresent()); assertFalse(parsedSubscription.generationId().isPresent()); + assertFalse(parsedSubscription.rackId().isPresent()); } @Test @@ -178,6 +190,7 @@ public class ConsumerProtocolTest { assertEquals(Collections.singleton(tp2), toSet(subscription.ownedPartitions())); assertEquals(groupInstanceId, 
subscription.groupInstanceId()); assertEquals(generationId, subscription.generationId().orElse(DEFAULT_GENERATION)); + assertEquals(rackId, subscription.rackId()); } @Test @@ -253,6 +266,7 @@ public class ConsumerProtocolTest { new Field("owned_partitions", new ArrayOf( ConsumerProtocolSubscription.TopicPartition.SCHEMA_1)), new Field("generation_id", Type.INT32), + new Field("rack_id", Type.STRING), new Field("bar", Type.STRING)); Struct subscriptionV100 = new Struct(subscriptionSchemaV100); @@ -263,6 +277,7 @@ public class ConsumerProtocolTest { .set("topic", tp2.topic()) .set("partitions", new Object[]{tp2.partition()})}); subscriptionV100.set("generation_id", generationId); + subscriptionV100.set("rack_id", rackId.orElse(null)); subscriptionV100.set("bar", "bar"); Struct headerV100 = new Struct(new Schema(new Field("version", Type.INT16))); diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala index 92670dba3b3..bfc280a1678 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala @@ -1902,5 +1902,22 @@ class PlaintextConsumerTest extends BaseConsumerTest { consumer2.close() } + + @Test + def testConsumerRackIdPropagatedToPartitionAssignor(): Unit = { + consumerConfig.setProperty(ConsumerConfig.CLIENT_RACK_CONFIG, "rack-a") + consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[RackAwareAssignor].getName) + val consumer = createConsumer() + consumer.subscribe(Set(topic).asJava) + awaitAssignment(consumer, Set(tp, tp2)) + } +} + +class RackAwareAssignor extends RoundRobinAssignor { + override def assign(partitionsPerTopic: util.Map[String, Integer], subscriptions: util.Map[String, ConsumerPartitionAssignor.Subscription]): util.Map[String, util.List[TopicPartition]] = { + assertEquals(1, subscriptions.size()) + 
assertEquals(Optional.of("rack-a"), subscriptions.values.asScala.head.rackId) + super.assign(partitionsPerTopic, subscriptions) + } }