This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9c2a0965b5e KAFKA-20066: Implement KIP-1251: Assignment epochs for
consumer groups [1/N] (#21557)
9c2a0965b5e is described below
commit 9c2a0965b5e38f4ee7eb5c4d4906d5fe0bd92356
Author: Lucy Liu <[email protected]>
AuthorDate: Fri Feb 27 11:02:08 2026 -0600
KAFKA-20066: Implement KIP-1251: Assignment epochs for consumer groups
[1/N] (#21557)
# Summary
This PR moves `assignedPartitions` out of `ModernConsumerMember`
interface, add it as independent properties for `ShareGroupMember` and
`ConsumerGroupMember`.
## Reason for the change
In an upcoming PR, the structure of
`ConsumerGroupMember#assignedPartitions` and
`ConsumerGroupMember#partitionsPendingRevocation` will be changed to
include epoch information as
```
Map<Uuid, Map<Integer, Integer>>
```
This differs from the `ShareGroupMember#assignedPartitions` structure,
which remains `Map<Uuid, Set<Integer>>`. Therefore, it is no longer
appropriate to have this as a shared field in the base class.
Reviewers: Sean Quah <[email protected]>, Lucas Brutschy
<[email protected]>
---
.../coordinator/group/GroupMetadataManager.java | 5 +--
.../group/modern/ModernGroupMember.java | 29 +------------
.../group/modern/consumer/ConsumerGroupMember.java | 26 +++++++++++-
.../group/modern/share/ShareGroupMember.java | 26 +++++++++++-
.../kafka/jmh/assignor/AssignorBenchmarkUtils.java | 48 ++++++++++++++++++----
.../jmh/assignor/ServerSideAssignorBenchmark.java | 2 +-
.../jmh/assignor/ShareGroupAssignorBenchmark.java | 2 +-
.../assignor/TargetAssignmentBuilderBenchmark.java | 2 +-
8 files changed, 94 insertions(+), 46 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index e21304178c3..4494a666aef 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -239,7 +239,6 @@ import static
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics
import static
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME;
import static
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.SHARE_GROUP_REBALANCES_SENSOR_NAME;
import static
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.STREAMS_GROUP_REBALANCES_SENSOR_NAME;
-import static
org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember.hasAssignedPartitionsChanged;
import static
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.convertToStreamsGroupTopologyRecord;
import static
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord;
import static
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord;
@@ -2436,7 +2435,7 @@ public class GroupMetadataManager {
// to detect a full request as those must be set in a full request.
// 2. The member's assignment has been updated.
boolean isFullRequest = rebalanceTimeoutMs != -1 &&
(subscribedTopicNames != null || subscribedTopicRegex != null) &&
ownedTopicPartitions != null;
- if (memberEpoch == 0 || isFullRequest ||
hasAssignedPartitionsChanged(member, updatedMember)) {
+ if (memberEpoch == 0 || isFullRequest ||
ConsumerGroupMember.hasAssignedPartitionsChanged(member, updatedMember)) {
response.setAssignment(ConsumerGroupHeartbeatResponse.createAssignment(updatedMember.assignedPartitions()));
}
@@ -2808,7 +2807,7 @@ public class GroupMetadataManager {
// (subscribedTopicNames) to detect a full request as those must be
set in a full request.
// 2. The member's assignment has been updated.
boolean isFullRequest = subscribedTopicNames != null;
- if (memberEpoch == 0 || isFullRequest ||
hasAssignedPartitionsChanged(member, updatedMember)) {
+ if (memberEpoch == 0 || isFullRequest ||
ShareGroupMember.hasAssignedPartitionsChanged(member, updatedMember)) {
response.setAssignment(ShareGroupHeartbeatResponse.createAssignment(updatedMember.assignedPartitions()));
}
return new CoordinatorResult<>(
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroupMember.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroupMember.java
index 1b844d24ccd..12e652c8136 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroupMember.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroupMember.java
@@ -16,9 +16,6 @@
*/
package org.apache.kafka.coordinator.group.modern;
-import org.apache.kafka.common.Uuid;
-
-import java.util.Map;
import java.util.Set;
/**
@@ -71,11 +68,6 @@ public abstract class ModernGroupMember {
*/
protected Set<String> subscribedTopicNames;
- /**
- * The partitions assigned to this member.
- */
- protected Map<Uuid, Set<Integer>> assignedPartitions;
-
protected ModernGroupMember(
String memberId,
int memberEpoch,
@@ -85,8 +77,7 @@ public abstract class ModernGroupMember {
String clientId,
String clientHost,
Set<String> subscribedTopicNames,
- MemberState state,
- Map<Uuid, Set<Integer>> assignedPartitions
+ MemberState state
) {
this.memberId = memberId;
this.memberEpoch = memberEpoch;
@@ -97,7 +88,6 @@ public abstract class ModernGroupMember {
this.clientId = clientId;
this.clientHost = clientHost;
this.subscribedTopicNames = subscribedTopicNames;
- this.assignedPartitions = assignedPartitions;
}
/**
@@ -169,21 +159,4 @@ public abstract class ModernGroupMember {
public boolean isReconciledTo(int targetAssignmentEpoch) {
return state == MemberState.STABLE && memberEpoch ==
targetAssignmentEpoch;
}
-
- /**
- * @return The set of assigned partitions.
- */
- public Map<Uuid, Set<Integer>> assignedPartitions() {
- return assignedPartitions;
- }
-
- /**
- * @return True of the two provided members have different assigned
partitions.
- */
- public static boolean hasAssignedPartitionsChanged(
- ModernGroupMember member1,
- ModernGroupMember member2
- ) {
- return
!member1.assignedPartitions().equals(member2.assignedPartitions());
- }
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java
index 9b5f0c1f6c8..b2d9d0ea492 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java
@@ -263,6 +263,11 @@ public class ConsumerGroupMember extends ModernGroupMember
{
*/
private final String serverAssignorName;
+ /**
+ * The partitions assigned to this member.
+ */
+ private final Map<Uuid, Set<Integer>> assignedPartitions;
+
/**
* The partitions being revoked by this member.
*/
@@ -299,12 +304,12 @@ public class ConsumerGroupMember extends
ModernGroupMember {
clientId,
clientHost,
subscribedTopicNames,
- state,
- assignedPartitions
+ state
);
this.rebalanceTimeoutMs = rebalanceTimeoutMs;
this.subscribedTopicRegex = subscribedTopicRegex;
this.serverAssignorName = serverAssignorName;
+ this.assignedPartitions = assignedPartitions;
this.partitionsPendingRevocation = partitionsPendingRevocation;
this.classicMemberMetadata = classicMemberMetadata;
}
@@ -330,6 +335,13 @@ public class ConsumerGroupMember extends ModernGroupMember
{
return Optional.ofNullable(serverAssignorName);
}
+ /**
+ * @return The set of assigned partitions.
+ */
+ public Map<Uuid, Set<Integer>> assignedPartitions() {
+ return assignedPartitions;
+ }
+
/**
* @return The set of partitions awaiting revocation from the member.
*/
@@ -337,6 +349,16 @@ public class ConsumerGroupMember extends ModernGroupMember
{
return partitionsPendingRevocation;
}
+ /**
+ * @return True if the two provided members have different assigned
partitions.
+ */
+ public static boolean hasAssignedPartitionsChanged(
+ ConsumerGroupMember member1,
+ ConsumerGroupMember member2
+ ) {
+ return
!member1.assignedPartitions().equals(member2.assignedPartitions());
+ }
+
/**
* @return The supported classic protocols converted to
JoinGroupRequestProtocolCollection.
*/
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupMember.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupMember.java
index 2bb75578c7b..bc40d5025fc 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupMember.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupMember.java
@@ -170,6 +170,11 @@ public class ShareGroupMember extends ModernGroupMember {
}
}
+ /**
+ * The partitions assigned to this member.
+ */
+ private final Map<Uuid, Set<Integer>> assignedPartitions;
+
private ShareGroupMember(
String memberId,
int memberEpoch,
@@ -190,9 +195,26 @@ public class ShareGroupMember extends ModernGroupMember {
clientId,
clientHost,
subscribedTopicNames,
- state,
- assignedPartitions
+ state
);
+ this.assignedPartitions = assignedPartitions;
+ }
+
+ /**
+ * @return The partitions assigned to this member.
+ */
+ public Map<Uuid, Set<Integer>> assignedPartitions() {
+ return assignedPartitions;
+ }
+
+ /**
+ * @return True if the two provided members have different assigned
partitions.
+ */
+ public static boolean hasAssignedPartitionsChanged(
+ ShareGroupMember member1,
+ ShareGroupMember member2
+ ) {
+ return
!member1.assignedPartitions().equals(member2.assignedPartitions());
}
/**
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java
index 555c92457f8..83e9ba23da2 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java
@@ -28,7 +28,6 @@ import
org.apache.kafka.coordinator.group.api.assignor.SubscriptionType;
import org.apache.kafka.coordinator.group.modern.Assignment;
import org.apache.kafka.coordinator.group.modern.GroupSpecImpl;
import
org.apache.kafka.coordinator.group.modern.MemberSubscriptionAndAssignmentImpl;
-import org.apache.kafka.coordinator.group.modern.ModernGroupMember;
import org.apache.kafka.coordinator.group.modern.TopicIds;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupMember;
@@ -115,24 +114,23 @@ public class AssignorBenchmarkUtils {
}
/**
- * Creates a GroupSpec from the given ModernGroupMembers.
+ * Creates a GroupSpec from the given ConsumerGroupMembers.
*
- * @param members The ModernGroupMembers.
+ * @param members The ConsumerGroupMembers.
* @param subscriptionType The group's subscription type.
* @param topicResolver The TopicResolver to use.
* @return The new GroupSpec.
*/
- public static GroupSpec createGroupSpec(
- Map<String, ? extends ModernGroupMember> members,
+ public static GroupSpec createConsumerGroupSpec(
+ Map<String, ConsumerGroupMember> members,
SubscriptionType subscriptionType,
TopicIds.TopicResolver topicResolver
) {
Map<String, MemberSubscriptionAndAssignmentImpl> memberSpecs = new
HashMap<>();
- // Prepare the member spec for all members.
- for (Map.Entry<String, ? extends ModernGroupMember> memberEntry :
members.entrySet()) {
+ for (Map.Entry<String, ConsumerGroupMember> memberEntry :
members.entrySet()) {
String memberId = memberEntry.getKey();
- ModernGroupMember member = memberEntry.getValue();
+ ConsumerGroupMember member = memberEntry.getValue();
memberSpecs.put(memberId, new MemberSubscriptionAndAssignmentImpl(
Optional.ofNullable(member.rackId()),
@@ -149,6 +147,40 @@ public class AssignorBenchmarkUtils {
);
}
+ /**
+ * Creates a GroupSpec from the given ShareGroupMembers.
+ *
+ * @param members The ShareGroupMembers.
+ * @param subscriptionType The group's subscription type.
+ * @param topicResolver The TopicResolver to use.
+ * @return The new GroupSpec.
+ */
+ public static GroupSpec createShareGroupSpec(
+ Map<String, ShareGroupMember> members,
+ SubscriptionType subscriptionType,
+ TopicIds.TopicResolver topicResolver
+ ) {
+ Map<String, MemberSubscriptionAndAssignmentImpl> memberSpecs = new
HashMap<>();
+
+ for (Map.Entry<String, ShareGroupMember> memberEntry :
members.entrySet()) {
+ String memberId = memberEntry.getKey();
+ ShareGroupMember member = memberEntry.getValue();
+
+ memberSpecs.put(memberId, new MemberSubscriptionAndAssignmentImpl(
+ Optional.ofNullable(member.rackId()),
+ Optional.empty(),
+ new TopicIds(member.subscribedTopicNames(), topicResolver),
+ new Assignment(member.assignedPartitions())
+ ));
+ }
+
+ return new GroupSpecImpl(
+ memberSpecs,
+ subscriptionType,
+ Map.of()
+ );
+ }
+
/**
* Creates a ConsumerGroupMembers map where all members have the same
topic subscriptions.
*
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java
index b96d718c654..38e19447444 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java
@@ -135,7 +135,7 @@ public class ServerSideAssignorBenchmark {
setupTopics();
Map<String, ConsumerGroupMember> members = createMembers();
- this.groupSpec = AssignorBenchmarkUtils.createGroupSpec(members,
subscriptionType, topicResolver);
+ this.groupSpec =
AssignorBenchmarkUtils.createConsumerGroupSpec(members, subscriptionType,
topicResolver);
if (assignmentType == AssignmentType.INCREMENTAL) {
simulateIncrementalRebalance();
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ShareGroupAssignorBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ShareGroupAssignorBenchmark.java
index 84a59560e0a..95148a2c140 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ShareGroupAssignorBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ShareGroupAssignorBenchmark.java
@@ -126,7 +126,7 @@ public class ShareGroupAssignorBenchmark {
setupTopics();
Map<String, ShareGroupMember> members = createMembers();
- this.groupSpec = AssignorBenchmarkUtils.createGroupSpec(members,
subscriptionType, topicResolver);
+ this.groupSpec = AssignorBenchmarkUtils.createShareGroupSpec(members,
subscriptionType, topicResolver);
if (assignmentType == AssignmentType.INCREMENTAL) {
simulateIncrementalRebalance();
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java
index c669737de36..8b73cf11555 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java
@@ -135,7 +135,7 @@ public class TargetAssignmentBuilderBenchmark {
private Map<String, Assignment>
generateMockInitialTargetAssignmentAndUpdateInvertedTargetAssignment(
Map<String, ConsumerGroupMember> members
) {
- this.groupSpec = AssignorBenchmarkUtils.createGroupSpec(
+ this.groupSpec = AssignorBenchmarkUtils.createConsumerGroupSpec(
members,
subscriptionType,
topicResolver