This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 71a7d859553 KAFKA-19431: Ensure consumer and share assignment
consistency with subscriptions (#20055)
71a7d859553 is described below
commit 71a7d85955342528dcb10bcded42048c37819687
Author: Sean Quah <[email protected]>
AuthorDate: Mon Oct 6 13:57:44 2025 +0100
KAFKA-19431: Ensure consumer and share assignment consistency with
subscriptions (#20055)
Filter out unsubscribed topics during reconciliation.
This eliminates the window where a consumer group assignment could
contain unsubscribed topics when a member unsubscribes from a topic
while it has unrevoked partitions.
We also apply filtering in a few other cases that would arise when
client-side assignors are implemented, since new assignments would no
longer be available immediately. This is important for mixed groups,
since clients on the classic protocol will rejoin if they receive a
topic in their assignment that is no longer in their subscription.
Regex subscriptions have a window where the regex is not resolved and we
cannot know which topics are part of the subscription. We opt to be
conservative and treat unresolved regexes as matching no topics.
The same change is applied to share groups, since the reconciliation
process is similar.
To gauge the performance impact of the change, we add a JMH benchmark.
Reviewers: Lucas Brutschy <[email protected]>, Lianet Magran
<[email protected]>, Sushant Mahajan <[email protected]>, Dongnuo Lyu
<[email protected]>, David Jacot <[email protected]>
---
.../coordinator/group/GroupMetadataManager.java | 112 +++--
.../modern/consumer/CurrentAssignmentBuilder.java | 171 ++++++-
.../modern/share/ShareGroupAssignmentBuilder.java | 73 ++-
.../group/modern/share/ShareGroupMember.java | 1 +
.../group/GroupMetadataManagerTest.java | 314 +++++++++++--
.../consumer/CurrentAssignmentBuilderTest.java | 514 ++++++++++++++++++++-
.../share/ShareGroupAssignmentBuilderTest.java | 78 ++++
.../CurrentAssignmentBuilderBenchmark.java | 171 +++++++
8 files changed, 1363 insertions(+), 71 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index cea68e09ffb..be9c4f9abc3 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -2249,18 +2249,13 @@ public class GroupMetadataManager {
.setClassicMemberMetadata(null)
.build();
- // If the group is newly created, we must ensure that it moves away
from
- // epoch 0 and that it is fully initialized.
- boolean bumpGroupEpoch = group.groupEpoch() == 0;
-
- bumpGroupEpoch |= hasMemberSubscriptionChanged(
+ boolean subscribedTopicNamesChanged = hasMemberSubscriptionChanged(
groupId,
member,
updatedMember,
records
);
-
- bumpGroupEpoch |= maybeUpdateRegularExpressions(
+ UpdateRegularExpressionsResult updateRegularExpressionsResult =
maybeUpdateRegularExpressions(
context,
group,
member,
@@ -2268,9 +2263,24 @@ public class GroupMetadataManager {
records
);
+ // The subscription has changed when either the subscribed topic names
or subscribed topic
+ // regex has changed.
+ boolean hasSubscriptionChanged = subscribedTopicNamesChanged ||
updateRegularExpressionsResult.regexUpdated();
int groupEpoch = group.groupEpoch();
SubscriptionType subscriptionType = group.subscriptionType();
+ boolean bumpGroupEpoch =
+ // If the group is newly created, we must ensure that it moves
away from
+ // epoch 0 and that it is fully initialized.
+ groupEpoch == 0 ||
+ // Bumping the group epoch signals that the target assignment
should be updated. We bump
+ // the group epoch when the member has changed its subscribed
topic names or the member
+ // has changed its subscribed topic regex to a regex that is
already resolved. We avoid
+ // bumping the group epoch when the new subscribed topic regex has
not been resolved
+ // yet, since we will have to update the target assignment again
later.
+ subscribedTopicNamesChanged ||
+ updateRegularExpressionsResult ==
UpdateRegularExpressionsResult.REGEX_UPDATED_AND_RESOLVED;
+
if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
// The subscription metadata is updated in two cases:
// 1) The member has updated its subscriptions;
@@ -2315,6 +2325,9 @@ public class GroupMetadataManager {
group::currentPartitionEpoch,
targetAssignmentEpoch,
targetAssignment,
+ group.resolvedRegularExpressions(),
+ // Force consistency with the subscription when the subscription
has changed.
+ hasSubscriptionChanged,
ownedTopicPartitions,
records
);
@@ -2468,6 +2481,8 @@ public class GroupMetadataManager {
group::currentPartitionEpoch,
group.assignmentEpoch(),
group.targetAssignment(updatedMember.memberId(),
updatedMember.instanceId()),
+ group.resolvedRegularExpressions(),
+ bumpGroupEpoch,
toTopicPartitions(subscription.ownedPartitions(),
metadataImage),
records
);
@@ -2511,6 +2526,9 @@ public class GroupMetadataManager {
group::currentPartitionEpoch,
targetAssignmentEpoch,
targetAssignment,
+ group.resolvedRegularExpressions(),
+ // Force consistency with the subscription when the
subscription has changed.
+ bumpGroupEpoch,
toTopicPartitions(subscription.ownedPartitions(),
metadataImage),
records
);
@@ -2669,6 +2687,8 @@ public class GroupMetadataManager {
updatedMember,
targetAssignmentEpoch,
targetAssignment,
+ // Force consistency with the subscription when the subscription
has changed.
+ bumpGroupEpoch,
records
);
@@ -3108,6 +3128,16 @@ public class GroupMetadataManager {
return value != null && !value.isEmpty();
}
+ private enum UpdateRegularExpressionsResult {
+ NO_CHANGE,
+ REGEX_UPDATED,
+ REGEX_UPDATED_AND_RESOLVED;
+
+ public boolean regexUpdated() {
+ return this == REGEX_UPDATED || this == REGEX_UPDATED_AND_RESOLVED;
+ }
+ }
+
/**
* Check whether the member has updated its subscribed topic regular
expression and
* may trigger the resolution/the refresh of all the regular expressions
in the
@@ -3119,9 +3149,9 @@ public class GroupMetadataManager {
* @param member The old member.
* @param updatedMember The new member.
* @param records The records accumulator.
- * @return Whether a rebalance must be triggered.
+ * @return The result of the update.
*/
- private boolean maybeUpdateRegularExpressions(
+ private UpdateRegularExpressionsResult maybeUpdateRegularExpressions(
AuthorizableRequestContext context,
ConsumerGroup group,
ConsumerGroupMember member,
@@ -3134,14 +3164,17 @@ public class GroupMetadataManager {
String oldSubscribedTopicRegex = member.subscribedTopicRegex();
String newSubscribedTopicRegex = updatedMember.subscribedTopicRegex();
- boolean bumpGroupEpoch = false;
boolean requireRefresh = false;
+ UpdateRegularExpressionsResult updateRegularExpressionsResult =
UpdateRegularExpressionsResult.NO_CHANGE;
// Check whether the member has changed its subscribed regex.
- if (!Objects.equals(oldSubscribedTopicRegex, newSubscribedTopicRegex))
{
+ boolean subscribedTopicRegexChanged =
!Objects.equals(oldSubscribedTopicRegex, newSubscribedTopicRegex);
+ if (subscribedTopicRegexChanged) {
log.debug("[GroupId {}] Member {} updated its subscribed regex to:
{}.",
groupId, memberId, newSubscribedTopicRegex);
+ updateRegularExpressionsResult =
UpdateRegularExpressionsResult.REGEX_UPDATED;
+
if (isNotEmpty(oldSubscribedTopicRegex) &&
group.numSubscribedMembers(oldSubscribedTopicRegex) == 1) {
// If the member was the last one subscribed to the regex, we
delete the
// resolved regular expression.
@@ -3160,7 +3193,9 @@ public class GroupMetadataManager {
} else {
// If the new regex is already resolved, we trigger a
rebalance
// by bumping the group epoch.
- bumpGroupEpoch =
group.resolvedRegularExpression(newSubscribedTopicRegex).isPresent();
+ if
(group.resolvedRegularExpression(newSubscribedTopicRegex).isPresent()) {
+ updateRegularExpressionsResult =
UpdateRegularExpressionsResult.REGEX_UPDATED_AND_RESOLVED;
+ }
}
}
}
@@ -3176,20 +3211,20 @@ public class GroupMetadataManager {
// 0. The group is subscribed to regular expressions. We also take the
one
// that the current may have just introduced.
if (!requireRefresh && group.subscribedRegularExpressions().isEmpty())
{
- return bumpGroupEpoch;
+ return updateRegularExpressionsResult;
}
// 1. There is no ongoing refresh for the group.
String key = group.groupId() + "-regex";
if (executor.isScheduled(key)) {
- return bumpGroupEpoch;
+ return updateRegularExpressionsResult;
}
// 2. The last refresh is older than 10s. If the group does not have
any regular
// expressions but the current member just brought a new one, we
should continue.
long lastRefreshTimeMs =
group.lastResolvedRegularExpressionRefreshTimeMs();
if (currentTimeMs <= lastRefreshTimeMs +
REGEX_BATCH_REFRESH_MIN_INTERVAL_MS) {
- return bumpGroupEpoch;
+ return updateRegularExpressionsResult;
}
// 3.1 The group has unresolved regular expressions.
@@ -3218,7 +3253,7 @@ public class GroupMetadataManager {
);
}
- return bumpGroupEpoch;
+ return updateRegularExpressionsResult;
}
/**
@@ -3492,16 +3527,18 @@ public class GroupMetadataManager {
/**
* Reconciles the current assignment of the member towards the target
assignment if needed.
*
- * @param groupId The group id.
- * @param member The member to reconcile.
- * @param currentPartitionEpoch The function returning the current epoch of
- * a given partition.
- * @param targetAssignmentEpoch The target assignment epoch.
- * @param targetAssignment The target assignment.
- * @param ownedTopicPartitions The list of partitions owned by the
member. This
- * is reported in the ConsumerGroupHeartbeat
API and
- * it could be null if not provided.
- * @param records The list to accumulate any new records.
+ * @param groupId The group id.
+ * @param member The member to reconcile.
+ * @param currentPartitionEpoch The function returning the current
epoch of
+ * a given partition.
+ * @param targetAssignmentEpoch The target assignment epoch.
+ * @param targetAssignment The target assignment.
+ * @param resolvedRegularExpressions The resolved regular expressions.
+ * @param hasSubscriptionChanged Whether the member has changed its
subscription on the current heartbeat.
+ * @param ownedTopicPartitions The list of partitions owned by the
member. This
+ * is reported in the
ConsumerGroupHeartbeat API and
+ * it could be null if not provided.
+ * @param records The list to accumulate any new
records.
* @return The received member if no changes have been made; or a new
* member containing the new assignment.
*/
@@ -3511,15 +3548,20 @@ public class GroupMetadataManager {
BiFunction<Uuid, Integer, Integer> currentPartitionEpoch,
int targetAssignmentEpoch,
Assignment targetAssignment,
+ Map<String, ResolvedRegularExpression> resolvedRegularExpressions,
+ boolean hasSubscriptionChanged,
List<ConsumerGroupHeartbeatRequestData.TopicPartitions>
ownedTopicPartitions,
List<CoordinatorRecord> records
) {
- if (member.isReconciledTo(targetAssignmentEpoch)) {
+ if (!hasSubscriptionChanged &&
member.isReconciledTo(targetAssignmentEpoch)) {
return member;
}
ConsumerGroupMember updatedMember = new
CurrentAssignmentBuilder(member)
+ .withMetadataImage(metadataImage)
.withTargetAssignment(targetAssignmentEpoch, targetAssignment)
+ .withHasSubscriptionChanged(hasSubscriptionChanged)
+ .withResolvedRegularExpressions(resolvedRegularExpressions)
.withCurrentPartitionEpoch(currentPartitionEpoch)
.withOwnedTopicPartitions(ownedTopicPartitions)
.build();
@@ -3556,11 +3598,12 @@ public class GroupMetadataManager {
/**
* Reconciles the current assignment of the member towards the target
assignment if needed.
*
- * @param groupId The group id.
- * @param member The member to reconcile.
- * @param targetAssignmentEpoch The target assignment epoch.
- * @param targetAssignment The target assignment.
- * @param records The list to accumulate any new records.
+ * @param groupId The group id.
+ * @param member The member to reconcile.
+ * @param targetAssignmentEpoch The target assignment epoch.
+ * @param targetAssignment The target assignment.
+ * @param hasSubscriptionChanged Whether the member has changed its
subscription on the current heartbeat.
+ * @param records The list to accumulate any new records.
* @return The received member if no changes have been made; or a new
* member containing the new assignment.
*/
@@ -3569,14 +3612,17 @@ public class GroupMetadataManager {
ShareGroupMember member,
int targetAssignmentEpoch,
Assignment targetAssignment,
+ boolean hasSubscriptionChanged,
List<CoordinatorRecord> records
) {
- if (member.isReconciledTo(targetAssignmentEpoch)) {
+ if (!hasSubscriptionChanged &&
member.isReconciledTo(targetAssignmentEpoch)) {
return member;
}
ShareGroupMember updatedMember = new
ShareGroupAssignmentBuilder(member)
+ .withMetadataImage(metadataImage)
.withTargetAssignment(targetAssignmentEpoch, targetAssignment)
+ .withHasSubscriptionChanged(hasSubscriptionChanged)
.build();
if (!updatedMember.equals(member)) {
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java
index 74a5bd7a2e3..63d61a3b923 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java
@@ -19,8 +19,11 @@ package org.apache.kafka.coordinator.group.modern.consumer;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.FencedMemberEpochException;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
import org.apache.kafka.coordinator.group.modern.Assignment;
import org.apache.kafka.coordinator.group.modern.MemberState;
+import org.apache.kafka.coordinator.group.modern.TopicIds;
+import org.apache.kafka.coordinator.group.modern.UnionSet;
import java.util.HashMap;
import java.util.HashSet;
@@ -41,6 +44,11 @@ public class CurrentAssignmentBuilder {
*/
private final ConsumerGroupMember member;
+ /**
+ * The metadata image.
+ */
+ private CoordinatorMetadataImage metadataImage =
CoordinatorMetadataImage.EMPTY;
+
/**
* The target assignment epoch.
*/
@@ -51,6 +59,16 @@ public class CurrentAssignmentBuilder {
*/
private Assignment targetAssignment;
+ /**
+ * Whether the member has changed its subscription on the current
heartbeat.
+ */
+ private boolean hasSubscriptionChanged;
+
+ /**
+ * The resolved regular expressions.
+ */
+ private Map<String, ResolvedRegularExpression> resolvedRegularExpressions
= Map.of();
+
/**
* A function which returns the current epoch of a topic-partition or -1
if the
* topic-partition is not assigned. The current epoch is the epoch of the
current owner.
@@ -73,6 +91,19 @@ public class CurrentAssignmentBuilder {
this.member = Objects.requireNonNull(member);
}
+ /**
+ * Sets the metadata image.
+ *
+ * @param metadataImage The metadata image.
+ * @return This object.
+ */
+ public CurrentAssignmentBuilder withMetadataImage(
+ CoordinatorMetadataImage metadataImage
+ ) {
+ this.metadataImage = metadataImage;
+ return this;
+ }
+
/**
* Sets the target assignment epoch and the target assignment that the
* consumer group member must be reconciled to.
@@ -90,6 +121,32 @@ public class CurrentAssignmentBuilder {
return this;
}
+ /**
+ * Sets whether the member has changed its subscription on the current
heartbeat.
+ *
+ * @param hasSubscriptionChanged If true, always removes unsubscribed
topics from the current assignment.
+ * @return This object.
+ */
+ public CurrentAssignmentBuilder withHasSubscriptionChanged(
+ boolean hasSubscriptionChanged
+ ) {
+ this.hasSubscriptionChanged = hasSubscriptionChanged;
+ return this;
+ }
+
+ /**
+ * Sets the resolved regular expressions.
+ *
+ * @param resolvedRegularExpressions The resolved regular expressions.
+ * @return This object.
+ */
+ public CurrentAssignmentBuilder withResolvedRegularExpressions(
+ Map<String, ResolvedRegularExpression> resolvedRegularExpressions
+ ) {
+ this.resolvedRegularExpressions = resolvedRegularExpressions;
+ return this;
+ }
+
/**
* Sets a BiFunction which allows to retrieve the current epoch of a
* partition. This is used by the state machine to determine if a
@@ -132,12 +189,15 @@ public class CurrentAssignmentBuilder {
case STABLE:
// When the member is in the STABLE state, we verify if a newer
// epoch (or target assignment) is available. If it is, we can
- // reconcile the member towards it. Otherwise, we return.
+ // reconcile the member towards it. Otherwise, we ensure the
+ // assignment is consistent with the subscribed topics, if
changed.
if (member.memberEpoch() != targetAssignmentEpoch) {
return computeNextAssignment(
member.memberEpoch(),
member.assignedPartitions()
);
+ } else if (hasSubscriptionChanged) {
+ return
updateCurrentAssignment(member.assignedPartitions());
} else {
return member;
}
@@ -147,18 +207,27 @@ public class CurrentAssignmentBuilder {
// until the member has revoked the necessary partitions. They
are
// considered revoked when they are not anymore reported in the
// owned partitions set in the ConsumerGroupHeartbeat API.
+ // Additional partitions may need revoking when the member's
+ // subscription changes.
// If the member provides its owned partitions. We verify if
it still
// owns any of the revoked partitions. If it does, we cannot
progress.
if
(ownsRevokedPartitions(member.partitionsPendingRevocation())) {
- return member;
+ if (hasSubscriptionChanged) {
+ return
updateCurrentAssignment(member.assignedPartitions());
+ } else {
+ return member;
+ }
}
// When the member has revoked all the pending partitions, it
can
// transition to the next epoch (current + 1) and we can
reconcile
// its state towards the latest target assignment.
return computeNextAssignment(
- member.memberEpoch() + 1,
+ // When we enter UNREVOKED_PARTITIONS due to a
subscription change,
+ // we must not advance the member epoch when the new target
+ // assignment is not available yet.
+ Math.min(member.memberEpoch() + 1, targetAssignmentEpoch),
member.assignedPartitions()
);
@@ -215,6 +284,71 @@ public class CurrentAssignmentBuilder {
return false;
}
+ /**
+ * Updates the current assignment, removing any partitions that are not
part of the subscribed topics.
+ * This method is a lot faster than running the full reconciliation logic
in computeNextAssignment.
+ *
+ * @param memberAssignedPartitions The assigned partitions of the member
to use.
+ * @return A new ConsumerGroupMember.
+ */
+ private ConsumerGroupMember updateCurrentAssignment(
+ Map<Uuid, Set<Integer>> memberAssignedPartitions
+ ) {
+ Set<Uuid> subscribedTopicIds = subscribedTopicIds();
+
+ // Reuse the original map if no topics need to be removed.
+ Map<Uuid, Set<Integer>> newAssignedPartitions;
+ Map<Uuid, Set<Integer>> newPartitionsPendingRevocation;
+ if (subscribedTopicIds.isEmpty() &&
member.partitionsPendingRevocation().isEmpty()) {
+ newAssignedPartitions = Map.of();
+ newPartitionsPendingRevocation = memberAssignedPartitions;
+ } else {
+ newAssignedPartitions = memberAssignedPartitions;
+ newPartitionsPendingRevocation = new
HashMap<>(member.partitionsPendingRevocation());
+ for (Map.Entry<Uuid, Set<Integer>> entry :
memberAssignedPartitions.entrySet()) {
+ if (!subscribedTopicIds.contains(entry.getKey())) {
+ if (newAssignedPartitions == memberAssignedPartitions) {
+ newAssignedPartitions = new
HashMap<>(memberAssignedPartitions);
+ newPartitionsPendingRevocation = new
HashMap<>(member.partitionsPendingRevocation());
+ }
+ newAssignedPartitions.remove(entry.getKey());
+ newPartitionsPendingRevocation.merge(
+ entry.getKey(),
+ entry.getValue(),
+ (existing, additional) -> {
+ existing = new HashSet<>(existing);
+ existing.addAll(additional);
+ return existing;
+ }
+ );
+ }
+ }
+ }
+
+ if (newAssignedPartitions == memberAssignedPartitions) {
+ // If no partitions were removed, we can return the member as is.
+ return member;
+ }
+
+ if (!newPartitionsPendingRevocation.isEmpty() &&
ownsRevokedPartitions(newPartitionsPendingRevocation)) {
+ return new ConsumerGroupMember.Builder(member)
+ .setState(MemberState.UNREVOKED_PARTITIONS)
+ .setAssignedPartitions(newAssignedPartitions)
+ .setPartitionsPendingRevocation(newPartitionsPendingRevocation)
+ .build();
+ } else {
+ // There were partitions removed, but they were already revoked.
+ // Keep the member in the current state and shrink the assigned
partitions.
+
+ // We do not expect to be in the UNREVOKED_PARTITIONS state here.
The full
+ // reconciliation logic should handle the case where the member
has revoked all its
+ // partitions pending revocation.
+ return new ConsumerGroupMember.Builder(member)
+ .setAssignedPartitions(newAssignedPartitions)
+ .build();
+ }
+ }
+
/**
* Computes the next assignment.
*
@@ -227,6 +361,8 @@ public class CurrentAssignmentBuilder {
int memberEpoch,
Map<Uuid, Set<Integer>> memberAssignedPartitions
) {
+ Set<Uuid> subscribedTopicIds = subscribedTopicIds();
+
boolean hasUnreleasedPartitions = false;
Map<Uuid, Set<Integer>> newAssignedPartitions = new HashMap<>();
Map<Uuid, Set<Integer>> newPartitionsPendingRevocation = new
HashMap<>();
@@ -241,6 +377,11 @@ public class CurrentAssignmentBuilder {
Set<Integer> currentAssignedPartitions = memberAssignedPartitions
.getOrDefault(topicId, Set.of());
+ // If the member is no longer subscribed to the topic, treat its
target assignment as empty.
+ if (!subscribedTopicIds.contains(topicId)) {
+ target = Set.of();
+ }
+
// New Assigned Partitions = Previous Assigned Partitions ∩ Target
Set<Integer> assignedPartitions = new
HashSet<>(currentAssignedPartitions);
assignedPartitions.retainAll(target);
@@ -317,4 +458,28 @@ public class CurrentAssignmentBuilder {
.build();
}
}
+
+ /**
+ * Gets the set of topic IDs that the member is subscribed to.
+ *
+ * @return The set of topic IDs that the member is subscribed to.
+ */
+ private Set<Uuid> subscribedTopicIds() {
+ Set<String> subscriptions = member.subscribedTopicNames();
+ String subscribedTopicRegex = member.subscribedTopicRegex();
+ if (subscribedTopicRegex != null && !subscribedTopicRegex.isEmpty()) {
+ ResolvedRegularExpression resolvedRegularExpression =
resolvedRegularExpressions.get(subscribedTopicRegex);
+ if (resolvedRegularExpression != null) {
+ if (subscriptions.isEmpty()) {
+ subscriptions = resolvedRegularExpression.topics();
+ } else if (!resolvedRegularExpression.topics().isEmpty()) {
+ subscriptions = new UnionSet<>(subscriptions,
resolvedRegularExpression.topics());
+ }
+ } else {
+ // Treat an unresolved regex as matching no topics, to be
conservative.
+ }
+ }
+
+ return new TopicIds(subscriptions, metadataImage);
+ }
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupAssignmentBuilder.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupAssignmentBuilder.java
index 38bcfae47e1..98b40340b0a 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupAssignmentBuilder.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupAssignmentBuilder.java
@@ -16,10 +16,16 @@
*/
package org.apache.kafka.coordinator.group.modern.share;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
import org.apache.kafka.coordinator.group.modern.Assignment;
import org.apache.kafka.coordinator.group.modern.MemberState;
+import org.apache.kafka.coordinator.group.modern.TopicIds;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Objects;
+import java.util.Set;
/**
* The ShareGroupAssignmentBuilder class encapsulates the reconciliation
engine of the
@@ -32,6 +38,11 @@ public class ShareGroupAssignmentBuilder {
*/
private final ShareGroupMember member;
+ /**
+ * The metadata image.
+ */
+ private CoordinatorMetadataImage metadataImage =
CoordinatorMetadataImage.EMPTY;
+
/**
* The target assignment epoch.
*/
@@ -42,6 +53,11 @@ public class ShareGroupAssignmentBuilder {
*/
private Assignment targetAssignment;
+ /**
+ * Whether the member has changed its subscription on the current
heartbeat.
+ */
+ private boolean hasSubscriptionChanged;
+
/**
* Constructs the ShareGroupAssignmentBuilder based on the current state
of the
* provided share group member.
@@ -52,6 +68,19 @@ public class ShareGroupAssignmentBuilder {
this.member = Objects.requireNonNull(member);
}
+ /**
+ * Sets the metadata image.
+ *
+ * @param metadataImage The metadata image.
+ * @return This object.
+ */
+ public ShareGroupAssignmentBuilder withMetadataImage(
+ CoordinatorMetadataImage metadataImage
+ ) {
+ this.metadataImage = metadataImage;
+ return this;
+ }
+
/**
* Sets the target assignment epoch and the target assignment that the
* share group member must be reconciled to.
@@ -69,6 +98,19 @@ public class ShareGroupAssignmentBuilder {
return this;
}
+ /**
+ * Sets whether the member has changed its subscription on the current
heartbeat.
+ *
+ * @param hasSubscriptionChanged If true, always removes unsubscribed
topics from the current assignment.
+ * @return This object.
+ */
+ public ShareGroupAssignmentBuilder withHasSubscriptionChanged(
+ boolean hasSubscriptionChanged
+ ) {
+ this.hasSubscriptionChanged = hasSubscriptionChanged;
+ return this;
+ }
+
/**
* Builds the next state for the member or keep the current one if it
* is not possible to move forward with the current state.
@@ -83,11 +125,38 @@ public class ShareGroupAssignmentBuilder {
// when the member is updated.
return new ShareGroupMember.Builder(member)
.setState(MemberState.STABLE)
- .setAssignedPartitions(targetAssignment.partitions())
+ // If we have client-side assignors, the latest target
assignment may not
+ // be consistent with the latest subscribed topics, so we must
always
+ // filter the assigned partitions to ensure they are
consistent with the
+ // subscribed topics.
+
.setAssignedPartitions(filterAssignedPartitions(targetAssignment.partitions(),
member.subscribedTopicNames()))
.updateMemberEpoch(targetAssignmentEpoch)
.build();
+ } else if (hasSubscriptionChanged) {
+ return new ShareGroupMember.Builder(member)
+
.setAssignedPartitions(filterAssignedPartitions(targetAssignment.partitions(),
member.subscribedTopicNames()))
+ .build();
+ } else {
+ return member;
}
+ }
- return member;
+ private Map<Uuid, Set<Integer>> filterAssignedPartitions(
+ Map<Uuid, Set<Integer>> partitions,
+ Set<String> subscribedTopicNames
+ ) {
+ TopicIds subscribedTopicIds = new
TopicIds(member.subscribedTopicNames(), metadataImage);
+
+ // Reuse the original map if no topics need to be removed.
+ Map<Uuid, Set<Integer>> filteredPartitions = partitions;
+ for (Map.Entry<Uuid, Set<Integer>> entry : partitions.entrySet()) {
+ if (!subscribedTopicIds.contains(entry.getKey())) {
+ if (filteredPartitions == partitions) {
+ filteredPartitions = new HashMap<>(partitions);
+ }
+ filteredPartitions.remove(entry.getKey());
+ }
+ }
+ return filteredPartitions;
}
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupMember.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupMember.java
index 57af4c98fd4..2bb75578c7b 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupMember.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupMember.java
@@ -74,6 +74,7 @@ public class ShareGroupMember extends ModernGroupMember {
this.memberId = Objects.requireNonNull(newMemberId);
this.memberEpoch = member.memberEpoch;
this.previousMemberEpoch = member.previousMemberEpoch;
+ this.state = member.state;
this.rackId = member.rackId;
this.clientId = member.clientId;
this.clientHost = member.clientHost;
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 957ae7e8147..6521c48532c 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -20604,7 +20604,7 @@ public class GroupMetadataManagerTest {
.build();
// Member 1 updates its new regular expression.
- CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result = context.consumerGroupHeartbeat(
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result1 = context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId1)
@@ -20620,19 +20620,15 @@ public class GroupMetadataManagerTest {
.setMemberEpoch(10)
.setHeartbeatIntervalMs(5000)
.setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
- .setTopicPartitions(List.of(
- new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
- .setTopicId(fooTopicId)
- .setPartitions(List.of(0, 1, 2, 3, 4, 5))
- ))
+ .setTopicPartitions(List.of())
),
- result.response()
+ result1.response()
);
ConsumerGroupMember expectedMember1 = new
ConsumerGroupMember.Builder(memberId1)
.setState(MemberState.STABLE)
.setMemberEpoch(10)
- .setPreviousMemberEpoch(0)
+ .setPreviousMemberEpoch(10)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
.setRebalanceTimeoutMs(5000)
@@ -20644,10 +20640,12 @@ public class GroupMetadataManagerTest {
// The member subscription is updated.
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedMember1),
// The previous regular expression is deleted.
-
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId,
"foo*")
+
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId,
"foo*"),
+ // The member assignment is updated.
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember1)
);
- assertRecordsEquals(expectedRecords, result.records());
+ assertRecordsEquals(expectedRecords, result1.records());
// Execute pending tasks.
List<MockCoordinatorExecutor.ExecutorResult<CoordinatorRecord>> tasks
= context.processTasks();
@@ -20675,6 +20673,65 @@ public class GroupMetadataManagerTest {
),
task.result().records()
);
+
+ assignor.prepareGroupAssignment(new GroupAssignment(Map.of(
+ memberId1, new MemberAssignmentImpl(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
+ mkTopicAssignment(barTopicId, 0, 1, 2)
+ ))
+ )));
+
+ // Member heartbeats again with the same regex.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result2 = context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId1)
+ .setMemberEpoch(10)
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicRegex("foo*|bar*")
+ .setServerAssignor("range")
+ .setTopicPartitions(List.of()));
+
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(memberId1)
+ .setMemberEpoch(11)
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List.of(
+ new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(List.of(0, 1, 2, 3, 4, 5)),
+ new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(barTopicId)
+ .setPartitions(List.of(0, 1, 2))))),
+ result2.response()
+ );
+
+ ConsumerGroupMember expectedMember2 = new
ConsumerGroupMember.Builder(memberId1)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicRegex("foo*|bar*")
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
+ mkTopicAssignment(barTopicId, 0, 1, 2)))
+ .build();
+
+ expectedRecords = List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId1, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
+ mkTopicAssignment(barTopicId, 0, 1, 2)
+ )),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
11),
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember2)
+ );
+
+ assertRecordsEquals(expectedRecords, result2.records());
}
@Test
@@ -21077,10 +21134,7 @@ public class GroupMetadataManagerTest {
.setMemberEpoch(10)
.setHeartbeatIntervalMs(5000)
.setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
- .setTopicPartitions(List.of(
- new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
- .setTopicId(fooTopicId)
- .setPartitions(List.of(3, 4, 5))))),
+ .setTopicPartitions(List.of())),
result1.response()
);
@@ -21098,7 +21152,8 @@ public class GroupMetadataManagerTest {
assertRecordsEquals(
List.of(
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedMember2),
-
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId,
"foo*")
+
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId,
"foo*"),
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember2)
),
result1.records()
);
@@ -21164,8 +21219,7 @@ public class GroupMetadataManagerTest {
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicRegex("foo|bar*")
.setServerAssignorName("range")
- .setAssignedPartitions(mkAssignment(
- mkTopicAssignment(fooTopicId, 3, 4, 5)))
+ .setAssignedPartitions(mkAssignment())
.build();
assertResponseEquals(
@@ -21174,10 +21228,7 @@ public class GroupMetadataManagerTest {
.setMemberEpoch(11)
.setHeartbeatIntervalMs(5000)
.setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
- .setTopicPartitions(List.of(
- new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
- .setTopicId(fooTopicId)
- .setPartitions(List.of(3, 4, 5))))),
+ .setTopicPartitions(List.of())),
result2.response()
);
@@ -21306,10 +21357,7 @@ public class GroupMetadataManagerTest {
.setMemberEpoch(10)
.setHeartbeatIntervalMs(5000)
.setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
- .setTopicPartitions(List.of(
- new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
- .setTopicId(fooTopicId)
- .setPartitions(List.of(3, 4, 5))))),
+ .setTopicPartitions(List.of())),
result1.response()
);
@@ -21327,7 +21375,8 @@ public class GroupMetadataManagerTest {
assertRecordsEquals(
List.of(
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedMember2),
-
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId,
"foo*")
+
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId,
"foo*"),
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember2)
),
result1.records()
);
@@ -21440,6 +21489,219 @@ public class GroupMetadataManagerTest {
);
}
+ @Test
+ public void testStaticConsumerGroupMemberJoinsWithUpdatedRegex() {
+ // Scenario: a static member that owns partitions of both foo and bar leaves
+ // temporarily and rejoins under a new member id with a narrower regex ("foo*").
+ // The first response after rejoining must carry an empty assignment, since the
+ // new regex has not been resolved yet. Once the regex is resolved and a new
+ // target assignment is computed, the member receives only the foo partitions.
+ String groupId = "fooup";
+ String memberId1 = Uuid.randomUuid().toString();
+ String memberId2 = Uuid.randomUuid().toString();
+ String instanceId = "instance-id";
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+ Uuid barTopicId = Uuid.randomUuid();
+ String barTopicName = "bar";
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .addTopic(barTopicId, barTopicName, 3)
+ .buildCoordinatorMetadataImage(12345L);
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(assignor))
+ .withMetadataImage(metadataImage)
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(new ConsumerGroupMember.Builder(memberId1)
+ .setInstanceId(instanceId)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicRegex("foo*|bar*")
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
+ mkTopicAssignment(barTopicId, 0, 1, 2)))
+ .build())
+ .withAssignment(memberId1, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
+ mkTopicAssignment(barTopicId, 0, 1, 2)))
+ .withAssignmentEpoch(10))
+ .build();
+
+ // Static member temporarily leaves the group.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result1 = context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setInstanceId(instanceId)
+ .setMemberId(memberId1)
+ .setMemberEpoch(LEAVE_GROUP_STATIC_MEMBER_EPOCH)
+ );
+
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(memberId1)
+ .setMemberEpoch(LEAVE_GROUP_STATIC_MEMBER_EPOCH),
+ result1.response()
+ );
+
+ // Static member joins the group with an updated regular expression.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result2 = context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setInstanceId(instanceId)
+ .setMemberId(memberId2)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicRegex("foo*")
+ .setServerAssignor("range")
+ .setTopicPartitions(List.of()));
+
+ // The returned assignment does not contain topics not in the current
regular expression.
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(memberId2)
+ .setMemberEpoch(10)
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List.of())
+ ),
+ result2.response()
+ );
+
+ // The replacement member as first copied from the departed static member:
+ // it still carries the old regex and the old assignment.
+ ConsumerGroupMember expectedCopiedMember = new
ConsumerGroupMember.Builder(memberId2)
+ .setState(MemberState.STABLE)
+ .setInstanceId(instanceId)
+ .setMemberEpoch(0)
+ .setPreviousMemberEpoch(0)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicRegex("foo*|bar*")
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
+ mkTopicAssignment(barTopicId, 0, 1, 2)))
+ .build();
+
+ // The replacement member after the subscription update: new regex and no
+ // assigned partitions.
+ ConsumerGroupMember expectedMember1 = new
ConsumerGroupMember.Builder(memberId2)
+ .setState(MemberState.STABLE)
+ .setInstanceId(instanceId)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(0)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicRegex("foo*")
+ .setServerAssignorName("range")
+ .build();
+
+ List<CoordinatorRecord> expectedRecords = List.of(
+ // The previous member is deleted.
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
memberId1),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
memberId1),
+
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId1),
+ // The previous member is replaced by the new one.
+
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedCopiedMember),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId2, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
+ mkTopicAssignment(barTopicId, 0, 1, 2)
+ )),
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedCopiedMember),
+ // The member subscription is updated.
+
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedMember1),
+ // The previous regular expression is deleted.
+
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId,
"foo*|bar*"),
+ // The member assignment is updated.
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember1)
+ );
+
+ assertRecordsEquals(expectedRecords, result2.records());
+
+ // Execute pending tasks.
+ List<MockCoordinatorExecutor.ExecutorResult<CoordinatorRecord>> tasks
= context.processTasks();
+ assertEquals(1, tasks.size());
+
+ MockCoordinatorExecutor.ExecutorResult<CoordinatorRecord> task =
tasks.get(0);
+ assertEquals(groupId + "-regex", task.key());
+ assertRecordsEquals(
+ List.of(
+ // The resolution of the new regex is persisted.
+
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionRecord(
+ groupId,
+ "foo*",
+ new ResolvedRegularExpression(
+ Set.of("foo"),
+ 12345L,
+ context.time.milliseconds()
+ )
+ ),
+ // The group epoch is bumped.
+
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11,
computeGroupHash(Map.of(
+ fooTopicName, computeTopicHash(fooTopicName, metadataImage)
+ )))
+ ),
+ task.result().records()
+ );
+
+ assignor.prepareGroupAssignment(new GroupAssignment(Map.of(
+ memberId2, new MemberAssignmentImpl(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+ ))
+ )));
+
+ // Member heartbeats again with the same regex.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result3 = context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setInstanceId(instanceId)
+ .setMemberId(memberId2)
+ .setMemberEpoch(10)
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicRegex("foo*")
+ .setServerAssignor("range")
+ .setTopicPartitions(List.of()));
+
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(memberId2)
+ .setMemberEpoch(11)
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List.of(
+ new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(List.of(0, 1, 2, 3, 4, 5))))),
+ result3.response()
+ );
+
+ // The member is subscribed to "foo*" at this point (see the heartbeats above),
+ // so the expected member carries that regex rather than the departed member's.
+ ConsumerGroupMember expectedMember2 = new
ConsumerGroupMember.Builder(memberId2)
+ .setState(MemberState.STABLE)
+ .setInstanceId(instanceId)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicRegex("foo*")
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+ .build();
+
+ expectedRecords = List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId2, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+ )),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
11),
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember2)
+ );
+
+ assertRecordsEquals(expectedRecords, result3.records());
+ }
+
@Test
public void
testResolvedRegularExpressionsRemovedWhenMembersLeaveOrFenced() {
String groupId = "fooup";
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilderTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilderTest.java
index 3a4931efad9..48441780689 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilderTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilderTest.java
@@ -19,13 +19,19 @@ package org.apache.kafka.coordinator.group.modern.consumer;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.FencedMemberEpochException;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
+import org.apache.kafka.coordinator.common.runtime.MetadataImageBuilder;
import org.apache.kafka.coordinator.group.modern.Assignment;
import org.apache.kafka.coordinator.group.modern.MemberState;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import static
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
import static
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
@@ -36,19 +42,28 @@ public class CurrentAssignmentBuilderTest {
@Test
public void testStableToStable() {
+ String topic1 = "topic1";
+ String topic2 = "topic2";
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(topicId1, topic1, 10)
+ .addTopic(topicId2, topic2, 10)
+ .buildCoordinatorMetadataImage();
+
ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
.setState(MemberState.STABLE)
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
+ .setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3),
mkTopicAssignment(topicId2, 4, 5, 6)))
.build();
ConsumerGroupMember updatedMember = new
CurrentAssignmentBuilder(member)
+ .withMetadataImage(metadataImage)
.withTargetAssignment(11, new Assignment(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3),
mkTopicAssignment(topicId2, 4, 5, 6))))
@@ -60,6 +75,7 @@ public class CurrentAssignmentBuilderTest {
.setState(MemberState.STABLE)
.setMemberEpoch(11)
.setPreviousMemberEpoch(10)
+ .setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3),
mkTopicAssignment(topicId2, 4, 5, 6)))
@@ -70,19 +86,28 @@ public class CurrentAssignmentBuilderTest {
@Test
public void testStableToStableWithNewPartitions() {
+ String topic1 = "topic1";
+ String topic2 = "topic2";
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(topicId1, topic1, 10)
+ .addTopic(topicId2, topic2, 10)
+ .buildCoordinatorMetadataImage();
+
ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
.setState(MemberState.STABLE)
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
+ .setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3),
mkTopicAssignment(topicId2, 4, 5, 6)))
.build();
ConsumerGroupMember updatedMember = new
CurrentAssignmentBuilder(member)
+ .withMetadataImage(metadataImage)
.withTargetAssignment(11, new Assignment(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3, 4),
mkTopicAssignment(topicId2, 4, 5, 6, 7))))
@@ -94,6 +119,7 @@ public class CurrentAssignmentBuilderTest {
.setState(MemberState.STABLE)
.setMemberEpoch(11)
.setPreviousMemberEpoch(10)
+ .setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3, 4),
mkTopicAssignment(topicId2, 4, 5, 6, 7)))
@@ -104,19 +130,28 @@ public class CurrentAssignmentBuilderTest {
@Test
public void testStableToUnrevokedPartitions() {
+ String topic1 = "topic1";
+ String topic2 = "topic2";
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(topicId1, topic1, 10)
+ .addTopic(topicId2, topic2, 10)
+ .buildCoordinatorMetadataImage();
+
ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
.setState(MemberState.STABLE)
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
+ .setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3),
mkTopicAssignment(topicId2, 4, 5, 6)))
.build();
ConsumerGroupMember updatedMember = new
CurrentAssignmentBuilder(member)
+ .withMetadataImage(metadataImage)
.withTargetAssignment(11, new Assignment(mkAssignment(
mkTopicAssignment(topicId1, 2, 3, 4),
mkTopicAssignment(topicId2, 5, 6, 7))))
@@ -128,6 +163,7 @@ public class CurrentAssignmentBuilderTest {
.setState(MemberState.UNREVOKED_PARTITIONS)
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
+ .setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 2, 3),
mkTopicAssignment(topicId2, 5, 6)))
@@ -141,19 +177,28 @@ public class CurrentAssignmentBuilderTest {
@Test
public void testStableToUnreleasedPartitions() {
+ String topic1 = "topic1";
+ String topic2 = "topic2";
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(topicId1, topic1, 10)
+ .addTopic(topicId2, topic2, 10)
+ .buildCoordinatorMetadataImage();
+
ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
.setState(MemberState.STABLE)
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
+ .setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3),
mkTopicAssignment(topicId2, 4, 5, 6)))
.build();
ConsumerGroupMember updatedMember = new
CurrentAssignmentBuilder(member)
+ .withMetadataImage(metadataImage)
.withTargetAssignment(11, new Assignment(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3, 4),
mkTopicAssignment(topicId2, 4, 5, 6, 7))))
@@ -165,6 +210,7 @@ public class CurrentAssignmentBuilderTest {
.setState(MemberState.UNRELEASED_PARTITIONS)
.setMemberEpoch(11)
.setPreviousMemberEpoch(10)
+ .setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3),
mkTopicAssignment(topicId2, 4, 5, 6)))
@@ -175,19 +221,28 @@ public class CurrentAssignmentBuilderTest {
@Test
public void
testStableToUnreleasedPartitionsWithOwnedPartitionsNotHavingRevokedPartitions()
{
+ String topic1 = "topic1";
+ String topic2 = "topic2";
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(topicId1, topic1, 10)
+ .addTopic(topicId2, topic2, 10)
+ .buildCoordinatorMetadataImage();
+
ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
.setState(MemberState.STABLE)
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
+ .setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3),
mkTopicAssignment(topicId2, 4, 5, 6)))
.build();
ConsumerGroupMember updatedMember = new
CurrentAssignmentBuilder(member)
+ .withMetadataImage(metadataImage)
.withTargetAssignment(11, new Assignment(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3),
mkTopicAssignment(topicId2, 4, 5, 7))))
@@ -202,6 +257,7 @@ public class CurrentAssignmentBuilderTest {
.setState(MemberState.UNRELEASED_PARTITIONS)
.setMemberEpoch(11)
.setPreviousMemberEpoch(10)
+ .setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3),
mkTopicAssignment(topicId2, 4, 5)))
@@ -212,13 +268,21 @@ public class CurrentAssignmentBuilderTest {
@Test
public void testUnrevokedPartitionsToStable() {
+ String topic1 = "topic1";
+ String topic2 = "topic2";
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(topicId1, topic1, 10)
+ .addTopic(topicId2, topic2, 10)
+ .buildCoordinatorMetadataImage();
+
ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
.setState(MemberState.UNREVOKED_PARTITIONS)
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
+ .setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 2, 3),
mkTopicAssignment(topicId2, 5, 6)))
@@ -228,6 +292,7 @@ public class CurrentAssignmentBuilderTest {
.build();
ConsumerGroupMember updatedMember = new
CurrentAssignmentBuilder(member)
+ .withMetadataImage(metadataImage)
.withTargetAssignment(11, new Assignment(mkAssignment(
mkTopicAssignment(topicId1, 2, 3),
mkTopicAssignment(topicId2, 5, 6))))
@@ -246,6 +311,7 @@ public class CurrentAssignmentBuilderTest {
.setState(MemberState.STABLE)
.setMemberEpoch(11)
.setPreviousMemberEpoch(10)
+ .setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 2, 3),
mkTopicAssignment(topicId2, 5, 6)))
@@ -256,13 +322,21 @@ public class CurrentAssignmentBuilderTest {
@Test
public void testRemainsInUnrevokedPartitions() {
+ String topic1 = "topic1";
+ String topic2 = "topic2";
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(topicId1, topic1, 10)
+ .addTopic(topicId2, topic2, 10)
+ .buildCoordinatorMetadataImage();
+
ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
.setState(MemberState.UNREVOKED_PARTITIONS)
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
+ .setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 2, 3),
mkTopicAssignment(topicId2, 5, 6)))
@@ -272,6 +346,7 @@ public class CurrentAssignmentBuilderTest {
.build();
CurrentAssignmentBuilder currentAssignmentBuilder = new
CurrentAssignmentBuilder(member)
+ .withMetadataImage(metadataImage)
.withTargetAssignment(12, new Assignment(mkAssignment(
mkTopicAssignment(topicId1, 3),
mkTopicAssignment(topicId2, 6))))
@@ -311,15 +386,27 @@ public class CurrentAssignmentBuilderTest {
);
}
- @Test
- public void testUnrevokedPartitionsToUnrevokedPartitions() {
+ @ParameterizedTest
+ @CsvSource({
+ "10, 12, 11",
+ "10, 10, 10", // The member epoch must not advance past the target
assignment epoch.
+ })
+ public void testUnrevokedPartitionsToUnrevokedPartitions(int memberEpoch,
int targetAssignmentEpoch, int expectedMemberEpoch) {
+ String topic1 = "topic1";
+ String topic2 = "topic2";
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(topicId1, topic1, 10)
+ .addTopic(topicId2, topic2, 10)
+ .buildCoordinatorMetadataImage();
+
ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
.setState(MemberState.UNREVOKED_PARTITIONS)
- .setMemberEpoch(10)
- .setPreviousMemberEpoch(10)
+ .setMemberEpoch(memberEpoch)
+ .setPreviousMemberEpoch(memberEpoch)
+ .setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 2, 3),
mkTopicAssignment(topicId2, 5, 6)))
@@ -329,7 +416,8 @@ public class CurrentAssignmentBuilderTest {
.build();
ConsumerGroupMember updatedMember = new
CurrentAssignmentBuilder(member)
- .withTargetAssignment(12, new Assignment(mkAssignment(
+ .withMetadataImage(metadataImage)
+ .withTargetAssignment(targetAssignmentEpoch, new
Assignment(mkAssignment(
mkTopicAssignment(topicId1, 3),
mkTopicAssignment(topicId2, 6))))
.withCurrentPartitionEpoch((topicId, partitionId) -> -1)
@@ -345,8 +433,9 @@ public class CurrentAssignmentBuilderTest {
assertEquals(
new ConsumerGroupMember.Builder("member")
.setState(MemberState.UNREVOKED_PARTITIONS)
- .setMemberEpoch(11)
- .setPreviousMemberEpoch(10)
+ .setMemberEpoch(expectedMemberEpoch)
+ .setPreviousMemberEpoch(memberEpoch)
+ .setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 3),
mkTopicAssignment(topicId2, 6)))
@@ -360,19 +449,28 @@ public class CurrentAssignmentBuilderTest {
@Test
public void testUnrevokedPartitionsToUnreleasedPartitions() {
+ String topic1 = "topic1";
+ String topic2 = "topic2";
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(topicId1, topic1, 10)
+ .addTopic(topicId2, topic2, 10)
+ .buildCoordinatorMetadataImage();
+
ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
.setState(MemberState.UNREVOKED_PARTITIONS)
.setMemberEpoch(11)
.setPreviousMemberEpoch(10)
+ .setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 2, 3),
mkTopicAssignment(topicId2, 5, 6)))
.build();
ConsumerGroupMember updatedMember = new
CurrentAssignmentBuilder(member)
+ .withMetadataImage(metadataImage)
.withTargetAssignment(11, new Assignment(mkAssignment(
mkTopicAssignment(topicId1, 2, 3, 4),
mkTopicAssignment(topicId2, 5, 6, 7))))
@@ -391,6 +489,7 @@ public class CurrentAssignmentBuilderTest {
.setState(MemberState.UNRELEASED_PARTITIONS)
.setMemberEpoch(11)
.setPreviousMemberEpoch(11)
+ .setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 2, 3),
mkTopicAssignment(topicId2, 5, 6)))
@@ -401,19 +500,28 @@ public class CurrentAssignmentBuilderTest {
@Test
public void testUnreleasedPartitionsToStable() {
+ String topic1 = "topic1";
+ String topic2 = "topic2";
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(topicId1, topic1, 10)
+ .addTopic(topicId2, topic2, 10)
+ .buildCoordinatorMetadataImage();
+
ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
.setState(MemberState.UNRELEASED_PARTITIONS)
.setMemberEpoch(11)
.setPreviousMemberEpoch(11)
+ .setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 2, 3),
mkTopicAssignment(topicId2, 5, 6)))
.build();
ConsumerGroupMember updatedMember = new
CurrentAssignmentBuilder(member)
+ .withMetadataImage(metadataImage)
.withTargetAssignment(12, new Assignment(mkAssignment(
mkTopicAssignment(topicId1, 2, 3),
mkTopicAssignment(topicId2, 5, 6))))
@@ -425,6 +533,7 @@ public class CurrentAssignmentBuilderTest {
.setState(MemberState.STABLE)
.setMemberEpoch(12)
.setPreviousMemberEpoch(11)
+ .setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 2, 3),
mkTopicAssignment(topicId2, 5, 6)))
@@ -435,19 +544,28 @@ public class CurrentAssignmentBuilderTest {
@Test
public void testUnreleasedPartitionsToStableWithNewPartitions() {
+ String topic1 = "topic1";
+ String topic2 = "topic2";
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(topicId1, topic1, 10)
+ .addTopic(topicId2, topic2, 10)
+ .buildCoordinatorMetadataImage();
+
ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
.setState(MemberState.UNRELEASED_PARTITIONS)
.setMemberEpoch(11)
.setPreviousMemberEpoch(11)
+ .setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 2, 3),
mkTopicAssignment(topicId2, 5, 6)))
.build();
ConsumerGroupMember updatedMember = new
CurrentAssignmentBuilder(member)
+ .withMetadataImage(metadataImage)
.withTargetAssignment(11, new Assignment(mkAssignment(
mkTopicAssignment(topicId1, 2, 3, 4),
mkTopicAssignment(topicId2, 5, 6, 7))))
@@ -459,6 +577,7 @@ public class CurrentAssignmentBuilderTest {
.setState(MemberState.STABLE)
.setMemberEpoch(11)
.setPreviousMemberEpoch(11)
+ .setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 2, 3, 4),
mkTopicAssignment(topicId2, 5, 6, 7)))
@@ -469,19 +588,28 @@ public class CurrentAssignmentBuilderTest {
@Test
public void testUnreleasedPartitionsToUnreleasedPartitions() {
+ String topic1 = "topic1";
+ String topic2 = "topic2";
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(topicId1, topic1, 10)
+ .addTopic(topicId2, topic2, 10)
+ .buildCoordinatorMetadataImage();
+
ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
.setState(MemberState.UNRELEASED_PARTITIONS)
.setMemberEpoch(11)
.setPreviousMemberEpoch(11)
+ .setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 2, 3),
mkTopicAssignment(topicId2, 5, 6)))
.build();
ConsumerGroupMember updatedMember = new
CurrentAssignmentBuilder(member)
+ .withMetadataImage(metadataImage)
.withTargetAssignment(11, new Assignment(mkAssignment(
mkTopicAssignment(topicId1, 2, 3, 4),
mkTopicAssignment(topicId2, 5, 6, 7))))
@@ -493,19 +621,28 @@ public class CurrentAssignmentBuilderTest {
@Test
public void testUnreleasedPartitionsToUnrevokedPartitions() {
+ String topic1 = "topic1";
+ String topic2 = "topic2";
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(topicId1, topic1, 10)
+ .addTopic(topicId2, topic2, 10)
+ .buildCoordinatorMetadataImage();
+
ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
.setState(MemberState.UNRELEASED_PARTITIONS)
.setMemberEpoch(11)
.setPreviousMemberEpoch(11)
+ .setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 2, 3),
mkTopicAssignment(topicId2, 5, 6)))
.build();
ConsumerGroupMember updatedMember = new
CurrentAssignmentBuilder(member)
+ .withMetadataImage(metadataImage)
.withTargetAssignment(12, new Assignment(mkAssignment(
mkTopicAssignment(topicId1, 3),
mkTopicAssignment(topicId2, 6))))
@@ -517,6 +654,7 @@ public class CurrentAssignmentBuilderTest {
.setState(MemberState.UNREVOKED_PARTITIONS)
.setMemberEpoch(11)
.setPreviousMemberEpoch(11)
+ .setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 3),
mkTopicAssignment(topicId2, 6)))
@@ -530,13 +668,21 @@ public class CurrentAssignmentBuilderTest {
@Test
public void testUnknownState() {
+ String topic1 = "topic1";
+ String topic2 = "topic2";
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(topicId1, topic1, 10)
+ .addTopic(topicId2, topic2, 10)
+ .buildCoordinatorMetadataImage();
+
ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
.setState(MemberState.UNKNOWN)
.setMemberEpoch(11)
.setPreviousMemberEpoch(11)
+ .setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 3),
mkTopicAssignment(topicId2, 6)))
@@ -548,6 +694,7 @@ public class CurrentAssignmentBuilderTest {
// When the member is in an unknown state, the member is first fenced to force
// a reset of the client side member state.
assertThrows(FencedMemberEpochException.class, () -> new
CurrentAssignmentBuilder(member)
+ .withMetadataImage(metadataImage)
.withTargetAssignment(12, new Assignment(mkAssignment(
mkTopicAssignment(topicId1, 3),
mkTopicAssignment(topicId2, 6))))
@@ -556,6 +703,7 @@ public class CurrentAssignmentBuilderTest {
// Then the member rejoins with no owned partitions.
ConsumerGroupMember updatedMember = new
CurrentAssignmentBuilder(member)
+ .withMetadataImage(metadataImage)
.withTargetAssignment(12, new Assignment(mkAssignment(
mkTopicAssignment(topicId1, 3),
mkTopicAssignment(topicId2, 6))))
@@ -568,6 +716,7 @@ public class CurrentAssignmentBuilderTest {
.setState(MemberState.STABLE)
.setMemberEpoch(12)
.setPreviousMemberEpoch(11)
+ .setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 3),
mkTopicAssignment(topicId2, 6)))
@@ -575,4 +724,355 @@ public class CurrentAssignmentBuilderTest {
updatedMember
);
}
+
+ @ParameterizedTest // A STABLE member assigned a topic it no longer subscribes to must end up without that topic.
+ @CsvSource({ // Columns: memberEpoch, targetAssignmentEpoch, expectedMemberEpoch, hasSubscriptionChanged.
+ "10, 11, 11, false", // When advancing to a new target assignment, the
assignment should
+ "10, 11, 11, true", // always take the subscription into account.
+ "10, 10, 10, true", // Target epoch equals the member epoch; only the subscription-changed flag is set.
+ })
+ public void testStableToStableWithAssignmentTopicsNoLongerInSubscription(
+ int memberEpoch,
+ int targetAssignmentEpoch,
+ int expectedMemberEpoch,
+ boolean hasSubscriptionChanged
+ ) {
+ String topic1 = "topic1";
+ String topic2 = "topic2";
+ Uuid topicId1 = Uuid.randomUuid();
+ Uuid topicId2 = Uuid.randomUuid();
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder() // Both topics exist in the metadata image.
+ .addTopic(topicId1, topic1, 10)
+ .addTopic(topicId2, topic2, 10)
+ .buildCoordinatorMetadataImage();
+
+ ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(memberEpoch)
+ .setPreviousMemberEpoch(memberEpoch)
+ .setSubscribedTopicNames(List.of(topic2)) // topic1 is not subscribed, although it is still assigned below.
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(topicId1, 1, 2, 3),
+ mkTopicAssignment(topicId2, 4, 5, 6)))
+ .build();
+
+ ConsumerGroupMember updatedMember = new
CurrentAssignmentBuilder(member)
+ .withMetadataImage(metadataImage)
+ .withTargetAssignment(targetAssignmentEpoch, new
Assignment(mkAssignment(
+ mkTopicAssignment(topicId1, 1, 2, 3), // The target assignment still contains the unsubscribed topic1.
+ mkTopicAssignment(topicId2, 4, 5, 6))))
+ .withHasSubscriptionChanged(hasSubscriptionChanged)
+ .withCurrentPartitionEpoch((topicId, partitionId) -> -1) // Every partition epoch lookup returns -1.
+ .withOwnedTopicPartitions(Arrays.asList( // The member reports owning only the topic2 partitions.
+ new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+ .setTopicId(topicId2)
+ .setPartitions(Arrays.asList(4, 5, 6))))
+ .build();
+
+ assertEquals(
+ new ConsumerGroupMember.Builder("member")
+ .setState(MemberState.STABLE) // The member stays STABLE.
+ .setMemberEpoch(expectedMemberEpoch)
+ .setPreviousMemberEpoch(memberEpoch)
+ .setSubscribedTopicNames(List.of(topic2))
+ .setAssignedPartitions(mkAssignment( // Only topic2 remains; topic1 is absent from the expected assignment.
+ mkTopicAssignment(topicId2, 4, 5, 6)))
+ .build(),
+ updatedMember
+ );
+ }
+
+ @ParameterizedTest // A STABLE member that still owns partitions of an unsubscribed topic must move to UNREVOKED_PARTITIONS.
+ @CsvSource({ // Columns: memberEpoch, targetAssignmentEpoch, expectedMemberEpoch, hasSubscriptionChanged.
+ "10, 11, 10, false", // When advancing to a new target assignment, the
assignment should always
+ "10, 11, 10, true", // take the subscription into account.
+ "10, 10, 10, true"
+ })
+ public void
testStableToUnrevokedPartitionsWithAssignmentTopicsNoLongerInSubscription(
+ int memberEpoch,
+ int targetAssignmentEpoch,
+ int expectedMemberEpoch,
+ boolean hasSubscriptionChanged
+ ) {
+ String topic1 = "topic1";
+ String topic2 = "topic2";
+ Uuid topicId1 = Uuid.randomUuid();
+ Uuid topicId2 = Uuid.randomUuid();
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder() // Both topics exist in the metadata image.
+ .addTopic(topicId1, topic1, 10)
+ .addTopic(topicId2, topic2, 10)
+ .buildCoordinatorMetadataImage();
+
+ ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(memberEpoch)
+ .setPreviousMemberEpoch(memberEpoch)
+ .setSubscribedTopicNames(List.of(topic2)) // topic1 is not subscribed, although it is still assigned below.
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(topicId1, 1, 2, 3),
+ mkTopicAssignment(topicId2, 4, 5, 6)))
+ .build();
+
+ ConsumerGroupMember updatedMember = new
CurrentAssignmentBuilder(member)
+ .withMetadataImage(metadataImage)
+ .withTargetAssignment(targetAssignmentEpoch, new
Assignment(mkAssignment(
+ mkTopicAssignment(topicId1, 1, 2, 3), // The target assignment still contains the unsubscribed topic1.
+ mkTopicAssignment(topicId2, 4, 5, 6))))
+ .withHasSubscriptionChanged(hasSubscriptionChanged)
+ .withCurrentPartitionEpoch((topicId, partitionId) -> -1) // Every partition epoch lookup returns -1.
+ .withOwnedTopicPartitions(Arrays.asList( // Here the member still reports owning the topic1 partitions too.
+ new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+ .setTopicId(topicId1)
+ .setPartitions(Arrays.asList(1, 2, 3)),
+ new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+ .setTopicId(topicId2)
+ .setPartitions(Arrays.asList(4, 5, 6))))
+ .build();
+
+ assertEquals(
+ new ConsumerGroupMember.Builder("member")
+ .setState(MemberState.UNREVOKED_PARTITIONS)
+ .setMemberEpoch(expectedMemberEpoch) // Every row expects the epoch to stay at 10, even when the target epoch is 11.
+ .setPreviousMemberEpoch(memberEpoch)
+ .setSubscribedTopicNames(List.of(topic2))
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(topicId2, 4, 5, 6)))
+ .setPartitionsPendingRevocation(mkAssignment( // The topic1 partitions are expected to be pending revocation.
+ mkTopicAssignment(topicId1, 1, 2, 3)))
+ .build(),
+ updatedMember
+ );
+ }
+
+ @Test // A member already in UNREVOKED_PARTITIONS that holds an unsubscribed topic stays in that state.
+ public void
testRemainsInUnrevokedPartitionsWithAssignmentTopicsNoLongerInSubscription() {
+ String topic1 = "topic1";
+ String topic2 = "topic2";
+ Uuid topicId1 = Uuid.randomUuid();
+ Uuid topicId2 = Uuid.randomUuid();
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder() // Both topics exist in the metadata image.
+ .addTopic(topicId1, topic1, 10)
+ .addTopic(topicId2, topic2, 10)
+ .buildCoordinatorMetadataImage();
+
+ ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
+ .setState(MemberState.UNREVOKED_PARTITIONS)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setSubscribedTopicNames(List.of(topic2)) // topic1 is not subscribed, although it is still assigned below.
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(topicId1, 2, 3),
+ mkTopicAssignment(topicId2, 5, 6)))
+ .setPartitionsPendingRevocation(mkAssignment( // One partition of each topic is already pending revocation.
+ mkTopicAssignment(topicId1, 1),
+ mkTopicAssignment(topicId2, 4)))
+ .build();
+
+ ConsumerGroupMember updatedMember = new
CurrentAssignmentBuilder(member)
+ .withMetadataImage(metadataImage)
+ .withTargetAssignment(12, new Assignment(mkAssignment( // Target epoch 12 is ahead of the member epoch 10.
+ mkTopicAssignment(topicId1, 1, 3, 4),
+ mkTopicAssignment(topicId2, 6, 7))))
+ .withHasSubscriptionChanged(true)
+ .withCurrentPartitionEpoch((topicId, partitionId) -> -1) // Every partition epoch lookup returns -1.
+ .withOwnedTopicPartitions(Arrays.asList( // The member still reports owning the partitions pending revocation.
+ new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+ .setTopicId(topicId1)
+ .setPartitions(Arrays.asList(1, 2, 3)),
+ new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+ .setTopicId(topicId2)
+ .setPartitions(Arrays.asList(4, 5, 6))))
+ .build();
+
+ assertEquals(
+ new ConsumerGroupMember.Builder("member")
+ .setState(MemberState.UNREVOKED_PARTITIONS)
+ .setMemberEpoch(10) // The member epoch is expected to stay at 10.
+ .setPreviousMemberEpoch(10)
+ .setSubscribedTopicNames(List.of(topic2))
+ .setAssignedPartitions(mkAssignment( // The topic2 entry is unchanged from the input member.
+ mkTopicAssignment(topicId2, 5, 6)))
+ .setPartitionsPendingRevocation(mkAssignment( // topic1 partitions 2 and 3 join partition 1 as pending revocation.
+ mkTopicAssignment(topicId1, 1, 2, 3),
+ mkTopicAssignment(topicId2, 4)))
+ .build(),
+ updatedMember
+ );
+ }
+
+ @Test // Name subscription plus a regex with no resolution: only the explicitly named topic stays assigned.
+ public void testSubscribedTopicNameAndUnresolvedRegularExpression() {
+ String fooTopic = "foo";
+ String barTopic = "bar";
+ Uuid fooTopicId = Uuid.randomUuid();
+ Uuid barTopicId = Uuid.randomUuid();
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder() // Both topics exist in the metadata image.
+ .addTopic(fooTopicId, fooTopic, 10)
+ .addTopic(barTopicId, barTopic, 10)
+ .buildCoordinatorMetadataImage();
+
+ ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setSubscribedTopicNames(List.of(fooTopic)) // "foo" is subscribed by name.
+ .setSubscribedTopicRegex("bar*") // "bar" could only be covered through this regex.
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 1, 2, 3),
+ mkTopicAssignment(barTopicId, 4, 5, 6)))
+ .build();
+
+ ConsumerGroupMember updatedMember = new
CurrentAssignmentBuilder(member)
+ .withMetadataImage(metadataImage)
+ .withTargetAssignment(10, new Assignment(mkAssignment( // Same epoch and same partitions as the member currently has.
+ mkTopicAssignment(fooTopicId, 1, 2, 3),
+ mkTopicAssignment(barTopicId, 4, 5, 6))))
+ .withHasSubscriptionChanged(true)
+ .withResolvedRegularExpressions(Map.of()) // No resolution is supplied for "bar*".
+ .withCurrentPartitionEpoch((topicId, partitionId) -> -1) // Every partition epoch lookup returns -1.
+ .withOwnedTopicPartitions(Arrays.asList( // The member reports owning all partitions of both topics.
+ new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(Arrays.asList(1, 2, 3)),
+ new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+ .setTopicId(barTopicId)
+ .setPartitions(Arrays.asList(4, 5, 6))))
+ .build();
+
+ assertEquals(
+ new ConsumerGroupMember.Builder("member")
+ .setState(MemberState.UNREVOKED_PARTITIONS)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setSubscribedTopicNames(List.of(fooTopic))
+ .setSubscribedTopicRegex("bar*")
+ .setAssignedPartitions(mkAssignment( // Only the name-subscribed topic is expected to stay assigned.
+ mkTopicAssignment(fooTopicId, 1, 2, 3)))
+ .setPartitionsPendingRevocation(mkAssignment( // The "bar" partitions are expected to be pending revocation.
+ mkTopicAssignment(barTopicId, 4, 5, 6)))
+ .build(),
+ updatedMember
+ );
+ }
+
+ @Test // Regex-only subscription with no resolution: every assigned partition is expected to be revoked.
+ public void testUnresolvedRegularExpression() {
+ String fooTopic = "foo";
+ String barTopic = "bar";
+ Uuid fooTopicId = Uuid.randomUuid();
+ Uuid barTopicId = Uuid.randomUuid();
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder() // Both topics exist in the metadata image.
+ .addTopic(fooTopicId, fooTopic, 10)
+ .addTopic(barTopicId, barTopic, 10)
+ .buildCoordinatorMetadataImage();
+
+ ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setSubscribedTopicNames(List.of()) // No topic is subscribed by name.
+ .setSubscribedTopicRegex("bar*") // The regex is the member's only subscription.
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 1, 2, 3),
+ mkTopicAssignment(barTopicId, 4, 5, 6)))
+ .build();
+
+ ConsumerGroupMember updatedMember = new
CurrentAssignmentBuilder(member)
+ .withMetadataImage(metadataImage)
+ .withTargetAssignment(10, new Assignment(mkAssignment( // Same epoch and same partitions as the member currently has.
+ mkTopicAssignment(fooTopicId, 1, 2, 3),
+ mkTopicAssignment(barTopicId, 4, 5, 6))))
+ .withHasSubscriptionChanged(true)
+ .withResolvedRegularExpressions(Map.of()) // No resolution is supplied for "bar*".
+ .withCurrentPartitionEpoch((topicId, partitionId) -> -1) // Every partition epoch lookup returns -1.
+ .withOwnedTopicPartitions(Arrays.asList( // The member reports owning all partitions of both topics.
+ new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(Arrays.asList(1, 2, 3)),
+ new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+ .setTopicId(barTopicId)
+ .setPartitions(Arrays.asList(4, 5, 6))))
+ .build();
+
+ assertEquals(
+ new ConsumerGroupMember.Builder("member")
+ .setState(MemberState.UNREVOKED_PARTITIONS)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setSubscribedTopicNames(List.of())
+ .setSubscribedTopicRegex("bar*")
+ .setAssignedPartitions(mkAssignment()) // Nothing is expected to stay assigned.
+ .setPartitionsPendingRevocation(mkAssignment( // Both topics are expected to be pending revocation.
+ mkTopicAssignment(fooTopicId, 1, 2, 3),
+ mkTopicAssignment(barTopicId, 4, 5, 6)))
+ .build(),
+ updatedMember
+ );
+ }
+
+ @Test // Name subscription plus a regex resolved to "bar": both topics stay assigned and the member stays STABLE.
+ public void testSubscribedTopicNameAndResolvedRegularExpression() {
+ String fooTopic = "foo";
+ String barTopic = "bar";
+ Uuid fooTopicId = Uuid.randomUuid();
+ Uuid barTopicId = Uuid.randomUuid();
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder() // Both topics exist in the metadata image.
+ .addTopic(fooTopicId, fooTopic, 10)
+ .addTopic(barTopicId, barTopic, 10)
+ .buildCoordinatorMetadataImage();
+
+ ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setSubscribedTopicNames(List.of(fooTopic)) // "foo" is subscribed by name.
+ .setSubscribedTopicRegex("bar*") // "bar" is covered through this regex.
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 1, 2, 3),
+ mkTopicAssignment(barTopicId, 4, 5, 6)))
+ .build();
+
+ ConsumerGroupMember updatedMember = new
CurrentAssignmentBuilder(member)
+ .withMetadataImage(metadataImage)
+ .withTargetAssignment(10, new Assignment(mkAssignment( // Same epoch and same partitions as the member currently has.
+ mkTopicAssignment(fooTopicId, 1, 2, 3),
+ mkTopicAssignment(barTopicId, 4, 5, 6))))
+ .withHasSubscriptionChanged(true)
+ .withResolvedRegularExpressions(Map.of( // Unlike the unresolved cases, "bar*" has a resolution here.
+ "bar*", new ResolvedRegularExpression(
+ Set.of("bar"), // The resolved topic names for the regex.
+ 12345L, // NOTE(review): presumably a version and a timestamp; confirm against ResolvedRegularExpression.
+ 0L
+ )
+ ))
+ .withCurrentPartitionEpoch((topicId, partitionId) -> -1) // Every partition epoch lookup returns -1.
+ .withOwnedTopicPartitions(Arrays.asList( // The member reports owning all partitions of both topics.
+ new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(Arrays.asList(1, 2, 3)),
+ new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+ .setTopicId(barTopicId)
+ .setPartitions(Arrays.asList(4, 5, 6))))
+ .build();
+
+ assertEquals(
+ new ConsumerGroupMember.Builder("member")
+ .setState(MemberState.STABLE) // No revocation is expected.
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setSubscribedTopicNames(List.of(fooTopic))
+ .setSubscribedTopicRegex("bar*")
+ .setAssignedPartitions(mkAssignment( // Both topics are expected to stay assigned.
+ mkTopicAssignment(fooTopicId, 1, 2, 3),
+ mkTopicAssignment(barTopicId, 4, 5, 6)))
+ .build(),
+ updatedMember
+ );
+ }
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupAssignmentBuilderTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupAssignmentBuilderTest.java
index f3b6f4604e6..6edff7fc08e 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupAssignmentBuilderTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupAssignmentBuilderTest.java
@@ -17,10 +17,16 @@
package org.apache.kafka.coordinator.group.modern.share;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
+import org.apache.kafka.coordinator.common.runtime.MetadataImageBuilder;
import org.apache.kafka.coordinator.group.modern.Assignment;
import org.apache.kafka.coordinator.group.modern.MemberState;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+import java.util.List;
import static
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
import static
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
@@ -30,19 +36,28 @@ public class ShareGroupAssignmentBuilderTest {
@Test
public void testStableToStable() {
+ String topic1 = "topic1";
+ String topic2 = "topic2";
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(topicId1, topic1, 10)
+ .addTopic(topicId2, topic2, 10)
+ .buildCoordinatorMetadataImage();
+
ShareGroupMember member = new ShareGroupMember.Builder("member")
.setState(MemberState.STABLE)
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
+ .setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3),
mkTopicAssignment(topicId2, 4, 5, 6)))
.build();
ShareGroupMember updatedMember = new
ShareGroupAssignmentBuilder(member)
+ .withMetadataImage(metadataImage)
.withTargetAssignment(11, new Assignment(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3),
mkTopicAssignment(topicId2, 4, 5, 6))))
@@ -53,6 +68,7 @@ public class ShareGroupAssignmentBuilderTest {
.setState(MemberState.STABLE)
.setMemberEpoch(11)
.setPreviousMemberEpoch(10)
+ .setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3),
mkTopicAssignment(topicId2, 4, 5, 6)))
@@ -63,19 +79,28 @@ public class ShareGroupAssignmentBuilderTest {
@Test
public void testStableToStableWithNewPartitions() {
+ String topic1 = "topic1";
+ String topic2 = "topic2";
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(topicId1, topic1, 10)
+ .addTopic(topicId2, topic2, 10)
+ .buildCoordinatorMetadataImage();
+
ShareGroupMember member = new ShareGroupMember.Builder("member")
.setState(MemberState.STABLE)
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
+ .setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3),
mkTopicAssignment(topicId2, 4, 5, 6)))
.build();
ShareGroupMember updatedMember = new
ShareGroupAssignmentBuilder(member)
+ .withMetadataImage(metadataImage)
.withTargetAssignment(11, new Assignment(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3, 4),
mkTopicAssignment(topicId2, 4, 5, 6, 7))))
@@ -86,6 +111,7 @@ public class ShareGroupAssignmentBuilderTest {
.setState(MemberState.STABLE)
.setMemberEpoch(11)
.setPreviousMemberEpoch(10)
+ .setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3, 4),
mkTopicAssignment(topicId2, 4, 5, 6, 7)))
@@ -93,4 +119,56 @@ public class ShareGroupAssignmentBuilderTest {
updatedMember
);
}
+
+ @ParameterizedTest // A STABLE share-group member assigned an unsubscribed topic must end up without that topic.
+ @CsvSource({ // Columns: memberEpoch, targetAssignmentEpoch, hasSubscriptionChanged.
+ "10, 11, false", // When advancing to a new target assignment, the
assignment should always
+ "10, 11, true", // take the subscription into account.
+ "10, 10, true"
+ })
+ public void testStableToStableWithAssignmentTopicsNoLongerInSubscription(
+ int memberEpoch,
+ int targetAssignmentEpoch,
+ boolean hasSubscriptionChanged
+ ) {
+ String topic1 = "topic1";
+ String topic2 = "topic2";
+ Uuid topicId1 = Uuid.randomUuid();
+ Uuid topicId2 = Uuid.randomUuid();
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder() // Both topics exist in the metadata image.
+ .addTopic(topicId1, topic1, 10)
+ .addTopic(topicId2, topic2, 10)
+ .buildCoordinatorMetadataImage();
+
+ ShareGroupMember member = new ShareGroupMember.Builder("member")
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(memberEpoch)
+ .setPreviousMemberEpoch(memberEpoch)
+ .setSubscribedTopicNames(List.of(topic2)) // topic1 is not subscribed, although it is still assigned below.
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(topicId1, 1, 2, 3),
+ mkTopicAssignment(topicId2, 4, 5, 6)))
+ .build();
+
+ ShareGroupMember updatedMember = new
ShareGroupAssignmentBuilder(member)
+ .withMetadataImage(metadataImage)
+ .withTargetAssignment(targetAssignmentEpoch, new
Assignment(mkAssignment(
+ mkTopicAssignment(topicId1, 1, 2, 3), // The target assignment still contains the unsubscribed topic1.
+ mkTopicAssignment(topicId2, 4, 5, 6))))
+ .withHasSubscriptionChanged(hasSubscriptionChanged)
+ .build();
+
+ assertEquals(
+ new ShareGroupMember.Builder("member")
+ .setState(MemberState.STABLE) // The member stays STABLE.
+ .setMemberEpoch(targetAssignmentEpoch) // The member is expected to end at the target assignment epoch.
+ .setPreviousMemberEpoch(memberEpoch)
+ .setSubscribedTopicNames(List.of(topic2))
+ .setAssignedPartitions(mkAssignment( // Only topic2 remains; topic1 is absent from the expected assignment.
+ mkTopicAssignment(topicId2, 4, 5, 6)))
+ .build(),
+ updatedMember
+ );
+ }
}
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/CurrentAssignmentBuilderBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/CurrentAssignmentBuilderBenchmark.java
new file mode 100644
index 00000000000..05ff6c311d7
--- /dev/null
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/CurrentAssignmentBuilderBenchmark.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
+import org.apache.kafka.coordinator.group.modern.Assignment;
+import org.apache.kafka.coordinator.group.modern.MemberState;
+import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
+import
org.apache.kafka.coordinator.group.modern.consumer.CurrentAssignmentBuilder;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime) // Results are reported as average time per call, in milliseconds.
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class CurrentAssignmentBuilderBenchmark { // JMH benchmark of CurrentAssignmentBuilder.build() for a STABLE member.
+
+ @Param({"5", "50"}) // Number of partitions created for every topic.
+ private int partitionsPerTopic;
+
+ @Param({"10", "100", "1000"}) // Number of topics in the metadata image.
+ private int topicCount;
+
+ private List<String> topicNames;
+
+ private List<Uuid> topicIds; // Ids of topicNames, in the same order.
+
+ private CoordinatorMetadataImage metadataImage;
+
+ private ConsumerGroupMember member; // Subscribed to every topic.
+
+ private ConsumerGroupMember memberWithUnsubscribedTopics; // Same member, subscribed to all topics but the last.
+
+ private Assignment targetAssignment;
+
+ @Setup(Level.Trial) // Fixtures are built once per trial; topics must be set up first because the others read topicIds.
+ public void setup() {
+ setupTopics();
+ setupMember();
+ setupTargetAssignment();
+ }
+
+ private void setupTopics() { // Builds the topic names, the metadata image and the matching topic ids.
+ topicNames = AssignorBenchmarkUtils.createTopicNames(topicCount);
+ topicIds = new ArrayList<>(topicCount);
+ metadataImage = AssignorBenchmarkUtils.createMetadataImage(topicNames,
partitionsPerTopic);
+
+ for (String topicName : topicNames) {
+ Uuid topicId = metadataImage.topicMetadata(topicName).get().id(); // Unchecked get(): assumes the image contains every name it was built from.
+ topicIds.add(topicId);
+ }
+ }
+
+ private void setupMember() { // Builds the two benchmark members, both assigned every partition of every topic.
+ Map<Uuid, Set<Integer>> assignedPartitions = new HashMap<>();
+ for (Uuid topicId : topicIds) {
+ Set<Integer> partitions = IntStream.range(0, partitionsPerTopic) // Partitions 0 .. partitionsPerTopic - 1.
+ .boxed()
+ .collect(Collectors.toSet());
+ assignedPartitions.put(topicId, partitions);
+ }
+
+ ConsumerGroupMember.Builder memberBuilder = new
ConsumerGroupMember.Builder("member")
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setSubscribedTopicNames(topicNames)
+ .setAssignedPartitions(assignedPartitions);
+
+ member = memberBuilder.build();
+ memberWithUnsubscribedTopics = memberBuilder // Reuses the builder, overriding only the subscription.
+ .setSubscribedTopicNames(topicNames.subList(0, topicNames.size() -
1)) // Every topic except the last one.
+ .build();
+ }
+
+ private void setupTargetAssignment() { // The target assignment holds the same partitions the members are assigned.
+ Map<Uuid, Set<Integer>> assignedPartitions = new HashMap<>();
+ for (Uuid topicId : topicIds) {
+ Set<Integer> partitions = IntStream.range(0, partitionsPerTopic)
+ .boxed()
+ .collect(Collectors.toSet());
+ assignedPartitions.put(topicId, partitions);
+ }
+ targetAssignment = new Assignment(assignedPartitions);
+ }
+
+ @Benchmark // Target epoch equals the member epoch and no subscription change is flagged.
+ @Threads(1)
+ @OutputTimeUnit(TimeUnit.MILLISECONDS) // NOTE(review): repeats the class-level @OutputTimeUnit.
+ public ConsumerGroupMember stableToStableWithNoChange() {
+ return new CurrentAssignmentBuilder(member)
+ .withMetadataImage(metadataImage)
+ .withTargetAssignment(member.memberEpoch(), targetAssignment)
+ .withCurrentPartitionEpoch((topicId, partitionId) -> -1)
+ .build();
+ }
+
+ @Benchmark // Target epoch is one ahead of the member epoch; the partitions themselves are unchanged.
+ @Threads(1)
+ @OutputTimeUnit(TimeUnit.MILLISECONDS)
+ public ConsumerGroupMember stableToStableWithNewTargetAssignment() {
+ return new CurrentAssignmentBuilder(member)
+ .withMetadataImage(metadataImage)
+ .withTargetAssignment(member.memberEpoch() + 1, targetAssignment)
+ .withCurrentPartitionEpoch((topicId, partitionId) -> -1)
+ .build();
+ }
+
+ @Benchmark // Same epoch, subscription flagged as changed, but the member still subscribes to every topic.
+ @Threads(1)
+ @OutputTimeUnit(TimeUnit.MILLISECONDS)
+ public ConsumerGroupMember stableToStableWithSubscriptionChange() {
+ return new CurrentAssignmentBuilder(member)
+ .withMetadataImage(metadataImage)
+ .withTargetAssignment(member.memberEpoch(), targetAssignment)
+ .withHasSubscriptionChanged(true)
+ .withCurrentPartitionEpoch((topicId, partitionId) -> -1)
+ .build();
+ }
+
+ @Benchmark // Same epoch, subscription flagged as changed, and the last topic is assigned but not subscribed.
+ @Threads(1)
+ @OutputTimeUnit(TimeUnit.MILLISECONDS)
+ public ConsumerGroupMember
stableToUnrevokedPartitionsWithSubscriptionChange() {
+ return new CurrentAssignmentBuilder(memberWithUnsubscribedTopics)
+ .withMetadataImage(metadataImage)
+ .withTargetAssignment(memberWithUnsubscribedTopics.memberEpoch(),
targetAssignment)
+ .withHasSubscriptionChanged(true)
+ .withCurrentPartitionEpoch((topicId, partitionId) -> -1)
+ .build();
+ }
+}