dajac commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1121387449
##########
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##########
@@ -74,45 +105,194 @@ public String name() {
private Map<String, List<MemberInfo>> consumersPerTopic(Map<String,
Subscription> consumerMetadata) {
Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
- for (Map.Entry<String, Subscription> subscriptionEntry :
consumerMetadata.entrySet()) {
- String consumerId = subscriptionEntry.getKey();
- MemberInfo memberInfo = new MemberInfo(consumerId,
subscriptionEntry.getValue().groupInstanceId());
- for (String topic : subscriptionEntry.getValue().topics()) {
- put(topicToConsumers, topic, memberInfo);
- }
- }
+ consumerMetadata.forEach((consumerId, subscription) -> {
+ MemberInfo memberInfo = new MemberInfo(consumerId,
subscription.groupInstanceId(), subscription.rackId());
+ subscription.topics().forEach(topic -> put(topicToConsumers,
topic, memberInfo));
+ });
return topicToConsumers;
}
+ /**
+ * Performs range assignment of the specified partitions for the consumers
with the provided subscriptions.
+ * If rack-awareness is enabled for one or more consumers, we perform
rack-aware assignment first to assign
+ * the subset of partitions that can be aligned on racks, while retaining
the same co-partitioning and
+ * per-topic balancing guarantees as non-rack-aware range assignment. The
remaining partitions are assigned
+ * using standard non-rack-aware range assignment logic, which may result
in mis-aligned racks.
+ */
@Override
- public Map<String, List<TopicPartition>> assign(Map<String, Integer>
partitionsPerTopic,
- Map<String, Subscription>
subscriptions) {
+ public Map<String, List<TopicPartition>> assignPartitions(Map<String,
List<PartitionInfo>> partitionsPerTopic,
+ Map<String,
Subscription> subscriptions) {
Map<String, List<MemberInfo>> consumersPerTopic =
consumersPerTopic(subscriptions);
+ Map<String, String> consumerRacks = consumerRacks(subscriptions);
+ List<TopicAssignmentState> topicAssignmentStates =
partitionsPerTopic.entrySet().stream()
+ .filter(e -> !e.getValue().isEmpty())
+ .map(e -> new TopicAssignmentState(e.getKey(), e.getValue(),
consumersPerTopic.get(e.getKey()), consumerRacks))
+ .collect(Collectors.toList());
Map<String, List<TopicPartition>> assignment = new HashMap<>();
- for (String memberId : subscriptions.keySet())
- assignment.put(memberId, new ArrayList<>());
+ subscriptions.keySet().forEach(memberId -> assignment.put(memberId,
new ArrayList<>()));
+
+ boolean useRackAware = topicAssignmentStates.stream().anyMatch(t ->
t.needsRackAwareAssignment);
+ if (useRackAware)
+ assignWithRackMatching(topicAssignmentStates, assignment);
+
+ topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true,
assignment));
+
+ if (useRackAware)
+ assignment.values().forEach(list ->
list.sort(PARTITION_COMPARATOR));
+ return assignment;
+ }
+
+ // This method is not used, but retained for compatibility with any custom
assignors that extend this class.
+ @Override
+ public Map<String, List<TopicPartition>> assign(Map<String, Integer>
partitionsPerTopic,
+ Map<String, Subscription>
subscriptions) {
+ return
assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions);
+ }
- for (Map.Entry<String, List<MemberInfo>> topicEntry :
consumersPerTopic.entrySet()) {
- String topic = topicEntry.getKey();
- List<MemberInfo> consumersForTopic = topicEntry.getValue();
+ private void assignRanges(TopicAssignmentState assignmentState,
+ BiFunction<String, TopicPartition, Boolean>
mayAssign,
+ Map<String, List<TopicPartition>> assignment) {
+ for (String consumer : assignmentState.consumers.keySet()) {
+ if (assignmentState.unassignedPartitions.isEmpty())
+ break;
+ List<TopicPartition> assignablePartitions =
assignmentState.unassignedPartitions.stream()
+ .filter(tp -> mayAssign.apply(consumer, tp))
+ .collect(Collectors.toList());
- Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
- if (numPartitionsForTopic == null)
+ int maxAssignable =
Math.min(assignmentState.maxAssignable(consumer), assignablePartitions.size());
+ if (maxAssignable <= 0)
continue;
- Collections.sort(consumersForTopic);
+ assign(consumer, assignablePartitions.subList(0, maxAssignable),
assignmentState, assignment);
+ }
+ }
- int numPartitionsPerConsumer = numPartitionsForTopic /
consumersForTopic.size();
- int consumersWithExtraPartition = numPartitionsForTopic %
consumersForTopic.size();
+ private void assignWithRackMatching(Collection<TopicAssignmentState>
assignmentStates,
+ Map<String, List<TopicPartition>>
assignment) {
- List<TopicPartition> partitions =
AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
- for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
- int start = numPartitionsPerConsumer * i + Math.min(i,
consumersWithExtraPartition);
- int length = numPartitionsPerConsumer + (i + 1 >
consumersWithExtraPartition ? 0 : 1);
-
assignment.get(consumersForTopic.get(i).memberId).addAll(partitions.subList(start,
start + length));
+ assignmentStates.stream().collect(Collectors.groupingBy(t ->
t.consumers)).forEach((consumers, states) -> {
+ states.stream().collect(Collectors.groupingBy(t ->
t.partitionRacks.size())).forEach((numPartitions, coPartitionedStates) -> {
+ if (coPartitionedStates.size() > 1)
+ assignCoPartitionedWithRackMatching(consumers,
numPartitions, states, assignment);
+ else {
+ TopicAssignmentState state = coPartitionedStates.get(0);
+ if (state.needsRackAwareAssignment)
+ assignRanges(state, state::racksMatch, assignment);
+ }
+ });
+ });
+ }
+
+ private void assignCoPartitionedWithRackMatching(LinkedHashMap<String,
Optional<String>> consumers,
+ int numPartitions,
+
Collection<TopicAssignmentState> assignmentStates,
+ Map<String,
List<TopicPartition>> assignment) {
+
+ List<String> remainingConsumers = new LinkedList<>(consumers.keySet());
+ for (int i = 0; i < numPartitions; i++) {
+ int p = i;
+
+ Optional<String> matchingConsumer = remainingConsumers.stream()
+ .filter(c -> assignmentStates.stream().allMatch(t ->
t.racksMatch(c, new TopicPartition(t.topic, p)) && t.maxAssignable(c) > 0))
+ .findFirst();
+ if (matchingConsumer.isPresent()) {
+ String consumer = matchingConsumer.get();
+ assignmentStates.forEach(t -> assign(consumer,
Collections.singletonList(new TopicPartition(t.topic, p)), t, assignment));
+
+ if (assignmentStates.stream().noneMatch(t ->
t.maxAssignable(consumer) > 0)) {
+ remainingConsumers.remove(consumer);
+ if (remainingConsumers.isEmpty())
+ break;
+ }
}
}
- return assignment;
+ }
+
+ private void assign(String consumer, List<TopicPartition> partitions,
TopicAssignmentState assignmentState, Map<String, List<TopicPartition>>
assignment) {
+ assignment.get(consumer).addAll(partitions);
+ assignmentState.onAssigned(consumer, partitions);
+ }
+
+ private Map<String, String> consumerRacks(Map<String, Subscription>
subscriptions) {
+ Map<String, String> consumerRacks = new
HashMap<>(subscriptions.size());
+ subscriptions.forEach((memberId, subscription) ->
+ subscription.rackId().filter(r ->
!r.isEmpty()).ifPresent(rackId -> consumerRacks.put(memberId, rackId)));
+ return consumerRacks;
+ }
+
+ private class TopicAssignmentState {
+ private final String topic;
+ private final LinkedHashMap<String, Optional<String>> consumers;
+ private final boolean needsRackAwareAssignment;
+ private final Map<TopicPartition, Set<String>> partitionRacks;
+
+ private final List<TopicPartition> unassignedPartitions;
+ private final Map<String, Integer> numAssignedByConsumer;
+ private final int numPartitionsPerConsumer;
+ private int remainingConsumersWithExtraPartition;
+
+ public TopicAssignmentState(String topic, List<PartitionInfo>
partitionInfos, List<MemberInfo> membersOrNull, Map<String, String>
consumerRacks) {
+ this.topic = topic;
+ List<MemberInfo> members = membersOrNull == null ?
Collections.emptyList() : membersOrNull;
+ Collections.sort(members);
+ consumers = members.stream().map(c -> c.memberId)
+ .collect(Collectors.toMap(Function.identity(), c ->
Optional.ofNullable(consumerRacks.get(c)), (a, b) -> a, LinkedHashMap::new));
+
+ this.unassignedPartitions = partitionInfos.stream().map(p -> new
TopicPartition(p.topic(), p.partition()))
+ .collect(Collectors.toCollection(LinkedList::new));
Review Comment:
nit: I was wondering if using a `LinkedHashSet` would be better here because
we have to remove partitions from it in `onAssigned`. Did you consider this?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##########
@@ -74,45 +105,194 @@ public String name() {
private Map<String, List<MemberInfo>> consumersPerTopic(Map<String,
Subscription> consumerMetadata) {
Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
- for (Map.Entry<String, Subscription> subscriptionEntry :
consumerMetadata.entrySet()) {
- String consumerId = subscriptionEntry.getKey();
- MemberInfo memberInfo = new MemberInfo(consumerId,
subscriptionEntry.getValue().groupInstanceId());
- for (String topic : subscriptionEntry.getValue().topics()) {
- put(topicToConsumers, topic, memberInfo);
- }
- }
+ consumerMetadata.forEach((consumerId, subscription) -> {
+ MemberInfo memberInfo = new MemberInfo(consumerId,
subscription.groupInstanceId(), subscription.rackId());
+ subscription.topics().forEach(topic -> put(topicToConsumers,
topic, memberInfo));
+ });
return topicToConsumers;
}
+ /**
+ * Performs range assignment of the specified partitions for the consumers
with the provided subscriptions.
+ * If rack-awareness is enabled for one or more consumers, we perform
rack-aware assignment first to assign
+ * the subset of partitions that can be aligned on racks, while retaining
the same co-partitioning and
+ * per-topic balancing guarantees as non-rack-aware range assignment. The
remaining partitions are assigned
+ * using standard non-rack-aware range assignment logic, which may result
in mis-aligned racks.
+ */
@Override
- public Map<String, List<TopicPartition>> assign(Map<String, Integer>
partitionsPerTopic,
- Map<String, Subscription>
subscriptions) {
+ public Map<String, List<TopicPartition>> assignPartitions(Map<String,
List<PartitionInfo>> partitionsPerTopic,
+ Map<String,
Subscription> subscriptions) {
Map<String, List<MemberInfo>> consumersPerTopic =
consumersPerTopic(subscriptions);
+ Map<String, String> consumerRacks = consumerRacks(subscriptions);
+ List<TopicAssignmentState> topicAssignmentStates =
partitionsPerTopic.entrySet().stream()
+ .filter(e -> !e.getValue().isEmpty())
+ .map(e -> new TopicAssignmentState(e.getKey(), e.getValue(),
consumersPerTopic.get(e.getKey()), consumerRacks))
+ .collect(Collectors.toList());
Map<String, List<TopicPartition>> assignment = new HashMap<>();
- for (String memberId : subscriptions.keySet())
- assignment.put(memberId, new ArrayList<>());
+ subscriptions.keySet().forEach(memberId -> assignment.put(memberId,
new ArrayList<>()));
+
+ boolean useRackAware = topicAssignmentStates.stream().anyMatch(t ->
t.needsRackAwareAssignment);
+ if (useRackAware)
+ assignWithRackMatching(topicAssignmentStates, assignment);
+
+ topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true,
assignment));
+
+ if (useRackAware)
+ assignment.values().forEach(list ->
list.sort(PARTITION_COMPARATOR));
+ return assignment;
+ }
+
+ // This method is not used, but retained for compatibility with any custom
assignors that extend this class.
+ @Override
+ public Map<String, List<TopicPartition>> assign(Map<String, Integer>
partitionsPerTopic,
+ Map<String, Subscription>
subscriptions) {
+ return
assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions);
+ }
- for (Map.Entry<String, List<MemberInfo>> topicEntry :
consumersPerTopic.entrySet()) {
- String topic = topicEntry.getKey();
- List<MemberInfo> consumersForTopic = topicEntry.getValue();
+ private void assignRanges(TopicAssignmentState assignmentState,
+ BiFunction<String, TopicPartition, Boolean>
mayAssign,
+ Map<String, List<TopicPartition>> assignment) {
+ for (String consumer : assignmentState.consumers.keySet()) {
+ if (assignmentState.unassignedPartitions.isEmpty())
+ break;
+ List<TopicPartition> assignablePartitions =
assignmentState.unassignedPartitions.stream()
+ .filter(tp -> mayAssign.apply(consumer, tp))
+ .collect(Collectors.toList());
Review Comment:
nit: I was wondering if we should directly limit based on
`assignmentState.maxAssignable(consumer)` at this stage. For large groups with
a large number of partitions, we don't really have to generate the full list as
we limit it anyway right after.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##########
@@ -74,45 +105,194 @@ public String name() {
private Map<String, List<MemberInfo>> consumersPerTopic(Map<String,
Subscription> consumerMetadata) {
Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
- for (Map.Entry<String, Subscription> subscriptionEntry :
consumerMetadata.entrySet()) {
- String consumerId = subscriptionEntry.getKey();
- MemberInfo memberInfo = new MemberInfo(consumerId,
subscriptionEntry.getValue().groupInstanceId());
- for (String topic : subscriptionEntry.getValue().topics()) {
- put(topicToConsumers, topic, memberInfo);
- }
- }
+ consumerMetadata.forEach((consumerId, subscription) -> {
+ MemberInfo memberInfo = new MemberInfo(consumerId,
subscription.groupInstanceId(), subscription.rackId());
+ subscription.topics().forEach(topic -> put(topicToConsumers,
topic, memberInfo));
+ });
return topicToConsumers;
}
+ /**
+ * Performs range assignment of the specified partitions for the consumers
with the provided subscriptions.
+ * If rack-awareness is enabled for one or more consumers, we perform
rack-aware assignment first to assign
+ * the subset of partitions that can be aligned on racks, while retaining
the same co-partitioning and
+ * per-topic balancing guarantees as non-rack-aware range assignment. The
remaining partitions are assigned
+ * using standard non-rack-aware range assignment logic, which may result
in mis-aligned racks.
+ */
@Override
- public Map<String, List<TopicPartition>> assign(Map<String, Integer>
partitionsPerTopic,
- Map<String, Subscription>
subscriptions) {
+ public Map<String, List<TopicPartition>> assignPartitions(Map<String,
List<PartitionInfo>> partitionsPerTopic,
+ Map<String,
Subscription> subscriptions) {
Map<String, List<MemberInfo>> consumersPerTopic =
consumersPerTopic(subscriptions);
+ Map<String, String> consumerRacks = consumerRacks(subscriptions);
+ List<TopicAssignmentState> topicAssignmentStates =
partitionsPerTopic.entrySet().stream()
+ .filter(e -> !e.getValue().isEmpty())
+ .map(e -> new TopicAssignmentState(e.getKey(), e.getValue(),
consumersPerTopic.get(e.getKey()), consumerRacks))
+ .collect(Collectors.toList());
Map<String, List<TopicPartition>> assignment = new HashMap<>();
- for (String memberId : subscriptions.keySet())
- assignment.put(memberId, new ArrayList<>());
+ subscriptions.keySet().forEach(memberId -> assignment.put(memberId,
new ArrayList<>()));
+
+ boolean useRackAware = topicAssignmentStates.stream().anyMatch(t ->
t.needsRackAwareAssignment);
+ if (useRackAware)
+ assignWithRackMatching(topicAssignmentStates, assignment);
+
+ topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true,
assignment));
+
+ if (useRackAware)
+ assignment.values().forEach(list ->
list.sort(PARTITION_COMPARATOR));
+ return assignment;
+ }
+
+ // This method is not used, but retained for compatibility with any custom
assignors that extend this class.
+ @Override
+ public Map<String, List<TopicPartition>> assign(Map<String, Integer>
partitionsPerTopic,
+ Map<String, Subscription>
subscriptions) {
+ return
assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions);
+ }
- for (Map.Entry<String, List<MemberInfo>> topicEntry :
consumersPerTopic.entrySet()) {
- String topic = topicEntry.getKey();
- List<MemberInfo> consumersForTopic = topicEntry.getValue();
+ private void assignRanges(TopicAssignmentState assignmentState,
+ BiFunction<String, TopicPartition, Boolean>
mayAssign,
+ Map<String, List<TopicPartition>> assignment) {
+ for (String consumer : assignmentState.consumers.keySet()) {
+ if (assignmentState.unassignedPartitions.isEmpty())
+ break;
+ List<TopicPartition> assignablePartitions =
assignmentState.unassignedPartitions.stream()
+ .filter(tp -> mayAssign.apply(consumer, tp))
+ .collect(Collectors.toList());
- Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
- if (numPartitionsForTopic == null)
+ int maxAssignable =
Math.min(assignmentState.maxAssignable(consumer), assignablePartitions.size());
+ if (maxAssignable <= 0)
continue;
- Collections.sort(consumersForTopic);
+ assign(consumer, assignablePartitions.subList(0, maxAssignable),
assignmentState, assignment);
+ }
+ }
- int numPartitionsPerConsumer = numPartitionsForTopic /
consumersForTopic.size();
- int consumersWithExtraPartition = numPartitionsForTopic %
consumersForTopic.size();
+ private void assignWithRackMatching(Collection<TopicAssignmentState>
assignmentStates,
+ Map<String, List<TopicPartition>>
assignment) {
- List<TopicPartition> partitions =
AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
- for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
- int start = numPartitionsPerConsumer * i + Math.min(i,
consumersWithExtraPartition);
- int length = numPartitionsPerConsumer + (i + 1 >
consumersWithExtraPartition ? 0 : 1);
-
assignment.get(consumersForTopic.get(i).memberId).addAll(partitions.subList(start,
start + length));
+ assignmentStates.stream().collect(Collectors.groupingBy(t ->
t.consumers)).forEach((consumers, states) -> {
+ states.stream().collect(Collectors.groupingBy(t ->
t.partitionRacks.size())).forEach((numPartitions, coPartitionedStates) -> {
+ if (coPartitionedStates.size() > 1)
+ assignCoPartitionedWithRackMatching(consumers,
numPartitions, states, assignment);
+ else {
+ TopicAssignmentState state = coPartitionedStates.get(0);
+ if (state.needsRackAwareAssignment)
+ assignRanges(state, state::racksMatch, assignment);
+ }
+ });
+ });
+ }
+
+ private void assignCoPartitionedWithRackMatching(LinkedHashMap<String,
Optional<String>> consumers,
+ int numPartitions,
+
Collection<TopicAssignmentState> assignmentStates,
+ Map<String,
List<TopicPartition>> assignment) {
+
+ List<String> remainingConsumers = new LinkedList<>(consumers.keySet());
Review Comment:
nit: Should we also use a set here as we remove the selected consumer from
it?
##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -1951,20 +1954,47 @@ class PlaintextConsumerTest extends BaseConsumerTest {
}
@Test
- def testConsumerRackIdPropagatedToPartitionAssignor(): Unit = {
- consumerConfig.setProperty(ConsumerConfig.CLIENT_RACK_CONFIG, "rack-a")
-
consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
classOf[RackAwareAssignor].getName)
- val consumer = createConsumer()
- consumer.subscribe(Set(topic).asJava)
- awaitAssignment(consumer, Set(tp, tp2))
- }
-}
+ def testRackAwareRangeAssignor(): Unit = {
Review Comment:
We have `FetchFromFollowerIntegrationTest`. Not sure if this is what you were
looking for.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##########
@@ -63,9 +77,26 @@
* <li><code>I0: [t0p0, t0p1, t1p0, t1p1]</code>
* <li><code>I1: [t0p2, t1p2]</code>
* </ul>
+ * <p>
+ * Rack-aware assignment is used if both consumer and partition replica racks
are available and
+ * some partitions have replicas only on a subset of racks. We attempt to
match consumer racks with
+ * partition replica racks on a best-effort basis, prioritizing balanced
assignment over rack alignment.
+ * Topics with equal partition count and same set of subscribers guarantee
co-partitioning by prioritizing
+ * co-partitioning over rack alignment. In this case, aligning partition
replicas of these topics on the
+ * same racks will improve locality for consumers. For example, if partitions
0 of all topics have a replica
+ * on rack 'a', partition 1 on rack 'b' etc., partition 0 of all topics can be
assigned to a consumer
+ * on rack 'a', partition 1 to a consumer on rack 'b' and so on.
+ * <p>
+ * Note that rack-aware assignment currently takes all replicas into account,
including any offline replicas
+ * and replicas that are not in the ISR. This is based on the assumption that
these replicas are likely
+ * to join the ISR relatively soon. Since consumers don't rebalance on ISR
change, this avoids unnecessary
+ * cross-rack traffic for long durations after replicas rejoin the ISR. In the
future, we may consider
+ * rebalancing when replicas are added or removed to improve consumer rack
alignment.
+ * </p>
*/
public class RangeAssignor extends AbstractPartitionAssignor {
public static final String RANGE_ASSIGNOR_NAME = "range";
+ private final static TopicPartitionComparator PARTITION_COMPARATOR = new
TopicPartitionComparator();
Review Comment:
nit: `final static` -> `static final` to be consistent with L98?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]