This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 278a93c45d9 KAFKA-18901: [1/N] Improved homogeneous SimpleAssignor
(#19142)
278a93c45d9 is described below
commit 278a93c45d9622ecb30a31506420213bf70e521d
Author: Andrew Schofield <[email protected]>
AuthorDate: Tue Mar 11 10:08:31 2025 +0000
KAFKA-18901: [1/N] Improved homogeneous SimpleAssignor (#19142)
The current homogeneous SimpleAssignor for share groups is not very good
at revoking partitions which have previously been assigned when the
number of members increases. This PR improves the situation.
It also fixes the sorting of assignments in `kafka-consumer-groups.sh`
and `kafka-share-groups.sh` so that it sorts partition indices
numerically instead of alphabetically. It also adds the missing number
of partitions column for share groups.
---
.../coordinator/group/assignor/SimpleAssignor.java | 233 ++++++++++++++-------
.../group/assignor/SimpleAssignorTest.java | 59 +++++-
.../tools/consumer/group/ConsumerGroupCommand.java | 2 +-
.../tools/consumer/group/ShareGroupCommand.java | 16 +-
.../consumer/group/ShareGroupCommandTest.java | 16 +-
5 files changed, 229 insertions(+), 97 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
index 28c89966936..16ada6c4668 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
@@ -34,13 +34,26 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
+import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import static
org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS;
/**
- * A simple partition assignor that assigns partitions of the subscribed
topics based on the rules defined in KIP-932 to different members.
+ * A simple partition assignor for share groups that assigns partitions of the
subscribed topics
+ * based on the rules defined in KIP-932 to different members. It is not
rack-aware.
+ * <p>
+ * Assignments are done according to the following principles:
+ * <ol>
+ * <li>Balance: Ensure partitions are distributed equally among all
members.
+ * The difference in assignment sizes between any two
members
+ * should not exceed one partition.</li>
+ * <li>Stickiness: Minimize partition movements among members by
retaining
+ * as much of the existing assignment as possible.</li>
+ * </ol>
+ * <p>
+ * Balance is prioritized above stickiness.
*/
public class SimpleAssignor implements ShareGroupPartitionAssignor {
@@ -60,13 +73,13 @@ public class SimpleAssignor implements
ShareGroupPartitionAssignor {
return new GroupAssignment(Map.of());
if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
- return assignHomogenous(groupSpec, subscribedTopicDescriber);
+ return assignHomogeneous(groupSpec, subscribedTopicDescriber);
} else {
return assignHeterogeneous(groupSpec, subscribedTopicDescriber);
}
}
- private GroupAssignment assignHomogenous(
+ private GroupAssignment assignHomogeneous(
GroupSpec groupSpec,
SubscribedTopicDescriber subscribedTopicDescriber
) {
@@ -107,7 +120,7 @@ public class SimpleAssignor implements
ShareGroupPartitionAssignor {
/**
* Get the current assignment by topic partitions.
- * @param groupSpec - The group metadata specifications.
+ * @param groupSpec The group metadata specifications.
* @return the current assignment for subscribed topic partitions to
memberIds.
*/
private Map<TopicIdPartition, List<String>> currentAssignment(GroupSpec
groupSpec) {
@@ -123,10 +136,10 @@ public class SimpleAssignor implements
ShareGroupPartitionAssignor {
/**
* This function computes the new assignment for a homogeneous group.
- * @param groupSpec - The group metadata specifications.
- * @param subscribedTopicIds - The set of all the subscribed topic ids for
the group.
- * @param targetPartitions - The list of all topic partitions that need
assignment.
- * @param currentAssignment - The current assignment for subscribed topic
partitions to memberIds.
+ * @param groupSpec The group metadata specifications.
+ * @param subscribedTopicIds The set of all the subscribed topic ids for
the group.
+ * @param targetPartitions The list of all topic partitions that need
assignment.
+ * @param currentAssignment The current assignment for subscribed topic
partitions to memberIds.
* @return the new partition assignment for the members of the group.
*/
private GroupAssignment newAssignmentHomogeneous(
@@ -135,49 +148,63 @@ public class SimpleAssignor implements
ShareGroupPartitionAssignor {
List<TopicIdPartition> targetPartitions,
Map<TopicIdPartition, List<String>> currentAssignment
) {
- Map<TopicIdPartition, List<String>> newAssignment = new HashMap<>();
-
- // Step 1: Hash member IDs to topic partitions.
- memberHashAssignment(targetPartitions, groupSpec.memberIds(),
newAssignment);
-
- // Step 2: Round-robin assignment for unassigned partitions which do
not have members already assigned in the current assignment.
- List<TopicIdPartition> unassignedPartitions = targetPartitions.stream()
- .filter(targetPartition ->
!newAssignment.containsKey(targetPartition))
- .filter(targetPartition ->
!currentAssignment.containsKey(targetPartition))
- .toList();
-
- roundRobinAssignment(groupSpec.memberIds(), unassignedPartitions,
newAssignment);
-
- // Step 3: We combine current assignment and new assignment.
- Map<String, Set<TopicIdPartition>> finalAssignment = new HashMap<>();
-
- // As per the KIP, we should revoke the assignments from current
assignment for partitions that were assigned by step 1
- // in the new assignment and have members in current assignment by
step 2. But we haven't implemented it to avoid the
- // complexity in both the implementation and the run time complexity.
This step was mentioned in the KIP to reduce
- // the burden of certain members of the share groups. This can be
achieved with the help of limiting the max
- // no. of partitions assignment for every member(KAFKA-18788). Hence,
the potential problem of burdening
- // the share consumers will be addressed in a future PR.
+ // For entirely balanced assignment, we would expect
(numTargetPartitions / numGroupMembers) partitions per member, rounded upwards.
+ // That can be expressed as Math.ceil(numTargetPartitions /
(double) numGroupMembers)
+ // Using integer arithmetic, as (numTargetPartitions +
numGroupMembers - 1) / numGroupMembers
+ int numGroupMembers = groupSpec.memberIds().size();
+ int numTargetPartitions = targetPartitions.size();
+ int desiredAssignmentCount = (numTargetPartitions + numGroupMembers -
1) / numGroupMembers;
+
+ Map<TopicIdPartition, List<String>> newAssignment =
newHashMap(numTargetPartitions);
+
+ // Hash member IDs to topic partitions. Each member will be assigned
one partition, but some partitions
+ // might have been assigned to more than one member.
+ memberHashAssignment(groupSpec.memberIds(), targetPartitions,
newAssignment);
+
+ // Combine current and new hashed assignments, sized to accommodate
the expected number of mappings.
+ Map<String, Set<TopicIdPartition>> finalAssignment =
newHashMap(numGroupMembers);
+ Map<TopicIdPartition, Set<String>> finalAssignmentByPartition =
newHashMap(numTargetPartitions);
+
+ // First, take the members assigned by hashing.
+ newAssignment.forEach((targetPartition, members) ->
members.forEach(member -> {
+ finalAssignment.computeIfAbsent(member, k -> new
HashSet<>()).add(targetPartition);
+ finalAssignmentByPartition.computeIfAbsent(targetPartition, k ->
new HashSet<>()).add(member);
+ }));
- newAssignment.forEach((targetPartition, members) ->
members.forEach(member ->
- finalAssignment.computeIfAbsent(member, k -> new
HashSet<>()).add(targetPartition)));
+ // Then, take the members from the current assignment, making sure
that no member has too many assigned partitions.
// When combining current assignment, we need to only consider the
topics in current assignment that are also being
// subscribed in the new assignment as well.
currentAssignment.forEach((targetPartition, members) -> {
- if (subscribedTopicIds.contains(targetPartition.topicId()))
+ if (subscribedTopicIds.contains(targetPartition.topicId())) {
members.forEach(member -> {
- if (groupSpec.memberIds().contains(member) &&
!newAssignment.containsKey(targetPartition))
- finalAssignment.computeIfAbsent(member, k -> new
HashSet<>()).add(targetPartition);
+ if (groupSpec.memberIds().contains(member)) {
+ Set<TopicIdPartition> memberPartitions =
finalAssignment.computeIfAbsent(member, k -> new HashSet<>());
+ if ((memberPartitions.size() < desiredAssignmentCount)
&& !newAssignment.containsKey(targetPartition)) {
+ memberPartitions.add(targetPartition);
+
finalAssignmentByPartition.computeIfAbsent(targetPartition, k -> new
HashSet<>()).add(member);
+ }
+ }
});
+ }
});
+ // Finally, round-robin assignment for unassigned partitions which do
not already have members assigned.
+ // The order of steps differs slightly from KIP-932 because the
desired assignment count has been taken into
+ // account when copying partitions across from the current assignment,
and this is more convenient.
+ List<TopicIdPartition> unassignedPartitions = targetPartitions.stream()
+ .filter(targetPartition ->
!finalAssignmentByPartition.containsKey(targetPartition))
+ .toList();
+
+ roundRobinAssignmentWithCount(groupSpec.memberIds(),
unassignedPartitions, finalAssignment, desiredAssignmentCount);
+
return groupAssignment(finalAssignment, groupSpec.memberIds());
}
/**
* This function computes the new assignment for a heterogeneous group.
- * @param groupSpec - The group metadata specifications.
- * @param memberToPartitionsSubscription - The member to subscribed topic
partitions map.
- * @param currentAssignment - The current assignment for subscribed topic
partitions to memberIds.
+ * @param groupSpec The group metadata
specifications.
+ * @param memberToPartitionsSubscription The member to subscribed topic
partitions map.
+ * @param currentAssignment The current assignment for
subscribed topic partitions to memberIds.
* @return the new partition assignment for the members of the group.
*/
private GroupAssignment newAssignmentHeterogeneous(
@@ -185,6 +212,7 @@ public class SimpleAssignor implements
ShareGroupPartitionAssignor {
Map<String, List<TopicIdPartition>> memberToPartitionsSubscription,
Map<TopicIdPartition, List<String>> currentAssignment
) {
+ int numGroupMembers = groupSpec.memberIds().size();
// Exhaustive set of all subscribed topic partitions.
Set<TopicIdPartition> targetPartitions = new LinkedHashSet<>();
@@ -199,7 +227,7 @@ public class SimpleAssignor implements
ShareGroupPartitionAssignor {
// Step 1: Hash member IDs to partitions.
memberToPartitionsSubscription.forEach((member, partitions) ->
- memberHashAssignment(partitions, List.of(member), newAssignment));
+ memberHashAssignment(List.of(member), partitions, newAssignment));
// Step 2: Round-robin assignment for unassigned partitions which do
not have members already assigned in the current assignment.
Set<TopicIdPartition> assignedPartitions = new
LinkedHashSet<>(newAssignment.keySet());
@@ -213,16 +241,11 @@ public class SimpleAssignor implements
ShareGroupPartitionAssignor {
roundRobinAssignment(topicToMemberSubscription.get(unassignedTopic),
unassignedPartitions.get(unassignedTopic), newAssignment));
// Step 3: We combine current assignment and new assignment.
- Map<String, Set<TopicIdPartition>> finalAssignment = new HashMap<>();
- // As per the KIP, we should revoke the assignments from current
assignment for partitions that were assigned by step 1
- // in the new assignment and have members in current assignment by
step 2. But we haven't implemented it to avoid the
- // complexity in both the implementation and the run time complexity.
This step was mentioned in the KIP to reduce
- // the burden of certain members of the share groups. This can be
achieved with the help of limiting the max
- // no. of partitions assignment for every member(KAFKA-18788). Hence,
the potential problem of burdening
- // the share consumers will be addressed in a future PR.
+ Map<String, Set<TopicIdPartition>> finalAssignment =
newHashMap(numGroupMembers);
newAssignment.forEach((targetPartition, members) ->
members.forEach(member ->
finalAssignment.computeIfAbsent(member, k -> new
HashSet<>()).add(targetPartition)));
+
// When combining current assignment, we need to only consider the
member topic subscription in current assignment
// which is being subscribed in the new assignment as well.
currentAssignment.forEach((topicIdPartition, members) ->
members.forEach(member -> {
@@ -234,67 +257,94 @@ public class SimpleAssignor implements
ShareGroupPartitionAssignor {
return groupAssignment(finalAssignment, groupSpec.memberIds());
}
- private GroupAssignment groupAssignment(
- Map<String, Set<TopicIdPartition>> assignmentByMember,
- Collection<String> allGroupMembers
- ) {
- Map<String, MemberAssignment> members = new HashMap<>();
- for (Map.Entry<String, Set<TopicIdPartition>> entry :
assignmentByMember.entrySet()) {
- Map<Uuid, Set<Integer>> targetPartitions = new HashMap<>();
- entry.getValue().forEach(targetPartition ->
targetPartitions.computeIfAbsent(targetPartition.topicId(), k -> new
HashSet<>()).add(targetPartition.partitionId()));
- members.put(entry.getKey(), new
MemberAssignmentImpl(targetPartitions));
- }
- allGroupMembers.forEach(member -> {
- if (!members.containsKey(member))
- members.put(member, new MemberAssignmentImpl(new HashMap<>()));
- });
-
- return new GroupAssignment(members);
- }
-
/**
* This function updates assignment by hashing the member IDs of the
members and maps the partitions assigned to the
- * members based on the hash. This gives approximately even balance.
- * @param unassignedPartitions - the subscribed topic partitions which
needs assignment.
- * @param memberIds - the member ids to which the topic partitions need to
be assigned.
- * @param assignment - the existing assignment by topic partition. We need
to pass it as a parameter because this
- * function would be called multiple times for
heterogeneous assignment.
+ * members based on the hash, one partition per member. This gives
approximately even balance.
+ * @param memberIds The member ids to which the topic partitions
need to be assigned.
+ * @param partitionsToAssign The subscribed topic partitions which need
assignment.
+ * @param assignment The existing assignment by topic partition.
We need to pass it as a parameter because this
+ * method can be called multiple times for
heterogeneous assignment.
*/
// Visible for testing
void memberHashAssignment(
- List<TopicIdPartition> unassignedPartitions,
Collection<String> memberIds,
+ List<TopicIdPartition> partitionsToAssign,
Map<TopicIdPartition, List<String>> assignment
) {
- if (!unassignedPartitions.isEmpty())
+ if (!partitionsToAssign.isEmpty()) {
for (String memberId : memberIds) {
- int topicPartitionIndex = Math.abs(memberId.hashCode() %
unassignedPartitions.size());
- TopicIdPartition topicPartition =
unassignedPartitions.get(topicPartitionIndex);
+ int topicPartitionIndex = Math.abs(memberId.hashCode() %
partitionsToAssign.size());
+ TopicIdPartition topicPartition =
partitionsToAssign.get(topicPartitionIndex);
assignment.computeIfAbsent(topicPartition, k -> new
ArrayList<>()).add(memberId);
}
+ }
}
/**
- * This functions assigns topic partitions to members by round-robin
approach and updates the existing assignment.
- * @param memberIds - the member ids to which the topic partitions need to
be assigned, should be non-empty.
- * @param unassignedPartitions - the subscribed topic partitions which
needs assignment.
- * @param assignment - the existing assignment by topic partition.
+ * This function assigns topic partitions to members by a round-robin
approach and updates the existing assignment.
+ * @param memberIds The member ids to which the topic
partitions need to be assigned, should be non-empty.
+ * @param partitionsToAssign The subscribed topic partitions which
need assignment.
+ * @param assignment The existing assignment by topic
partition. We need to pass it as a parameter because this
+ * method can be called multiple times for
heterogeneous assignment.
*/
// Visible for testing
void roundRobinAssignment(
Collection<String> memberIds,
- List<TopicIdPartition> unassignedPartitions,
+ List<TopicIdPartition> partitionsToAssign,
Map<TopicIdPartition, List<String>> assignment
) {
// We iterate through the target partitions and assign a memberId to
them. In case we run out of members (members < targetPartitions),
// we again start from the starting index of memberIds.
Iterator<String> memberIdIterator = memberIds.iterator();
- for (TopicIdPartition targetPartition : unassignedPartitions) {
+ for (TopicIdPartition topicPartition : partitionsToAssign) {
if (!memberIdIterator.hasNext()) {
memberIdIterator = memberIds.iterator();
}
String memberId = memberIdIterator.next();
- assignment.computeIfAbsent(targetPartition, k -> new
ArrayList<>()).add(memberId);
+ assignment.computeIfAbsent(topicPartition, k -> new
ArrayList<>()).add(memberId);
+ }
+ }
+
+ /**
+ * This function assigns topic partitions to members by a round-robin
approach and updates the existing assignment.
+ * @param memberIds The member ids to which the topic
partitions need to be assigned, should be non-empty.
+ * @param partitionsToAssign The subscribed topic partitions which
need assignment.
+ * @param assignment The existing assignment by member ID. It is
updated in place with the newly assigned partitions.
+ * @param desiredAssignmentCount The number of partitions which can be
assigned to each member to give even balance.
+ * Note that this number can be exceeded by
one to allow for situations
+ * in which we have hashing collisions.
+ */
+ void roundRobinAssignmentWithCount(
+ Collection<String> memberIds,
+ List<TopicIdPartition> partitionsToAssign,
+ Map<String, Set<TopicIdPartition>> assignment,
+ int desiredAssignmentCount
+ ) {
+ Collection<String> memberIdsCopy = new LinkedHashSet<>(memberIds);
+
+ // We iterate through the target partitions which are not in the
assignment and assign a memberId to them.
+ // In case we run out of members (memberIds < partitionsToAssign), we
again start from the starting index of memberIds.
+ Iterator<String> memberIdIterator = memberIdsCopy.iterator();
+ ListIterator<TopicIdPartition> partitionListIterator =
partitionsToAssign.listIterator();
+ while (partitionListIterator.hasNext()) {
+ TopicIdPartition partition = partitionListIterator.next();
+ if (!memberIdIterator.hasNext()) {
+ memberIdIterator = memberIdsCopy.iterator();
+ if (memberIdsCopy.isEmpty()) {
+ // This should never happen, but guarding against an
infinite loop
+ throw new PartitionAssignorException("Inconsistent number
of member IDs");
+ }
+ }
+ String memberId = memberIdIterator.next();
+ Set<TopicIdPartition> memberPartitions =
assignment.computeIfAbsent(memberId, k -> new HashSet<>());
+ // We are prepared to add one more partition, even if the desired
assignment count is already reached.
+ if (memberPartitions.size() <= desiredAssignmentCount) {
+ memberPartitions.add(partition);
+ } else {
+ memberIdIterator.remove();
+ partitionListIterator.previous();
+ }
}
}
@@ -318,4 +368,27 @@ public class SimpleAssignor implements
ShareGroupPartitionAssignor {
});
return targetPartitions;
}
+
+ private GroupAssignment groupAssignment(
+ Map<String, Set<TopicIdPartition>> assignmentByMember,
+ Collection<String> allGroupMembers
+ ) {
+ Map<String, MemberAssignment> members = new HashMap<>();
+ for (Map.Entry<String, Set<TopicIdPartition>> entry :
assignmentByMember.entrySet()) {
+ Map<Uuid, Set<Integer>> targetPartitions = new HashMap<>();
+ entry.getValue().forEach(targetPartition ->
+ targetPartitions.computeIfAbsent(targetPartition.topicId(), k
-> new HashSet<>()).add(targetPartition.partitionId()));
+ members.put(entry.getKey(), new
MemberAssignmentImpl(targetPartitions));
+ }
+ allGroupMembers.forEach(member -> {
+ if (!members.containsKey(member))
+ members.put(member, new MemberAssignmentImpl(new HashMap<>()));
+ });
+
+ return new GroupAssignment(members);
+ }
+
+ private static <K, V> HashMap<K, V> newHashMap(int numMappings) {
+ return new HashMap<>((int) (((numMappings + 1) / 0.75f) + 1));
+ }
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java
index a0b86d6fa13..8be6abbdd43 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java
@@ -391,7 +391,7 @@ public class SimpleAssignorTest {
List<TopicIdPartition> partitions = List.of(partition1, partition2,
partition3);
Map<TopicIdPartition, List<String>> computedAssignment = new
HashMap<>();
- assignor.memberHashAssignment(partitions, members, computedAssignment);
+ assignor.memberHashAssignment(members, partitions, computedAssignment);
Map<TopicIdPartition, List<String>> expectedAssignment = new
HashMap<>();
expectedAssignment.put(partition1, List.of(member3));
@@ -425,6 +425,51 @@ public class SimpleAssignorTest {
assertAssignment(expectedAssignment, assignment);
}
+ @Test
+ public void testRoundRobinAssignmentWithCount() {
+ String member1 = "member1";
+ String member2 = "member2";
+ List<String> members = List.of(member1, member2);
+ TopicIdPartition partition1 = new TopicIdPartition(TOPIC_1_UUID, 0);
+ TopicIdPartition partition2 = new TopicIdPartition(TOPIC_2_UUID, 0);
+ TopicIdPartition partition3 = new TopicIdPartition(TOPIC_3_UUID, 0);
+ TopicIdPartition partition4 = new TopicIdPartition(TOPIC_4_UUID, 0);
+ List<TopicIdPartition> unassignedPartitions = List.of(partition2,
partition3, partition4);
+
+ Map<String, Set<TopicIdPartition>> assignment = new HashMap<>();
+ assignment.put(member1, new HashSet<>(Set.of(partition1)));
+ assignment.put(member2, new HashSet<>(Set.of(partition1)));
+
+ assignor.roundRobinAssignmentWithCount(members, unassignedPartitions,
assignment, 2);
+ Map<String, Set<TopicIdPartition>> expectedAssignment = Map.of(
+ member1, Set.of(partition1, partition2, partition4),
+ member2, Set.of(partition1, partition3)
+ );
+
+ assertFinalAssignment(expectedAssignment, assignment);
+ }
+
+ @Test
+ public void testRoundRobinAssignmentWithCountTooManyPartitions() {
+ String member1 = "member1";
+ String member2 = "member2";
+ List<String> members = List.of(member1, member2);
+ TopicIdPartition partition1 = new TopicIdPartition(TOPIC_1_UUID, 0);
+ TopicIdPartition partition2 = new TopicIdPartition(TOPIC_2_UUID, 0);
+ TopicIdPartition partition3 = new TopicIdPartition(TOPIC_3_UUID, 0);
+ TopicIdPartition partition4 = new TopicIdPartition(TOPIC_4_UUID, 0);
+ TopicIdPartition partition5 = new TopicIdPartition(TOPIC_4_UUID, 1);
+ TopicIdPartition partition6 = new TopicIdPartition(TOPIC_4_UUID, 2);
+ List<TopicIdPartition> unassignedPartitions = List.of(partition2,
partition3, partition4, partition5, partition6);
+
+ Map<String, Set<TopicIdPartition>> assignment = new HashMap<>();
+ assignment.put(member1, new HashSet<>(Set.of(partition1)));
+ assignment.put(member2, new HashSet<>(Set.of(partition1)));
+
+ assertThrows(PartitionAssignorException.class,
+ () -> assignor.roundRobinAssignmentWithCount(members,
unassignedPartitions, assignment, 2));
+ }
+
@Test
public void testAssignWithCurrentAssignmentHomogeneous() {
// Current assignment setup - Two members A, B subscribing to T1 and
T2.
@@ -758,6 +803,18 @@ public class SimpleAssignorTest {
});
}
+ private void assertFinalAssignment(
+ Map<String, Set<TopicIdPartition>> expectedAssignment,
+ Map<String, Set<TopicIdPartition>> computedAssignment
+ ) {
+ assertEquals(expectedAssignment.size(), computedAssignment.size());
+ expectedAssignment.forEach((memberId, partitions) -> {
+ Set<TopicIdPartition> computedPartitions =
computedAssignment.getOrDefault(memberId, Set.of());
+ assertEquals(partitions.size(), computedPartitions.size());
+ partitions.forEach(member ->
assertTrue(computedPartitions.contains(member)));
+ });
+ }
+
private void assertEveryPartitionGetsAssignment(
int expectedPartitions,
GroupAssignment computedGroupAssignment
diff --git
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
index 6f9f56af077..48756b90a1b 100644
---
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
+++
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
@@ -531,8 +531,8 @@ public class ConsumerGroupCommand {
return topicPartitions
.stream()
.map(TopicPartition::partition)
- .map(Object::toString)
.sorted()
+ .map(Object::toString)
.collect(Collectors.joining(",", topicName + ":", ""));
}).sorted().collect(Collectors.joining(";"));
}
diff --git
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
index 246a5eecbf1..075d0a5b282 100644
---
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
+++
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
@@ -431,16 +431,18 @@ public class ShareGroupCommand {
}
if (verbose) {
- String fmt = "\n%" + -groupLen + "s %" +
-maxConsumerIdLen + "s %" + -maxHostLen + "s %" + -maxClientIdLen + "s %-13s
%s";
- System.out.printf(fmt, "GROUP", "CONSUMER-ID", "HOST",
"CLIENT-ID", "MEMBER-EPOCH", "ASSIGNMENT");
+ String fmt = "\n%" + -groupLen + "s %" +
-maxConsumerIdLen + "s %" + -maxHostLen + "s %" + -maxClientIdLen + "s %-12s
%-13s %s";
+ System.out.printf(fmt, "GROUP", "CONSUMER-ID", "HOST",
"CLIENT-ID", "#PARTITIONS", "MEMBER-EPOCH", "ASSIGNMENT");
for (ShareMemberDescription member : members) {
- System.out.printf(fmt, groupId,
member.consumerId(), member.host(), member.clientId(), member.memberEpoch(),
getAssignmentString(member.assignment()));
+ System.out.printf(fmt, groupId,
member.consumerId(), member.host(), member.clientId(),
+ member.assignment().topicPartitions().size(),
member.memberEpoch(), getAssignmentString(member.assignment()));
}
} else {
- String fmt = "\n%" + -groupLen + "s %" +
-maxConsumerIdLen + "s %" + -maxHostLen + "s %" + -maxClientIdLen + "s %s";
- System.out.printf(fmt, "GROUP", "CONSUMER-ID", "HOST",
"CLIENT-ID", "ASSIGNMENT");
+ String fmt = "\n%" + -groupLen + "s %" +
-maxConsumerIdLen + "s %" + -maxHostLen + "s %" + -maxClientIdLen + "s %-12s
%s";
+ System.out.printf(fmt, "GROUP", "CONSUMER-ID", "HOST",
"CLIENT-ID", "#PARTITIONS", "ASSIGNMENT");
for (ShareMemberDescription member : members) {
- System.out.printf(fmt, groupId,
member.consumerId(), member.host(), member.clientId(),
getAssignmentString(member.assignment()));
+ System.out.printf(fmt, groupId,
member.consumerId(), member.host(), member.clientId(),
+ member.assignment().topicPartitions().size(),
getAssignmentString(member.assignment()));
}
}
System.out.println();
@@ -461,8 +463,8 @@ public class ShareGroupCommand {
return topicPartitions
.stream()
.map(TopicPartition::partition)
- .map(Object::toString)
.sorted()
+ .map(Object::toString)
.collect(Collectors.joining(",", topicName + ":", ""));
}).sorted().collect(Collectors.joining(";"));
}
diff --git
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
index 46103e10f43..c3690d953ab 100644
---
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
@@ -475,10 +475,10 @@ public class ShareGroupCommandTest {
List<String> expectedValues1;
if (describeType.contains("--verbose")) {
- expectedValues1 = List.of(firstGroup, "memid1",
"host1", "clId1", "0", "topic1:0,1;topic2:0");
+ expectedValues1 = List.of(firstGroup, "memid1",
"host1", "clId1", "3", "0", "topic1:0,1;topic2:0");
} else {
- expectedValues1 = List.of(firstGroup, "memid1",
"host1", "clId1", "topic1:0,1;topic2:0");
+ expectedValues1 = List.of(firstGroup, "memid1",
"host1", "clId1", "3", "topic1:0,1;topic2:0");
}
return checkArgsHeaderOutput(cgcArgs, lines[0]) &&
Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedValues1);
@@ -531,12 +531,12 @@ public class ShareGroupCommandTest {
List<String> expectedValues1;
List<String> expectedValues2;
if (describeType.contains("--verbose")) {
- expectedValues1 = List.of(firstGroup, "memid1",
"host1", "clId1", "0", "topic1:0,1;topic2:0");
- expectedValues2 = List.of(secondGroup, "memid1",
"host1", "clId1", "0", "topic1:0");
+ expectedValues1 = List.of(firstGroup, "memid1",
"host1", "clId1", "3", "0", "topic1:0,1;topic2:0");
+ expectedValues2 = List.of(secondGroup, "memid1",
"host1", "clId1", "1", "0", "topic1:0");
} else {
- expectedValues1 = List.of(firstGroup, "memid1",
"host1", "clId1", "topic1:0,1;topic2:0");
- expectedValues2 = List.of(secondGroup, "memid1",
"host1", "clId1", "topic1:0");
+ expectedValues1 = List.of(firstGroup, "memid1",
"host1", "clId1", "3", "topic1:0,1;topic2:0");
+ expectedValues2 = List.of(secondGroup, "memid1",
"host1", "clId1", "1", "topic1:0");
}
return checkArgsHeaderOutput(cgcArgs, lines[0]) &&
checkArgsHeaderOutput(cgcArgs, lines[3]) &&
Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedValues1) &&
@@ -899,8 +899,8 @@ public class ShareGroupCommandTest {
private boolean checkMembersArgsHeaderOutput(String output, boolean
verbose) {
List<String> expectedKeys = verbose ?
- List.of("GROUP", "CONSUMER-ID", "HOST", "CLIENT-ID",
"MEMBER-EPOCH", "ASSIGNMENT") :
- List.of("GROUP", "CONSUMER-ID", "HOST", "CLIENT-ID", "ASSIGNMENT");
+ List.of("GROUP", "CONSUMER-ID", "HOST", "CLIENT-ID",
"#PARTITIONS", "MEMBER-EPOCH", "ASSIGNMENT") :
+ List.of("GROUP", "CONSUMER-ID", "HOST", "CLIENT-ID",
"#PARTITIONS", "ASSIGNMENT");
return
Arrays.stream(output.trim().split("\\s+")).toList().equals(expectedKeys);
}