dajac commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1115822582
##########
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##########
@@ -74,45 +105,194 @@ public String name() {
private Map<String, List<MemberInfo>> consumersPerTopic(Map<String,
Subscription> consumerMetadata) {
Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
- for (Map.Entry<String, Subscription> subscriptionEntry :
consumerMetadata.entrySet()) {
- String consumerId = subscriptionEntry.getKey();
- MemberInfo memberInfo = new MemberInfo(consumerId,
subscriptionEntry.getValue().groupInstanceId());
- for (String topic : subscriptionEntry.getValue().topics()) {
- put(topicToConsumers, topic, memberInfo);
- }
- }
+ consumerMetadata.forEach((consumerId, subscription) -> {
+ MemberInfo memberInfo = new MemberInfo(consumerId,
subscription.groupInstanceId(), subscription.rackId());
+ subscription.topics().forEach(topic -> put(topicToConsumers,
topic, memberInfo));
+ });
return topicToConsumers;
}
+ /**
+ * Performs range assignment of the specified partitions for the consumers
with the provided subscriptions.
+ * If rack-awareness is enabled for one or more consumers, we perform
rack-aware assignment first to assign
+ * the subset of partitions that can be aligned on racks, while retaining
the same co-partitioning and
+ * per-topic balancing guarantees as non-rack-aware range assignment. The
remaining partitions are assigned
+ * using standard non-rack-aware range assignment logic, which may result
in mis-aligned racks.
+ */
@Override
- public Map<String, List<TopicPartition>> assign(Map<String, Integer>
partitionsPerTopic,
- Map<String, Subscription>
subscriptions) {
+ public Map<String, List<TopicPartition>> assignPartitions(Map<String,
List<PartitionInfo>> partitionsPerTopic,
+ Map<String,
Subscription> subscriptions) {
Map<String, List<MemberInfo>> consumersPerTopic =
consumersPerTopic(subscriptions);
+ Map<String, String> consumerRacks = consumerRacks(subscriptions);
+ List<TopicAssignmentState> topicAssignmentStates =
partitionsPerTopic.entrySet().stream()
+ .filter(e -> !e.getValue().isEmpty())
+ .map(e -> new TopicAssignmentState(e.getKey(), e.getValue(),
consumersPerTopic.get(e.getKey()), consumerRacks))
+ .collect(Collectors.toList());
Map<String, List<TopicPartition>> assignment = new HashMap<>();
- for (String memberId : subscriptions.keySet())
- assignment.put(memberId, new ArrayList<>());
+ subscriptions.keySet().forEach(memberId -> assignment.put(memberId,
new ArrayList<>()));
+
+ boolean useRackAware = topicAssignmentStates.stream().anyMatch(t ->
t.needsRackAwareAssignment);
+ if (useRackAware)
+ assignWithRackMatching(topicAssignmentStates, assignment);
+
+ topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true,
assignment));
+
+ if (useRackAware)
+ assignment.values().forEach(list ->
list.sort(PARTITION_COMPARATOR));
+ return assignment;
+ }
+
+ // This method is not used, but retained for compatibility with any custom
assignors that extend this class.
+ @Override
+ public Map<String, List<TopicPartition>> assign(Map<String, Integer>
partitionsPerTopic,
+ Map<String, Subscription>
subscriptions) {
+ return
assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions);
+ }
- for (Map.Entry<String, List<MemberInfo>> topicEntry :
consumersPerTopic.entrySet()) {
- String topic = topicEntry.getKey();
- List<MemberInfo> consumersForTopic = topicEntry.getValue();
+ private void assignRanges(TopicAssignmentState assignmentState,
+ BiFunction<String, TopicPartition, Boolean>
mayAssign,
+ Map<String, List<TopicPartition>> assignment) {
+ for (String consumer : assignmentState.consumers.keySet()) {
+ if (assignmentState.unassignedPartitions.isEmpty())
+ break;
+ List<TopicPartition> assignablePartitions =
assignmentState.unassignedPartitions.stream()
+ .filter(tp -> mayAssign.apply(consumer, tp))
+ .collect(Collectors.toList());
- Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
- if (numPartitionsForTopic == null)
+ int maxAssignable =
Math.min(assignmentState.maxAssignable(consumer), assignablePartitions.size());
+ if (maxAssignable <= 0)
continue;
- Collections.sort(consumersForTopic);
Review Comment:
Thanks. I missed it in the `TopicAssignmentState`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]