This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new bb1c4465c92 KAFKA-14516: [1/N] Static Member leave, join, re-join
request using ConsumerGroupHeartbeats (#14432)
bb1c4465c92 is described below
commit bb1c4465c92d890fdb6fa2ebb2c2d2f1b8648feb
Author: vamossagar12 <[email protected]>
AuthorDate: Tue Nov 28 23:38:16 2023 +0530
KAFKA-14516: [1/N] Static Member leave, join, re-join request using
ConsumerGroupHeartbeats (#14432)
This patch add the support for static membership to the new consumer group
protocol. With a static member can join, re-join, temporarily leave and leave.
When a member leaves with the expectation to rejoin, it must rejoin within the
session timeout. It is kicks out from the consumer group otherwise.
Reviewers: David Jacot <[email protected]>
---
.../requests/ConsumerGroupHeartbeatRequest.java | 1 +
.../coordinator/group/GroupMetadataManager.java | 255 +++-
.../coordinator/group/consumer/ConsumerGroup.java | 61 +
.../group/consumer/TargetAssignmentBuilder.java | 28 +-
.../group/GroupMetadataManagerTest.java | 1252 +++++++++++++++++---
.../group/consumer/ConsumerGroupTest.java | 50 +
.../consumer/TargetAssignmentBuilderTest.java | 120 +-
7 files changed, 1568 insertions(+), 199 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatRequest.java
index ac3a079641d..ced4880adf6 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatRequest.java
@@ -30,6 +30,7 @@ public class ConsumerGroupHeartbeatRequest extends
AbstractRequest {
* A member epoch of <code>-1</code> means that the member wants to leave
the group.
*/
public static final int LEAVE_GROUP_MEMBER_EPOCH = -1;
+ public static final int LEAVE_GROUP_STATIC_MEMBER_EPOCH = -2;
/**
* A member epoch of <code>0</code> means that the member wants to join
the group.
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 9116d9b6017..eafb6bb2170 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -28,6 +28,8 @@ import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
+import org.apache.kafka.common.errors.UnreleasedInstanceIdException;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
@@ -104,6 +106,7 @@ import static
org.apache.kafka.common.protocol.Errors.ILLEGAL_GENERATION;
import static org.apache.kafka.common.protocol.Errors.NOT_COORDINATOR;
import static org.apache.kafka.common.protocol.Errors.UNKNOWN_SERVER_ERROR;
import static
org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+import static
org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH;
import static
org.apache.kafka.common.requests.JoinGroupRequest.UNKNOWN_MEMBER_ID;
import static org.apache.kafka.coordinator.group.Group.GroupType.CONSUMER;
import static org.apache.kafka.coordinator.group.Group.GroupType.GENERIC;
@@ -663,6 +666,22 @@ public class GroupMetadataManager {
}
}
+ /**
+ * Throws an InvalidRequestException if the value is null.
+ *
+ * @param value The value.
+ * @param error The error message.
+ * @throws InvalidRequestException
+ */
+ private void throwIfNull(
+ Object value,
+ String error
+ ) throws InvalidRequestException {
+ if (value == null) {
+ throw new InvalidRequestException(error);
+ }
+ }
+
/**
* Validates the request.
*
@@ -691,6 +710,9 @@ public class GroupMetadataManager {
if (request.subscribedTopicNames() == null ||
request.subscribedTopicNames().isEmpty()) {
throw new InvalidRequestException("SubscribedTopicNames must
be set in first request.");
}
+ } else if (request.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+ throwIfEmptyString(request.memberId(), "MemberId can't be empty.");
+ throwIfNull(request.instanceId(), "InstanceId can't be null.");
} else {
throw new InvalidRequestException("MemberEpoch is invalid.");
}
@@ -781,6 +803,59 @@ public class GroupMetadataManager {
}
}
+ /**
+ * Validates if the received instanceId has been released from the group
+ *
+ * @param member The consumer group member.
+ * @param groupId The consumer group id.
+ * @param receivedMemberId The member id received in the request.
+ * @param receivedInstanceId The instance id received in the request.
+ *
+ * @throws UnreleasedInstanceIdException if the instance id received in
the request is still in use by an existing static member.
+ */
+ private void throwIfInstanceIdIsUnreleased(ConsumerGroupMember member,
String groupId, String receivedMemberId, String receivedInstanceId) {
+ if (member.memberEpoch() != LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+ // The new member can't join.
+ log.info("[GroupId {}] Static member {} with instance id {} cannot
join the group because the instance id is" +
+ " is owned by member {}.", groupId, receivedMemberId,
receivedInstanceId, member.memberId());
+ throw Errors.UNRELEASED_INSTANCE_ID.exception("Static member " +
receivedMemberId + " with instance id "
+ + receivedInstanceId + " cannot join the group because the
instance id is owned by " + member.memberId() + " member.");
+ }
+ }
+
+ /**
+ * Validates if the received instanceId has been released from the group
+ *
+ * @param member The consumer group member.
+ * @param groupId The consumer group id.
+ * @param receivedMemberId The member id received in the request.
+ * @param receivedInstanceId The instance id received in the request.
+ *
+ * @throws FencedInstanceIdException if the instance id provided is fenced
because of another static member.
+ */
+ private void throwIfInstanceIdIsFenced(ConsumerGroupMember member, String
groupId, String receivedMemberId, String receivedInstanceId) {
+ if (!member.memberId().equals(receivedMemberId)) {
+ log.info("[GroupId {}] Static member {} with instance id {} is
fenced by existing member {}.",
+ groupId, receivedMemberId, receivedInstanceId,
member.memberId());
+ throw Errors.FENCED_INSTANCE_ID.exception("Static member " +
receivedMemberId + " with instance id "
+ + receivedInstanceId + " was fenced by member " +
member.memberId() + ".");
+ }
+ }
+
+ /**
+ * Validates if the received instanceId has been released from the group
+ *
+ * @param staticMember The static member in the group.
+ * @param receivedInstanceId The instance id received in the request.
+ *
+ * @throws UnknownMemberIdException if no static member exists in the
group against the provided instance id.
+ */
+ private void throwIfStaticMemberIsUnknown(ConsumerGroupMember
staticMember, String receivedInstanceId) {
+ if (staticMember == null) {
+ throw Errors.UNKNOWN_MEMBER_ID.exception("Instance id " +
receivedInstanceId + " is unknown.");
+ }
+ }
+
private ConsumerGroupHeartbeatResponseData.Assignment
createResponseAssignment(
ConsumerGroupMember member
) {
@@ -849,21 +924,54 @@ public class GroupMetadataManager {
// Get or create the member.
if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString();
- final ConsumerGroupMember member =
group.getOrMaybeCreateMember(memberId, createIfNotExists);
- throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
-
- if (memberEpoch == 0) {
- log.info("[GroupId {}] Member {} joins the consumer group.",
groupId, memberId);
+ ConsumerGroupMember member;
+ ConsumerGroupMember.Builder updatedMemberBuilder;
+ boolean staticMemberReplaced = false;
+ if (instanceId == null) {
+ member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
+ throwIfMemberEpochIsInvalid(member, memberEpoch,
ownedTopicPartitions);
+ if (createIfNotExists) {
+ log.info("[GroupId {}] Member {} joins the consumer group.",
groupId, memberId);
+ }
+ updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+ } else {
+ member = group.staticMember(instanceId);
+ if (memberEpoch == 0) {
+ // A new static member joins or the existing static member
rejoins.
+ if (member == null) {
+ // New static member.
+ member = group.getOrMaybeCreateMember(memberId,
createIfNotExists);
+ updatedMemberBuilder = new
ConsumerGroupMember.Builder(member);
+ log.info("[GroupId {}] Static member {} with instance id
{} joins the consumer group.", groupId, memberId, instanceId);
+ } else {
+ // Static member rejoins with a different member id so it
should replace
+ // the previous instance iff the previous member had sent
a leave group.
+ throwIfInstanceIdIsUnreleased(member, groupId, memberId,
instanceId);
+ // Replace the current member.
+ staticMemberReplaced = true;
+ updatedMemberBuilder = new
ConsumerGroupMember.Builder(memberId)
+ .setAssignedPartitions(member.assignedPartitions());
+ removeMemberAndCancelTimers(records, group.groupId(),
member.memberId());
+ log.info("[GroupId {}] Static member {} with instance id
{} re-joins the consumer group.", groupId, memberId, instanceId);
+ }
+ } else {
+ throwIfStaticMemberIsUnknown(member, instanceId);
+ throwIfInstanceIdIsFenced(member, groupId, memberId,
instanceId);
+ throwIfMemberEpochIsInvalid(member, memberEpoch,
ownedTopicPartitions);
+ updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+ }
}
+
+ int groupEpoch = group.groupEpoch();
+ Map<String, TopicMetadata> subscriptionMetadata =
group.subscriptionMetadata();
+
// 1. Create or update the member. If the member is new or has
changed, a ConsumerGroupMemberMetadataValue
// record is written to the __consumer_offsets partition to persist
the change. If the subscriptions have
// changed, the subscription metadata is updated and persisted by
writing a ConsumerGroupPartitionMetadataValue
// record to the __consumer_offsets partition. Finally, the group
epoch is bumped if the subscriptions have
// changed, and persisted by writing a ConsumerGroupMetadataValue
record to the partition.
- int groupEpoch = group.groupEpoch();
- Map<String, TopicMetadata> subscriptionMetadata =
group.subscriptionMetadata();
- ConsumerGroupMember updatedMember = new
ConsumerGroupMember.Builder(member)
+ ConsumerGroupMember updatedMember = updatedMemberBuilder
.maybeUpdateInstanceId(Optional.ofNullable(instanceId))
.maybeUpdateRackId(Optional.ofNullable(rackId))
.maybeUpdateRebalanceTimeoutMs(ofSentinel(rebalanceTimeoutMs))
@@ -919,25 +1027,35 @@ public class GroupMetadataManager {
group.setMetadataRefreshDeadline(currentTimeMs +
consumerGroupMetadataRefreshIntervalMs, groupEpoch);
}
- // 2. Update the target assignment if the group epoch is larger than
the target assignment epoch. The
- // delta between the existing and the new target assignment is
persisted to the partition.
+ // 2. Update the target assignment if the group epoch is larger than
the target assignment epoch or a static member
+ // replaces an existing static member. The delta between the existing
and the new target assignment is persisted to the partition.
int targetAssignmentEpoch = group.assignmentEpoch();
Assignment targetAssignment = group.targetAssignment(memberId);
- if (groupEpoch > targetAssignmentEpoch) {
+ if (groupEpoch > targetAssignmentEpoch || staticMemberReplaced) {
String preferredServerAssignor =
group.computePreferredServerAssignor(
member,
updatedMember
).orElse(defaultAssignor.name());
-
try {
- TargetAssignmentBuilder.TargetAssignmentResult
assignmentResult =
+ TargetAssignmentBuilder assignmentResultBuilder =
new TargetAssignmentBuilder(groupId, groupEpoch,
assignors.get(preferredServerAssignor))
.withMembers(group.members())
+ .withStaticMembers(group.staticMembers())
.withSubscriptionMetadata(subscriptionMetadata)
.withTargetAssignment(group.targetAssignment())
- .addOrUpdateMember(memberId, updatedMember)
+ .addOrUpdateMember(memberId, updatedMember);
+ TargetAssignmentBuilder.TargetAssignmentResult
assignmentResult;
+ // A new static member is replacing an older one with the same
subscriptions.
+ // We just need to remove the older member and add the newer
one. The new member should
+ // reuse the target assignment of the older member.
+ if (staticMemberReplaced) {
+ assignmentResult = assignmentResultBuilder
+ .removeMember(member.memberId())
.build();
-
+ } else {
+ assignmentResult = assignmentResultBuilder
+ .build();
+ }
log.info("[GroupId {}] Computed a new target assignment for
epoch {}: {}.",
groupId, groupEpoch, assignmentResult.targetAssignment());
@@ -1004,27 +1122,79 @@ public class GroupMetadataManager {
return new CoordinatorResult<>(records, response);
}
+ private void removeMemberAndCancelTimers(
+ List<Record> records,
+ String groupId,
+ String memberId
+ ) {
+ // Write tombstones for the departed static member.
+ removeMember(records, groupId, memberId);
+ // Cancel all the timers of the departed static member.
+ cancelTimers(groupId, memberId);
+ }
+
/**
* Handles leave request from a consumer group member.
* @param groupId The group id from the request.
* @param memberId The member id from the request.
+ * @param memberEpoch The member epoch from the request.
*
* @return A Result containing the ConsumerGroupHeartbeat response and
* a list of records to update the state machine.
*/
private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record>
consumerGroupLeave(
String groupId,
- String memberId
+ String instanceId,
+ String memberId,
+ int memberEpoch
) throws ApiException {
ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false);
- ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId,
false);
-
- log.info("[GroupId " + groupId + "] Member " + memberId + " left the
consumer group.");
-
- List<Record> records = consumerGroupFenceMember(group, member);
+ List<Record> records;
+ if (instanceId == null) {
+ ConsumerGroupMember member =
group.getOrMaybeCreateMember(memberId, false);
+ log.info("[GroupId {}] Member {} left the consumer group.",
groupId, memberId);
+ records = consumerGroupFenceMember(group, member);
+ } else {
+ ConsumerGroupMember member = group.staticMember(instanceId);
+ throwIfStaticMemberIsUnknown(member, instanceId);
+ throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId);
+ if (memberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+ log.info("[GroupId {}] Static Member {} with instance id {}
temporarily left the consumer group.",
+ group.groupId(), memberId, instanceId);
+ records = consumerGroupStaticMemberGroupLeave(group, member);
+ } else {
+ log.info("[GroupId {}] Static Member {} with instance id {}
left the consumer group.",
+ group.groupId(), memberId, instanceId);
+ records = consumerGroupFenceMember(group, member);
+ }
+ }
return new CoordinatorResult<>(records, new
ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId)
- .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH));
+ .setMemberEpoch(memberEpoch));
+ }
+
+ /**
+ * Handles the case when a static member decides to leave the group.
+ * The member is not actually fenced from the group, and instead it's
+ * member epoch is updated to -2 to reflect that a member using the given
+ * instance id decided to leave the group and would be back within session
+ * timeout.
+ *
+ * @param group The group.
+ * @param member The static member in the group for the instance id.
+ *
+ * @return A list with a single record signifying that the static member
is leaving.
+ */
+ private List<Record> consumerGroupStaticMemberGroupLeave(
+ ConsumerGroup group,
+ ConsumerGroupMember member
+ ) {
+ // We will write a member epoch of -2 for this departing static member.
+ ConsumerGroupMember leavingStaticMember = new
ConsumerGroupMember.Builder(member)
+ .setMemberEpoch(LEAVE_GROUP_STATIC_MEMBER_EPOCH)
+ .setPartitionsPendingRevocation(Collections.emptyMap())
+ .build();
+ return
Collections.singletonList(newCurrentAssignmentRecord(group.groupId(),
leavingStaticMember));
}
/**
@@ -1041,10 +1211,7 @@ public class GroupMetadataManager {
) {
List<Record> records = new ArrayList<>();
- // Write tombstones for the member. The order matters here.
- records.add(newCurrentAssignmentTombstoneRecord(group.groupId(),
member.memberId()));
- records.add(newTargetAssignmentTombstoneRecord(group.groupId(),
member.memberId()));
- records.add(newMemberSubscriptionTombstoneRecord(group.groupId(),
member.memberId()));
+ removeMember(records, group.groupId(), member.memberId());
// We update the subscription metadata without the leaving member.
Map<String, TopicMetadata> subscriptionMetadata =
group.computeSubscriptionMetadata(
@@ -1064,13 +1231,35 @@ public class GroupMetadataManager {
int groupEpoch = group.groupEpoch() + 1;
records.add(newGroupEpochRecord(group.groupId(), groupEpoch));
- // Cancel all the timers of the member.
- cancelConsumerGroupSessionTimeout(group.groupId(), member.memberId());
- cancelConsumerGroupRevocationTimeout(group.groupId(),
member.memberId());
+ cancelTimers(group.groupId(), member.memberId());
return records;
}
+ /**
+ * Write tombstones for the member. The order matters here.
+ *
+ * @param records The list of records to append the member
assignment tombstone records.
+ * @param groupId The group id.
+ * @param memberId The member id.
+ */
+ private void removeMember(List<Record> records, String groupId, String
memberId) {
+ records.add(newCurrentAssignmentTombstoneRecord(groupId, memberId));
+ records.add(newTargetAssignmentTombstoneRecord(groupId, memberId));
+ records.add(newMemberSubscriptionTombstoneRecord(groupId, memberId));
+ }
+
+ /**
+ * Cancel all the timers of the member.
+ *
+ * @param groupId The group id.
+ * @param memberId The member id.
+ */
+ private void cancelTimers(String groupId, String memberId) {
+ cancelConsumerGroupSessionTimeout(groupId, memberId);
+ cancelConsumerGroupRevocationTimeout(groupId, memberId);
+ }
+
/**
* Schedules (or reschedules) the session timeout for the member.
*
@@ -1184,10 +1373,14 @@ public class GroupMetadataManager {
) throws ApiException {
throwIfConsumerGroupHeartbeatRequestIsInvalid(request);
- if (request.memberEpoch() == LEAVE_GROUP_MEMBER_EPOCH) {
+ if (request.memberEpoch() == LEAVE_GROUP_MEMBER_EPOCH ||
request.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+ // -1 means that the member wants to leave the group.
+ // -2 means that a static member wants to leave the group.
return consumerGroupLeave(
request.groupId(),
- request.memberId()
+ request.instanceId(),
+ request.memberId(),
+ request.memberEpoch()
);
} else {
// Otherwise, it is a regular heartbeat.
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
index f3c22837472..bad716d82ff 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
@@ -115,6 +115,11 @@ public class ConsumerGroup implements Group {
*/
private final TimelineHashMap<String, ConsumerGroupMember> members;
+ /**
+ * The static group members.
+ */
+ private final TimelineHashMap<String, String> staticMembers;
+
/**
* The number of members supporting each server assignor name.
*/
@@ -176,6 +181,7 @@ public class ConsumerGroup implements Group {
this.state = new TimelineObject<>(snapshotRegistry, EMPTY);
this.groupEpoch = new TimelineInteger(snapshotRegistry);
this.members = new TimelineHashMap<>(snapshotRegistry, 0);
+ this.staticMembers = new TimelineHashMap<>(snapshotRegistry, 0);
this.serverAssignors = new TimelineHashMap<>(snapshotRegistry, 0);
this.subscribedTopicNames = new TimelineHashMap<>(snapshotRegistry, 0);
this.subscribedTopicMetadata = new TimelineHashMap<>(snapshotRegistry,
0);
@@ -276,6 +282,18 @@ public class ConsumerGroup implements Group {
maybeUpdateGroupState();
}
+ /**
+ * Get member id of a static member that matches the given group
+ * instance id.
+ *
+ * @param groupInstanceId The group instance id.
+ *
+ * @return The member id corresponding to the given instance id or null if
it does not exist
+ */
+ public String staticMemberId(String groupInstanceId) {
+ return staticMembers.get(groupInstanceId);
+ }
+
/**
* Gets or creates a member.
*
@@ -302,6 +320,18 @@ public class ConsumerGroup implements Group {
return member;
}
+ /**
+ * Gets a static member.
+ *
+ * @param instanceId The group instance id.
+ *
+ * @return The member corresponding to the given instance id or null if it
does not exist
+ */
+ public ConsumerGroupMember staticMember(String instanceId) {
+ String existingMemberId = staticMemberId(instanceId);
+ return existingMemberId == null ? null :
getOrMaybeCreateMember(existingMemberId, false);
+ }
+
/**
* Updates the member.
*
@@ -315,9 +345,21 @@ public class ConsumerGroup implements Group {
maybeUpdateSubscribedTopicNames(oldMember, newMember);
maybeUpdateServerAssignors(oldMember, newMember);
maybeUpdatePartitionEpoch(oldMember, newMember);
+ updateStaticMember(newMember);
maybeUpdateGroupState();
}
+ /**
+ * Updates the member id stored against the instance id if the member is a
static member.
+ *
+ * @param newMember The new member state.
+ */
+ private void updateStaticMember(ConsumerGroupMember newMember) {
+ if (newMember.instanceId() != null) {
+ staticMembers.put(newMember.instanceId(), newMember.memberId());
+ }
+ }
+
/**
* Remove the member from the group.
*
@@ -328,9 +370,21 @@ public class ConsumerGroup implements Group {
maybeUpdateSubscribedTopicNames(oldMember, null);
maybeUpdateServerAssignors(oldMember, null);
maybeRemovePartitionEpoch(oldMember);
+ removeStaticMember(oldMember);
maybeUpdateGroupState();
}
+ /**
+ * Remove the static member mapping if the removed member is static.
+ *
+ * @param oldMember The member to remove.
+ */
+ private void removeStaticMember(ConsumerGroupMember oldMember) {
+ if (oldMember.instanceId() != null) {
+ staticMembers.remove(oldMember.instanceId());
+ }
+ }
+
/**
* Returns true if the member exists.
*
@@ -356,6 +410,13 @@ public class ConsumerGroup implements Group {
return Collections.unmodifiableMap(members);
}
+ /**
+ * @return An immutable Map containing all the static members keyed by
instance id.
+ */
+ public Map<String, String> staticMembers() {
+ return Collections.unmodifiableMap(staticMembers);
+ }
+
/**
* @return An immutable Set containing all the subscribed topic names.
*/
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java
index 2026efaa82c..b7d95fe2102 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java
@@ -127,6 +127,11 @@ public class TargetAssignmentBuilder {
*/
private final Map<String, ConsumerGroupMember> updatedMembers = new
HashMap<>();
+ /**
+ * The static members in the group.
+ */
+ private Map<String, String> staticMembers = new HashMap<>();
+
/**
* Constructs the object.
*
@@ -157,6 +162,19 @@ public class TargetAssignmentBuilder {
return this;
}
+ /**
+ * Adds all the existing static members.
+ *
+ * @param staticMembers The existing static members in the consumer
group.
+ * @return This object.
+ */
+ public TargetAssignmentBuilder withStaticMembers(
+ Map<String, String> staticMembers
+ ) {
+ this.staticMembers = staticMembers;
+ return this;
+ }
+
/**
* Adds the subscription metadata to use.
*
@@ -234,9 +252,17 @@ public class TargetAssignmentBuilder {
if (updatedMemberOrNull == null) {
memberSpecs.remove(memberId);
} else {
+ ConsumerGroupMember member = members.get(memberId);
+ Assignment assignment;
+ // A new static member joins and needs to replace an existing
departed one.
+ if (member == null &&
staticMembers.containsKey(updatedMemberOrNull.instanceId())) {
+ assignment =
targetAssignment.getOrDefault(staticMembers.get(updatedMemberOrNull.instanceId()),
Assignment.EMPTY);
+ } else {
+ assignment = targetAssignment.getOrDefault(memberId,
Assignment.EMPTY);
+ }
memberSpecs.put(memberId, createAssignmentMemberSpec(
updatedMemberOrNull,
- targetAssignment.getOrDefault(memberId, Assignment.EMPTY),
+ assignment,
subscriptionMetadata
));
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 2dca0c2e352..6711619eb09 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -31,6 +31,7 @@ import
org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.errors.UnreleasedInstanceIdException;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
@@ -123,6 +124,7 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
+import static
org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol;
import static
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection;
@@ -1313,6 +1315,17 @@ public class GroupMetadataManagerTest {
.setMemberEpoch(1)
.setServerAssignor("bar")));
assertEquals("ServerAssignor bar is not supported. Supported
assignors: range.", ex.getMessage());
+
+ ex = assertThrows(InvalidRequestException.class, () ->
context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("foo")
+ .setMemberId(Uuid.randomUuid().toString())
+ .setMemberEpoch(LEAVE_GROUP_STATIC_MEMBER_EPOCH)
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setTopicPartitions(Collections.emptyList())));
+
+ assertEquals("InstanceId can't be null.", ex.getMessage());
}
@Test
@@ -1686,172 +1699,1045 @@ public class GroupMetadataManagerTest {
}
@Test
- public void testNewJoiningMemberTriggersNewTargetAssignment() {
+ public void testNewJoiningMemberTriggersNewTargetAssignment() {
+ String groupId = "fooup";
+ // Use a static member id as it makes the test easier.
+ String memberId1 = Uuid.randomUuid().toString();
+ String memberId2 = Uuid.randomUuid().toString();
+ String memberId3 = Uuid.randomUuid().toString();
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+ Uuid barTopicId = Uuid.randomUuid();
+ String barTopicName = "bar";
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withAssignors(Collections.singletonList(assignor))
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .addTopic(barTopicId, barTopicName, 3)
+ .addRacks()
+ .build())
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(new ConsumerGroupMember.Builder(memberId1)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setTargetMemberEpoch(10)
+ .setClientId("client")
+ .setClientHost("localhost/127.0.0.1")
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2),
+ mkTopicAssignment(barTopicId, 0, 1)))
+ .build())
+ .withMember(new ConsumerGroupMember.Builder(memberId2)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setTargetMemberEpoch(10)
+ .setClientId("client")
+ .setClientHost("localhost/127.0.0.1")
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4, 5),
+ mkTopicAssignment(barTopicId, 2)))
+ .build())
+ .withAssignment(memberId1, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2),
+ mkTopicAssignment(barTopicId, 0, 1)))
+ .withAssignment(memberId2, mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4, 5),
+ mkTopicAssignment(barTopicId, 2)))
+ .withAssignmentEpoch(10))
+ .build();
+
+ assignor.prepareGroupAssignment(new GroupAssignment(
+ new HashMap<String, MemberAssignment>() {
+ {
+ put(memberId1, new MemberAssignment(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1),
+ mkTopicAssignment(barTopicId, 0)
+ )));
+ put(memberId2, new MemberAssignment(mkAssignment(
+ mkTopicAssignment(fooTopicId, 2, 3),
+ mkTopicAssignment(barTopicId, 1)
+ )));
+ put(memberId3, new MemberAssignment(mkAssignment(
+ mkTopicAssignment(fooTopicId, 4, 5),
+ mkTopicAssignment(barTopicId, 2)
+ )));
+ }
+ }
+ ));
+
+ // Member 3 joins the consumer group.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result =
context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId3)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setServerAssignor("range")
+ .setTopicPartitions(Collections.emptyList()));
+
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(memberId3)
+ .setMemberEpoch(11)
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()),
+ result.response()
+ );
+
+ ConsumerGroupMember expectedMember3 = new
ConsumerGroupMember.Builder(memberId3)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(0)
+ .setTargetMemberEpoch(11)
+ .setClientId("client")
+ .setClientHost("localhost/127.0.0.1")
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setServerAssignorName("range")
+ .setPartitionsPendingAssignment(mkAssignment(
+ mkTopicAssignment(fooTopicId, 4, 5),
+ mkTopicAssignment(barTopicId, 2)))
+ .build();
+
+ List<Record> expectedRecords = Arrays.asList(
+ RecordHelpers.newMemberSubscriptionRecord(groupId,
expectedMember3),
+ RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new
HashMap<String, TopicMetadata>() {
+ {
+ put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 6, mkMapOfPartitionRacks(6)));
+ put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 3, mkMapOfPartitionRacks(3)));
+ }
+ }),
+ RecordHelpers.newGroupEpochRecord(groupId, 11),
+ RecordHelpers.newTargetAssignmentRecord(groupId, memberId1,
mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1),
+ mkTopicAssignment(barTopicId, 0)
+ )),
+ RecordHelpers.newTargetAssignmentRecord(groupId, memberId2,
mkAssignment(
+ mkTopicAssignment(fooTopicId, 2, 3),
+ mkTopicAssignment(barTopicId, 1)
+ )),
+ RecordHelpers.newTargetAssignmentRecord(groupId, memberId3,
mkAssignment(
+ mkTopicAssignment(fooTopicId, 4, 5),
+ mkTopicAssignment(barTopicId, 2)
+ )),
+ RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11),
+ RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember3)
+ );
+
+ assertRecordsEquals(expectedRecords.subList(0, 3),
result.records().subList(0, 3));
+ assertUnorderedListEquals(expectedRecords.subList(3, 6),
result.records().subList(3, 6));
+ assertRecordsEquals(expectedRecords.subList(6, 8),
result.records().subList(6, 8));
+ }
+
+ @Test
+ public void testLeavingMemberBumpsGroupEpoch() {
+ String groupId = "fooup";
+ // Use a static member id as it makes the test easier.
+ String memberId1 = Uuid.randomUuid().toString();
+ String memberId2 = Uuid.randomUuid().toString();
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+ Uuid barTopicId = Uuid.randomUuid();
+ String barTopicName = "bar";
+ Uuid zarTopicId = Uuid.randomUuid();
+ String zarTopicName = "zar";
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+ // Consumer group with two members.
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withAssignors(Collections.singletonList(assignor))
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .addTopic(barTopicId, barTopicName, 3)
+ .addTopic(zarTopicId, zarTopicName, 1)
+ .addRacks()
+ .build())
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(new ConsumerGroupMember.Builder(memberId1)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setTargetMemberEpoch(10)
+ .setClientId("client")
+ .setClientHost("localhost/127.0.0.1")
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2),
+ mkTopicAssignment(barTopicId, 0, 1)))
+ .build())
+ .withMember(new ConsumerGroupMember.Builder(memberId2)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setTargetMemberEpoch(10)
+ .setClientId("client")
+ .setClientHost("localhost/127.0.0.1")
+ // Use zar only here to ensure that metadata needs to be
recomputed.
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar",
"zar"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4, 5),
+ mkTopicAssignment(barTopicId, 2)))
+ .build())
+ .withAssignment(memberId1, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2),
+ mkTopicAssignment(barTopicId, 0, 1)))
+ .withAssignment(memberId2, mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4, 5),
+ mkTopicAssignment(barTopicId, 2)))
+ .withAssignmentEpoch(10))
+ .build();
+
+ // Member 2 leaves the consumer group.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result =
context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId2)
+ .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setTopicPartitions(Collections.emptyList()));
+
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(memberId2)
+ .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH),
+ result.response()
+ );
+
+ List<Record> expectedRecords = Arrays.asList(
+ RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId,
memberId2),
+ RecordHelpers.newTargetAssignmentTombstoneRecord(groupId,
memberId2),
+ RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId,
memberId2),
+ // Subscription metadata is recomputed because zar is no longer
there.
+ RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new
HashMap<String, TopicMetadata>() {
+ {
+ put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 6, mkMapOfPartitionRacks(6)));
+ put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 3, mkMapOfPartitionRacks(3)));
+ }
+ }),
+ RecordHelpers.newGroupEpochRecord(groupId, 11)
+ );
+
+ assertRecordsEquals(expectedRecords, result.records());
+ }
+
+ @Test
+ public void testGroupEpochBumpWhenNewStaticMemberJoins() {
+ String groupId = "fooup";
+ // Use a static member id as it makes the test easier.
+ String memberId1 = Uuid.randomUuid().toString();
+ String memberId2 = Uuid.randomUuid().toString();
+ String memberId3 = Uuid.randomUuid().toString();
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+ Uuid barTopicId = Uuid.randomUuid();
+ String barTopicName = "bar";
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+ // Consumer group with two static members.
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withAssignors(Collections.singletonList(assignor))
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .addTopic(barTopicId, barTopicName, 3)
+ .addRacks()
+ .build())
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(new ConsumerGroupMember.Builder(memberId1)
+ .setInstanceId(memberId1)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setTargetMemberEpoch(10)
+ .setClientId("client")
+ .setClientHost("localhost/127.0.0.1")
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2),
+ mkTopicAssignment(barTopicId, 0, 1)))
+ .build())
+ .withMember(new ConsumerGroupMember.Builder(memberId2)
+ .setInstanceId(memberId2)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setTargetMemberEpoch(10)
+ .setClientId("client")
+ .setClientHost("localhost/127.0.0.1")
+ // Use zar only here to ensure that metadata needs to be
recomputed.
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar",
"zar"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4, 5),
+ mkTopicAssignment(barTopicId, 2)))
+ .build())
+ .withAssignment(memberId1, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2),
+ mkTopicAssignment(barTopicId, 0, 1)))
+ .withAssignment(memberId2, mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4, 5),
+ mkTopicAssignment(barTopicId, 2)))
+ .withAssignmentEpoch(10))
+ .build();
+
+ assignor.prepareGroupAssignment(new GroupAssignment(
+ new HashMap<String, MemberAssignment>() {
+ {
+ put(memberId1, new MemberAssignment(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1),
+ mkTopicAssignment(barTopicId, 0)
+ )));
+ put(memberId2, new MemberAssignment(mkAssignment(
+ mkTopicAssignment(fooTopicId, 2, 3),
+ mkTopicAssignment(barTopicId, 1)
+ )));
+ put(memberId3, new MemberAssignment(mkAssignment(
+ mkTopicAssignment(fooTopicId, 4, 5),
+ mkTopicAssignment(barTopicId, 2)
+ )));
+ }
+ }
+ ));
+
+ // Member 3 joins the consumer group.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result =
context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId3)
+ .setInstanceId(memberId3)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5000)
+ .setServerAssignor("range")
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setTopicPartitions(Collections.emptyList()));
+
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(memberId3)
+ .setMemberEpoch(11)
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()),
+ result.response()
+ );
+
+ ConsumerGroupMember expectedMember3 = new
ConsumerGroupMember.Builder(memberId3)
+ .setMemberEpoch(11)
+ .setInstanceId(memberId3)
+ .setPreviousMemberEpoch(0)
+ .setTargetMemberEpoch(11)
+ .setClientId("client")
+ .setClientHost("localhost/127.0.0.1")
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setServerAssignorName("range")
+ .setPartitionsPendingAssignment(mkAssignment(
+ mkTopicAssignment(fooTopicId, 4, 5),
+ mkTopicAssignment(barTopicId, 2)))
+ .build();
+
+ List<Record> expectedRecords = Arrays.asList(
+ RecordHelpers.newMemberSubscriptionRecord(groupId,
expectedMember3),
+ RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new
HashMap<String, TopicMetadata>() {
+ {
+ put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 6, mkMapOfPartitionRacks(6)));
+ put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 3, mkMapOfPartitionRacks(3)));
+ }
+ }),
+ RecordHelpers.newGroupEpochRecord(groupId, 11),
+ RecordHelpers.newTargetAssignmentRecord(groupId, memberId1,
mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1),
+ mkTopicAssignment(barTopicId, 0)
+ )),
+ RecordHelpers.newTargetAssignmentRecord(groupId, memberId2,
mkAssignment(
+ mkTopicAssignment(fooTopicId, 2, 3),
+ mkTopicAssignment(barTopicId, 1)
+ )),
+ RecordHelpers.newTargetAssignmentRecord(groupId, memberId3,
mkAssignment(
+ mkTopicAssignment(fooTopicId, 4, 5),
+ mkTopicAssignment(barTopicId, 2)
+ )),
+ RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11),
+ RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember3)
+ );
+
+ assertRecordsEquals(expectedRecords.subList(0, 3),
result.records().subList(0, 3));
+ assertUnorderedListEquals(expectedRecords.subList(3, 6),
result.records().subList(3, 6));
+ assertRecordsEquals(expectedRecords.subList(6, 8),
result.records().subList(6, 8));
+ }
+
+ @Test
+ public void testStaticMemberGetsBackAssignmentUponRejoin() {
+ String groupId = "fooup";
+ // Use a static member id as it makes the test easier.
+ String memberId1 = Uuid.randomUuid().toString();
+ String memberId2 = Uuid.randomUuid().toString();
+ String member2RejoinId = Uuid.randomUuid().toString();
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+ Uuid barTopicId = Uuid.randomUuid();
+ String barTopicName = "bar";
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ ConsumerGroupMember member1 = new
ConsumerGroupMember.Builder(memberId1)
+ .setInstanceId(memberId1)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setTargetMemberEpoch(10)
+ .setClientId("client")
+ .setClientHost("localhost/127.0.0.1")
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2),
+ mkTopicAssignment(barTopicId, 0, 1)))
+ .build();
+ ConsumerGroupMember member2 = new
ConsumerGroupMember.Builder(memberId2)
+ .setInstanceId(memberId2)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setTargetMemberEpoch(10)
+ .setRebalanceTimeoutMs(5000)
+ .setClientId("client")
+ .setClientHost("localhost/127.0.0.1")
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4, 5),
+ mkTopicAssignment(barTopicId, 2)))
+ .build();
+
+ // Consumer group with two static members.
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withAssignors(Collections.singletonList(assignor))
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .addTopic(barTopicId, barTopicName, 3)
+ .addRacks()
+ .build())
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(member1)
+ .withMember(member2)
+ .withAssignment(memberId1, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2),
+ mkTopicAssignment(barTopicId, 0, 1)))
+ .withAssignment(memberId2, mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4, 5),
+ mkTopicAssignment(barTopicId, 2)))
+ .withAssignmentEpoch(10)
+ .withSubscriptionMetadata(new HashMap<String, TopicMetadata>()
{
+ {
+ put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 6, mkMapOfPartitionRacks(6)));
+ put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 3, mkMapOfPartitionRacks(3)));
+ }
+ }))
+ .build();
+
+ assignor.prepareGroupAssignment(new GroupAssignment(
+ new HashMap<String, MemberAssignment>() {
+ {
+ put(memberId1, new MemberAssignment(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2),
+ mkTopicAssignment(barTopicId, 0, 1)
+ )));
+ // When the member rejoins, it gets the same assignments.
+ put(member2RejoinId, new MemberAssignment(mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4, 5),
+ mkTopicAssignment(barTopicId, 2)
+ )));
+ }
+ }
+ ));
+
+ // Member 2 leaves the consumer group.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result =
context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId2)
+ .setInstanceId(memberId2)
+ .setMemberEpoch(-2)
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setTopicPartitions(Collections.emptyList()));
+
+ // Member epoch of the response would be set to -2.
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(memberId2)
+ .setMemberEpoch(-2),
+ result.response()
+ );
+
+ // The departing static member will have it's epoch set to -2.
+ ConsumerGroupMember member2UpdatedEpoch = new
ConsumerGroupMember.Builder(member2)
+ .setMemberEpoch(-2)
+ .build();
+
+ List<Record> expectedRecords = Collections.singletonList(
+ RecordHelpers.newCurrentAssignmentRecord(groupId,
member2UpdatedEpoch)
+ );
+
+ assertEquals(result.records(), expectedRecords);
+
+ // Member 2 rejoins the group with the same instance id.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record>
rejoinResult = context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setMemberId(member2RejoinId)
+ .setGroupId(groupId)
+ .setInstanceId(memberId2)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5000)
+ .setServerAssignor("range")
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setTopicPartitions(Collections.emptyList()));
+
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(member2RejoinId)
+ .setMemberEpoch(10)
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(Arrays.asList(
+ new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(Arrays.asList(3, 4, 5)),
+ new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(barTopicId)
+ .setPartitions(Collections.singletonList(2))
+ ))),
+ rejoinResult.response()
+ );
+
+ ConsumerGroupMember expectedRejoinedMember = new
ConsumerGroupMember.Builder(member2RejoinId)
+ .setMemberEpoch(10)
+ .setInstanceId(memberId2)
+ .setPreviousMemberEpoch(0)
+ .setTargetMemberEpoch(10)
+ .setClientId("client")
+ .setClientHost("localhost/127.0.0.1")
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4, 5),
+ mkTopicAssignment(barTopicId, 2)))
+ .build();
+
+ List<Record> expectedRecordsAfterRejoin = Arrays.asList(
+ RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId,
memberId2),
+ RecordHelpers.newTargetAssignmentTombstoneRecord(groupId,
memberId2),
+ RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId,
memberId2),
+ RecordHelpers.newMemberSubscriptionRecord(groupId,
expectedRejoinedMember),
+ RecordHelpers.newTargetAssignmentRecord(groupId, member2RejoinId,
mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4, 5),
+ mkTopicAssignment(barTopicId, 2))),
+ RecordHelpers.newTargetAssignmentEpochRecord(groupId, 10),
+ RecordHelpers.newCurrentAssignmentRecord(groupId,
expectedRejoinedMember)
+ );
+
+ assertRecordsEquals(expectedRecordsAfterRejoin,
rejoinResult.records());
+ // Verify that there are no timers.
+ context.assertNoSessionTimeout(groupId, memberId2);
+ context.assertNoRevocationTimeout(groupId, memberId2);
+ }
+
+ @Test
+ public void testNoGroupEpochBumpWhenStaticMemberTemporarilyLeaves() {
+ String groupId = "fooup";
+ // Use a static member id as it makes the test easier.
+ String memberId1 = Uuid.randomUuid().toString();
+ String memberId2 = Uuid.randomUuid().toString();
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+ Uuid barTopicId = Uuid.randomUuid();
+ String barTopicName = "bar";
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ ConsumerGroupMember member1 = new
ConsumerGroupMember.Builder(memberId1)
+ .setInstanceId(memberId1)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setTargetMemberEpoch(10)
+ .setClientId("client")
+ .setClientHost("localhost/127.0.0.1")
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2),
+ mkTopicAssignment(barTopicId, 0, 1)))
+ .build();
+ ConsumerGroupMember member2 = new
ConsumerGroupMember.Builder(memberId2)
+ .setInstanceId(memberId2)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setTargetMemberEpoch(10)
+ .setClientId("client")
+ .setClientHost("localhost/127.0.0.1")
+ // Use zar only here to ensure that metadata needs to be
recomputed.
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4, 5),
+ mkTopicAssignment(barTopicId, 2)))
+ .build();
+
+ // Consumer group with two static members.
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withAssignors(Collections.singletonList(assignor))
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .addTopic(barTopicId, barTopicName, 3)
+ .addRacks()
+ .build())
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(member1)
+ .withMember(member2)
+ .withAssignment(memberId1, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2),
+ mkTopicAssignment(barTopicId, 0, 1)))
+ .withAssignment(memberId2, mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4, 5),
+ mkTopicAssignment(barTopicId, 2)))
+ .withAssignmentEpoch(10))
+ .build();
+
+ // Member 2 leaves the consumer group.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result =
context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId2)
+ .setInstanceId(memberId2)
+ .setMemberEpoch(LEAVE_GROUP_STATIC_MEMBER_EPOCH)
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setTopicPartitions(Collections.emptyList()));
+
+ // member epoch of the response would be set to -2
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(memberId2)
+ .setMemberEpoch(LEAVE_GROUP_STATIC_MEMBER_EPOCH),
+ result.response()
+ );
+
+ ConsumerGroupMember member2UpdatedEpoch = new ConsumerGroupMember
+ .Builder(member2)
+ .setMemberEpoch(LEAVE_GROUP_STATIC_MEMBER_EPOCH)
+ .build();
+ List<Record> expectedRecords = Collections.singletonList(
+ RecordHelpers.newCurrentAssignmentRecord(groupId,
member2UpdatedEpoch)
+ );
+
+ assertEquals(result.records(), expectedRecords);
+ }
+
+ @Test
+ public void testLeavingStaticMemberBumpsGroupEpoch() {
+ String groupId = "fooup";
+ // Use a static member id as it makes the test easier.
+ String memberId1 = Uuid.randomUuid().toString();
+ String memberId2 = Uuid.randomUuid().toString();
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+ Uuid barTopicId = Uuid.randomUuid();
+ String barTopicName = "bar";
+ Uuid zarTopicId = Uuid.randomUuid();
+ String zarTopicName = "zar";
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+ // Consumer group with two static members.
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withAssignors(Collections.singletonList(assignor))
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .addTopic(barTopicId, barTopicName, 3)
+ .addTopic(zarTopicId, zarTopicName, 1)
+ .addRacks()
+ .build())
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(new ConsumerGroupMember.Builder(memberId1)
+ .setInstanceId(memberId1)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setTargetMemberEpoch(10)
+ .setClientId("client")
+ .setClientHost("localhost/127.0.0.1")
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2),
+ mkTopicAssignment(barTopicId, 0, 1)))
+ .build())
+ .withMember(new ConsumerGroupMember.Builder(memberId2)
+ .setInstanceId(memberId2)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setTargetMemberEpoch(10)
+ .setClientId("client")
+ .setClientHost("localhost/127.0.0.1")
+ // Use zar only here to ensure that metadata needs to be
recomputed.
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar",
"zar"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4, 5),
+ mkTopicAssignment(barTopicId, 2)))
+ .build())
+ .withAssignment(memberId1, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2),
+ mkTopicAssignment(barTopicId, 0, 1)))
+ .withAssignment(memberId2, mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4, 5),
+ mkTopicAssignment(barTopicId, 2)))
+ .withAssignmentEpoch(10))
+ .build();
+
+ // Member 2 leaves the consumer group.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result =
context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setInstanceId(memberId2)
+ .setMemberId(memberId2)
+ .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setTopicPartitions(Collections.emptyList()));
+
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(memberId2)
+ .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH),
+ result.response()
+ );
+
+ List<Record> expectedRecords = Arrays.asList(
+ RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId,
memberId2),
+ RecordHelpers.newTargetAssignmentTombstoneRecord(groupId,
memberId2),
+ RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId,
memberId2),
+ // Subscription metadata is recomputed because zar is no longer
there.
+ RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new
HashMap<String, TopicMetadata>() {
+ {
+ put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 6, mkMapOfPartitionRacks(6)));
+ put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 3, mkMapOfPartitionRacks(3)));
+ }
+ }),
+ RecordHelpers.newGroupEpochRecord(groupId, 11)
+ );
+
+ assertRecordsEquals(expectedRecords, result.records());
+ }
+
+ @Test
+ public void
testShouldThrownUnreleasedInstanceIdExceptionWhenNewMemberJoinsWithInUseInstanceId()
{
+ String groupId = "fooup";
+ // Use a static member id as it makes the test easier.
+ String memberId1 = Uuid.randomUuid().toString();
+ String memberId2 = Uuid.randomUuid().toString();
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+ // Consumer group with one static member.
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withAssignors(Collections.singletonList(assignor))
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .addRacks()
+ .build())
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(new ConsumerGroupMember.Builder(memberId1)
+ .setInstanceId(memberId1)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setTargetMemberEpoch(10)
+ .setClientId("client")
+ .setClientHost("localhost/127.0.0.1")
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2)))
+ .build())
+ .withAssignment(memberId1, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2)))
+ .withAssignmentEpoch(10))
+ .build();
+
+ // Member 2 joins the consumer group with an in-use instance id.
+ assertThrows(UnreleasedInstanceIdException.class, () ->
context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId2)
+ .setInstanceId(memberId1)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5000)
+ .setServerAssignor("range")
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setTopicPartitions(Collections.emptyList())));
+ }
+
+ @Test
+ public void
testShouldThrownUnknownMemberIdExceptionWhenUnknownStaticMemberJoins() {
+ String groupId = "fooup";
+ // Use a static member id as it makes the test easier.
+ String memberId1 = Uuid.randomUuid().toString();
+ String memberId2 = Uuid.randomUuid().toString();
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+ // Consumer group with one static member.
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withAssignors(Collections.singletonList(assignor))
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .build())
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(new ConsumerGroupMember.Builder(memberId1)
+ .setInstanceId(memberId1)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setTargetMemberEpoch(10)
+ .setClientId("client")
+ .setClientHost("localhost/127.0.0.1")
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2)))
+ .build())
+ .withAssignment(memberId1, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2)))
+ .withAssignmentEpoch(10))
+ .build();
+
+ // Member 2 joins the consumer group with a non-zero epoch
+ assertThrows(UnknownMemberIdException.class, () ->
context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId2)
+ .setInstanceId(memberId2)
+ .setMemberEpoch(10)
+ .setRebalanceTimeoutMs(5000)
+ .setServerAssignor("range")
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setTopicPartitions(Collections.emptyList())));
+ }
+
+ @Test
+ public void
testShouldThrowFencedInstanceIdExceptionWhenStaticMemberWithDifferentMemberIdJoins()
{
+ String groupId = "fooup";
+ // Use a static member id as it makes the test easier.
+ String memberId1 = Uuid.randomUuid().toString();
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+ // Consumer group with one static member.
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withAssignors(Collections.singletonList(assignor))
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .build())
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(new ConsumerGroupMember.Builder(memberId1)
+ .setInstanceId(memberId1)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setTargetMemberEpoch(10)
+ .setClientId("client")
+ .setClientHost("localhost/127.0.0.1")
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2)))
+ .build())
+ .withAssignment(memberId1, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2)))
+ .withAssignmentEpoch(10))
+ .build();
+
+ assertThrows(FencedInstanceIdException.class, () ->
context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId("unknown-" + memberId1)
+ .setInstanceId(memberId1)
+ .setMemberEpoch(11)
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setTopicPartitions(Collections.emptyList())));
+ }
+
+ @Test
+ public void testConsumerGroupMemberEpochValidationForStaticMember() {
+ String groupId = "fooup";
+ // Use a static member id as it makes the test easier.
+ String memberId = Uuid.randomUuid().toString();
+ Uuid fooTopicId = Uuid.randomUuid();
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withAssignors(Collections.singletonList(assignor))
+ .build();
+
+ ConsumerGroupMember member = new ConsumerGroupMember.Builder(memberId)
+ .setInstanceId(memberId)
+ .setMemberEpoch(100)
+ .setPreviousMemberEpoch(99)
+ .setTargetMemberEpoch(100)
+ .setRebalanceTimeoutMs(5000)
+ .setClientId("client")
+ .setClientHost("localhost/127.0.0.1")
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId,
1, 2, 3)))
+ .build();
+
+ context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId,
member));
+
+ context.replay(RecordHelpers.newGroupEpochRecord(groupId, 100));
+
+ context.replay(RecordHelpers.newTargetAssignmentRecord(groupId,
memberId, mkAssignment(
+ mkTopicAssignment(fooTopicId, 1, 2, 3)
+ )));
+
+ context.replay(RecordHelpers.newTargetAssignmentEpochRecord(groupId,
100));
+
+ context.replay(RecordHelpers.newCurrentAssignmentRecord(groupId,
member));
+
+ // Member epoch is greater than the expected epoch.
+ assertThrows(FencedMemberEpochException.class, () ->
+ context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setInstanceId(memberId)
+ .setMemberEpoch(200)
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))));
+
+ // Member epoch is smaller than the expected epoch.
+ assertThrows(FencedMemberEpochException.class, () ->
+ context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setInstanceId(memberId)
+ .setMemberEpoch(50)
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))));
+
+ // Member joins with previous epoch but without providing partitions.
+ assertThrows(FencedMemberEpochException.class, () ->
+ context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setInstanceId(memberId)
+ .setMemberEpoch(99)
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))));
+
+ // Member joins with previous epoch and has a subset of the owned
partitions. This
+ // is accepted as the response with the bumped epoch may have been
lost. In this
+ // case, we provide back the correct epoch to the member.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result =
context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setInstanceId(memberId)
+ .setMemberEpoch(99)
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setTopicPartitions(Collections.singletonList(new
ConsumerGroupHeartbeatRequestData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(Arrays.asList(1, 2)))));
+ assertEquals(100, result.response().memberEpoch());
+ }
+
+ @Test
+ public void
testShouldThrowUnknownMemberIdExceptionWhenUnknownStaticMemberLeaves() {
String groupId = "fooup";
// Use a static member id as it makes the test easier.
String memberId1 = Uuid.randomUuid().toString();
- String memberId2 = Uuid.randomUuid().toString();
- String memberId3 = Uuid.randomUuid().toString();
Uuid fooTopicId = Uuid.randomUuid();
String fooTopicName = "foo";
- Uuid barTopicId = Uuid.randomUuid();
- String barTopicName = "bar";
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+ // Consumer group with one static member.
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
.withAssignors(Collections.singletonList(assignor))
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
- .addTopic(barTopicId, barTopicName, 3)
- .addRacks()
.build())
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
.withMember(new ConsumerGroupMember.Builder(memberId1)
+ .setInstanceId(memberId1)
.setMemberEpoch(10)
.setPreviousMemberEpoch(9)
.setTargetMemberEpoch(10)
.setClientId("client")
.setClientHost("localhost/127.0.0.1")
- .setRebalanceTimeoutMs(5000)
- .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
- .setServerAssignorName("range")
- .setAssignedPartitions(mkAssignment(
- mkTopicAssignment(fooTopicId, 0, 1, 2),
- mkTopicAssignment(barTopicId, 0, 1)))
- .build())
- .withMember(new ConsumerGroupMember.Builder(memberId2)
- .setMemberEpoch(10)
- .setPreviousMemberEpoch(9)
- .setTargetMemberEpoch(10)
- .setClientId("client")
- .setClientHost("localhost/127.0.0.1")
- .setRebalanceTimeoutMs(5000)
.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
.setServerAssignorName("range")
.setAssignedPartitions(mkAssignment(
- mkTopicAssignment(fooTopicId, 3, 4, 5),
- mkTopicAssignment(barTopicId, 2)))
+ mkTopicAssignment(fooTopicId, 0, 1, 2)))
.build())
.withAssignment(memberId1, mkAssignment(
- mkTopicAssignment(fooTopicId, 0, 1, 2),
- mkTopicAssignment(barTopicId, 0, 1)))
- .withAssignment(memberId2, mkAssignment(
- mkTopicAssignment(fooTopicId, 3, 4, 5),
- mkTopicAssignment(barTopicId, 2)))
+ mkTopicAssignment(fooTopicId, 0, 1, 2)))
.withAssignmentEpoch(10))
.build();
- assignor.prepareGroupAssignment(new GroupAssignment(
- new HashMap<String, MemberAssignment>() {
- {
- put(memberId1, new MemberAssignment(mkAssignment(
- mkTopicAssignment(fooTopicId, 0, 1),
- mkTopicAssignment(barTopicId, 0)
- )));
- put(memberId2, new MemberAssignment(mkAssignment(
- mkTopicAssignment(fooTopicId, 2, 3),
- mkTopicAssignment(barTopicId, 1)
- )));
- put(memberId3, new MemberAssignment(mkAssignment(
- mkTopicAssignment(fooTopicId, 4, 5),
- mkTopicAssignment(barTopicId, 2)
- )));
- }
- }
- ));
-
- // Member 3 joins the consumer group.
- CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result =
context.consumerGroupHeartbeat(
+ assertThrows(UnknownMemberIdException.class, () ->
context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
- .setMemberId(memberId3)
- .setMemberEpoch(0)
+ .setMemberId(memberId1)
+ .setInstanceId("unknown-" + memberId1)
+ .setMemberEpoch(LEAVE_GROUP_STATIC_MEMBER_EPOCH)
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
- .setServerAssignor("range")
- .setTopicPartitions(Collections.emptyList()));
-
- assertResponseEquals(
- new ConsumerGroupHeartbeatResponseData()
- .setMemberId(memberId3)
- .setMemberEpoch(11)
- .setHeartbeatIntervalMs(5000)
- .setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()),
- result.response()
- );
-
- ConsumerGroupMember expectedMember3 = new
ConsumerGroupMember.Builder(memberId3)
- .setMemberEpoch(11)
- .setPreviousMemberEpoch(0)
- .setTargetMemberEpoch(11)
- .setClientId("client")
- .setClientHost("localhost/127.0.0.1")
- .setRebalanceTimeoutMs(5000)
- .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
- .setServerAssignorName("range")
- .setPartitionsPendingAssignment(mkAssignment(
- mkTopicAssignment(fooTopicId, 4, 5),
- mkTopicAssignment(barTopicId, 2)))
- .build();
-
- List<Record> expectedRecords = Arrays.asList(
- RecordHelpers.newMemberSubscriptionRecord(groupId,
expectedMember3),
- RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new
HashMap<String, TopicMetadata>() {
- {
- put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 6, mkMapOfPartitionRacks(6)));
- put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 3, mkMapOfPartitionRacks(3)));
- }
- }),
- RecordHelpers.newGroupEpochRecord(groupId, 11),
- RecordHelpers.newTargetAssignmentRecord(groupId, memberId1,
mkAssignment(
- mkTopicAssignment(fooTopicId, 0, 1),
- mkTopicAssignment(barTopicId, 0)
- )),
- RecordHelpers.newTargetAssignmentRecord(groupId, memberId2,
mkAssignment(
- mkTopicAssignment(fooTopicId, 2, 3),
- mkTopicAssignment(barTopicId, 1)
- )),
- RecordHelpers.newTargetAssignmentRecord(groupId, memberId3,
mkAssignment(
- mkTopicAssignment(fooTopicId, 4, 5),
- mkTopicAssignment(barTopicId, 2)
- )),
- RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11),
- RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember3)
- );
-
- assertRecordsEquals(expectedRecords.subList(0, 3),
result.records().subList(0, 3));
- assertUnorderedListEquals(expectedRecords.subList(3, 6),
result.records().subList(3, 6));
- assertRecordsEquals(expectedRecords.subList(6, 8),
result.records().subList(6, 8));
+ .setTopicPartitions(Collections.emptyList())));
}
@Test
- public void testLeavingMemberBumpsGroupEpoch() {
+ public void
testShouldThrowFencedInstanceIdExceptionWhenStaticMemberWithDifferentMemberIdLeaves()
{
String groupId = "fooup";
// Use a static member id as it makes the test easier.
String memberId1 = Uuid.randomUuid().toString();
- String memberId2 = Uuid.randomUuid().toString();
Uuid fooTopicId = Uuid.randomUuid();
String fooTopicName = "foo";
- Uuid barTopicId = Uuid.randomUuid();
- String barTopicName = "bar";
- Uuid zarTopicId = Uuid.randomUuid();
- String zarTopicName = "zar";
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
- // Consumer group with two members.
+ // Consumer group with one static member.
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
.withAssignors(Collections.singletonList(assignor))
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
- .addTopic(barTopicId, barTopicName, 3)
- .addTopic(zarTopicId, zarTopicName, 1)
- .addRacks()
.build())
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
.withMember(new ConsumerGroupMember.Builder(memberId1)
+ .setInstanceId(memberId1)
.setMemberEpoch(10)
.setPreviousMemberEpoch(9)
.setTargetMemberEpoch(10)
@@ -1860,63 +2746,22 @@ public class GroupMetadataManagerTest {
.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
.setServerAssignorName("range")
.setAssignedPartitions(mkAssignment(
- mkTopicAssignment(fooTopicId, 0, 1, 2),
- mkTopicAssignment(barTopicId, 0, 1)))
- .build())
- .withMember(new ConsumerGroupMember.Builder(memberId2)
- .setMemberEpoch(10)
- .setPreviousMemberEpoch(9)
- .setTargetMemberEpoch(10)
- .setClientId("client")
- .setClientHost("localhost/127.0.0.1")
- // Use zar only here to ensure that metadata needs to be
recomputed.
- .setSubscribedTopicNames(Arrays.asList("foo", "bar",
"zar"))
- .setServerAssignorName("range")
- .setAssignedPartitions(mkAssignment(
- mkTopicAssignment(fooTopicId, 3, 4, 5),
- mkTopicAssignment(barTopicId, 2)))
+ mkTopicAssignment(fooTopicId, 0, 1, 2)))
.build())
.withAssignment(memberId1, mkAssignment(
- mkTopicAssignment(fooTopicId, 0, 1, 2),
- mkTopicAssignment(barTopicId, 0, 1)))
- .withAssignment(memberId2, mkAssignment(
- mkTopicAssignment(fooTopicId, 3, 4, 5),
- mkTopicAssignment(barTopicId, 2)))
+ mkTopicAssignment(fooTopicId, 0, 1, 2)))
.withAssignmentEpoch(10))
.build();
- // Member 2 leaves the consumer group.
- CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result =
context.consumerGroupHeartbeat(
+ assertThrows(FencedInstanceIdException.class, () ->
context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
- .setMemberId(memberId2)
- .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
+ .setMemberId("unknown-" + memberId1)
+ .setInstanceId(memberId1)
+ .setMemberEpoch(LEAVE_GROUP_STATIC_MEMBER_EPOCH)
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
- .setTopicPartitions(Collections.emptyList()));
-
- assertResponseEquals(
- new ConsumerGroupHeartbeatResponseData()
- .setMemberId(memberId2)
- .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH),
- result.response()
- );
-
- List<Record> expectedRecords = Arrays.asList(
- RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId,
memberId2),
- RecordHelpers.newTargetAssignmentTombstoneRecord(groupId,
memberId2),
- RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId,
memberId2),
- // Subscription metadata is recomputed because zar is no longer
there.
- RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new
HashMap<String, TopicMetadata>() {
- {
- put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 6, mkMapOfPartitionRacks(6)));
- put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 3, mkMapOfPartitionRacks(3)));
- }
- }),
- RecordHelpers.newGroupEpochRecord(groupId, 11)
- );
-
- assertRecordsEquals(expectedRecords, result.records());
+ .setTopicPartitions(Collections.emptyList())));
}
@Test
@@ -3328,6 +4173,87 @@ public class GroupMetadataManagerTest {
context.assertNoRevocationTimeout(groupId, memberId);
}
+ @Test
+ public void testSessionTimeoutExpirationStaticMember() {
+ String groupId = "fooup";
+ // Use a static member id as it makes the test easier.
+ String memberId = Uuid.randomUuid().toString();
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withAssignors(Collections.singletonList(assignor))
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .addRacks()
+ .build())
+ .build();
+
+ assignor.prepareGroupAssignment(new GroupAssignment(
+ Collections.singletonMap(memberId, new
MemberAssignment(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+ )))
+ ));
+
+ // Session timer is scheduled on first heartbeat.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result =
+ context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setInstanceId(memberId)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(90000)
+ .setSubscribedTopicNames(Collections.singletonList("foo"))
+ .setTopicPartitions(Collections.emptyList()));
+ assertEquals(1, result.response().memberEpoch());
+
+ // Verify that there is a session time.
+ context.assertSessionTimeout(groupId, memberId, 45000);
+
+ // Static member sends a temporary leave group request
+ result = context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setInstanceId(memberId)
+ .setMemberEpoch(LEAVE_GROUP_STATIC_MEMBER_EPOCH)
+ .setRebalanceTimeoutMs(90000)
+ .setSubscribedTopicNames(Collections.singletonList("foo"))
+ .setTopicPartitions(Collections.emptyList()));
+
+ assertEquals(-2, result.response().memberEpoch());
+
+ // Verify that there is still a session time.
+ context.assertSessionTimeout(groupId, memberId, 45000);
+
+ // Advance time past the session timeout. No static member joined back
as a replacement
+ List<ExpiredTimeout<Void, Record>> timeouts = context.sleep(45000 + 1);
+
+ // Verify the expired timeout.
+ assertEquals(
+ Collections.singletonList(new ExpiredTimeout<Void, Record>(
+ consumerGroupSessionTimeoutKey(groupId, memberId),
+ new CoordinatorResult<>(
+ Arrays.asList(
+
RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, memberId),
+
RecordHelpers.newTargetAssignmentTombstoneRecord(groupId, memberId),
+
RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId, memberId),
+
RecordHelpers.newGroupSubscriptionMetadataRecord(groupId,
Collections.emptyMap()),
+ RecordHelpers.newGroupEpochRecord(groupId, 2)
+ )
+ )
+ )),
+ timeouts
+ );
+
+ // Verify that there are no timers.
+ context.assertNoSessionTimeout(groupId, memberId);
+ context.assertNoRevocationTimeout(groupId, memberId);
+ }
+
@Test
public void testRevocationTimeoutLifecycle() {
String groupId = "fooup";
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
index c9379fc2e7b..75016cb6650 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
@@ -48,6 +48,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.Mockito.mock;
public class ConsumerGroupTest {
@@ -94,6 +95,34 @@ public class ConsumerGroupTest {
assertEquals(member, consumerGroup.getOrMaybeCreateMember("member",
false));
}
+ @Test
+ public void testNoStaticMember() {
+ ConsumerGroup consumerGroup = createConsumerGroup("foo");
+
+ // Create a new member which is not static
+ consumerGroup.getOrMaybeCreateMember("member", true);
+ assertNull(consumerGroup.staticMember("instance-id"));
+ }
+
+ @Test
+ public void testGetStaticMemberByInstanceId() {
+ ConsumerGroup consumerGroup = createConsumerGroup("foo");
+ ConsumerGroupMember member;
+
+ member = consumerGroup.getOrMaybeCreateMember("member", true);
+
+ member = new ConsumerGroupMember.Builder(member)
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setInstanceId("instance")
+ .build();
+
+ consumerGroup.updateMember(member);
+
+ assertEquals(member, consumerGroup.staticMember("instance"));
+ assertEquals(member, consumerGroup.getOrMaybeCreateMember("member",
false));
+ assertEquals(member.memberId(),
consumerGroup.staticMemberId("instance"));
+ }
+
@Test
public void testRemoveMember() {
ConsumerGroup consumerGroup = createConsumerGroup("foo");
@@ -106,6 +135,27 @@ public class ConsumerGroupTest {
}
+ @Test
+ public void testRemoveStaticMember() {
+ ConsumerGroup consumerGroup = createConsumerGroup("foo");
+
+ ConsumerGroupMember member;
+ member = consumerGroup.getOrMaybeCreateMember("member", true);
+ assertTrue(consumerGroup.hasMember("member"));
+
+ member = new ConsumerGroupMember.Builder(member)
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setInstanceId("instance")
+ .build();
+
+ consumerGroup.updateMember(member);
+
+ consumerGroup.removeMember("member");
+ assertFalse(consumerGroup.hasMember("member"));
+ assertNull(consumerGroup.staticMember("instance"));
+ assertNull(consumerGroup.staticMemberId("instance"));
+ }
+
@Test
public void testUpdatingMemberUpdatesPartitionEpoch() {
Uuid fooTopicId = Uuid.randomUuid();
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java
index 810b7617a23..bdd7a354ca4 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java
@@ -57,6 +57,7 @@ public class TargetAssignmentBuilderTest {
private final Map<String, ConsumerGroupMember> updatedMembers = new
HashMap<>();
private final Map<String, Assignment> targetAssignment = new
HashMap<>();
private final Map<String, MemberAssignment> memberAssignments = new
HashMap<>();
+ private final Map<String, String> staticMembers = new HashMap<>();
public TargetAssignmentBuilderTestContext(
String groupId,
@@ -71,10 +72,23 @@ public class TargetAssignmentBuilderTest {
List<String> subscriptions,
Map<Uuid, Set<Integer>> targetPartitions
) {
- members.put(memberId, new ConsumerGroupMember.Builder(memberId)
- .setSubscribedTopicNames(subscriptions)
- .build());
+ addGroupMember(memberId, null, subscriptions, targetPartitions);
+ }
+
+ private void addGroupMember(
+ String memberId,
+ String instanceId,
+ List<String> subscriptions,
+ Map<Uuid, Set<Integer>> targetPartitions
+ ) {
+ ConsumerGroupMember.Builder memberBuilder = new
ConsumerGroupMember.Builder(memberId)
+ .setSubscribedTopicNames(subscriptions);
+ if (instanceId != null) {
+ memberBuilder.setInstanceId(instanceId);
+ staticMembers.put(instanceId, memberId);
+ }
+ members.put(memberId, memberBuilder.build());
targetAssignment.put(memberId, new Assignment(
(byte) 0,
targetPartitions,
@@ -131,8 +145,18 @@ public class TargetAssignmentBuilderTest {
public void removeMemberSubscription(
String memberId
+ ) {
+ removeMemberSubscription(memberId, null);
+ }
+
+ public void removeMemberSubscription(
+ String memberId,
+ String instanceId
) {
this.updatedMembers.put(memberId, null);
+ if (instanceId != null) {
+ this.staticMembers.remove(instanceId);
+ }
}
public void prepareMemberAssignment(
@@ -161,9 +185,17 @@ public class TargetAssignmentBuilderTest {
if (updatedMemberOrNull == null) {
memberSpecs.remove(memberId);
} else {
+ ConsumerGroupMember member = members.get(memberId);
+ Assignment assignment;
+ // A new static member joins and needs to replace an
existing departed one.
+ if (member == null &&
staticMembers.containsKey(updatedMemberOrNull.instanceId())) {
+ assignment =
targetAssignment.getOrDefault(staticMembers.get(updatedMemberOrNull.instanceId()),
Assignment.EMPTY);
+ } else {
+ assignment = targetAssignment.getOrDefault(memberId,
Assignment.EMPTY);
+ }
memberSpecs.put(memberId, createAssignmentMemberSpec(
updatedMemberOrNull,
- targetAssignment.getOrDefault(memberId,
Assignment.EMPTY),
+ assignment,
subscriptionMetadata
));
}
@@ -189,6 +221,7 @@ public class TargetAssignmentBuilderTest {
// Create and populate the assignment builder.
TargetAssignmentBuilder builder = new
TargetAssignmentBuilder(groupId, groupEpoch, assignor)
.withMembers(members)
+ .withStaticMembers(staticMembers)
.withSubscriptionMetadata(subscriptionMetadata)
.withTargetAssignment(targetAssignment);
@@ -691,6 +724,85 @@ public class TargetAssignmentBuilderTest {
assertEquals(expectedAssignment, result.targetAssignment());
}
+ @Test
+ public void testStaticMemberReplace() {
+ TargetAssignmentBuilderTestContext context = new
TargetAssignmentBuilderTestContext(
+ "my-group",
+ 20
+ );
+
+ Uuid fooTopicId = context.addTopicMetadata("foo", 6,
Collections.emptyMap());
+ Uuid barTopicId = context.addTopicMetadata("bar", 6,
Collections.emptyMap());
+
+ context.addGroupMember("member-1", "member-1", Arrays.asList("foo",
"bar", "zar"), mkAssignment(
+ mkTopicAssignment(fooTopicId, 1, 2),
+ mkTopicAssignment(barTopicId, 1, 2)
+ ));
+
+ context.addGroupMember("member-2", "member-2", Arrays.asList("foo",
"bar", "zar"), mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4),
+ mkTopicAssignment(barTopicId, 3, 4)
+ ));
+
+ context.addGroupMember("member-3", "member-3", Arrays.asList("foo",
"bar", "zar"), mkAssignment(
+ mkTopicAssignment(fooTopicId, 5, 6),
+ mkTopicAssignment(barTopicId, 5, 6)
+ ));
+
+ // Static member 3 leaves
+ context.removeMemberSubscription("member-3", "member-3");
+
+ // Another static member joins with the same instance id as the
departed one
+ context.updateMemberSubscription("member-3-a", Arrays.asList("foo",
"bar", "zar"), Optional.of("member-3"), Optional.empty());
+
+ context.prepareMemberAssignment("member-1", mkAssignment(
+ mkTopicAssignment(fooTopicId, 1, 2),
+ mkTopicAssignment(barTopicId, 1, 2)
+ ));
+
+ context.prepareMemberAssignment("member-2", mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4),
+ mkTopicAssignment(barTopicId, 3, 4)
+ ));
+
+ context.prepareMemberAssignment("member-3-a", mkAssignment(
+ mkTopicAssignment(fooTopicId, 5, 6),
+ mkTopicAssignment(barTopicId, 5, 6)
+ ));
+
+ TargetAssignmentBuilder.TargetAssignmentResult result =
context.build();
+
+ assertEquals(2, result.records().size());
+
+ assertUnorderedList(Collections.singletonList(
+ newTargetAssignmentRecord("my-group", "member-3-a", mkAssignment(
+ mkTopicAssignment(fooTopicId, 5, 6),
+ mkTopicAssignment(barTopicId, 5, 6)
+ ))
+ ), result.records().subList(0, 1));
+
+ assertEquals(newTargetAssignmentEpochRecord(
+ "my-group",
+ 20
+ ), result.records().get(1));
+
+ Map<String, Assignment> expectedAssignment = new HashMap<>();
+ expectedAssignment.put("member-1", new Assignment(mkAssignment(
+ mkTopicAssignment(fooTopicId, 1, 2),
+ mkTopicAssignment(barTopicId, 1, 2)
+ )));
+ expectedAssignment.put("member-2", new Assignment(mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4),
+ mkTopicAssignment(barTopicId, 3, 4)
+ )));
+
+ expectedAssignment.put("member-3-a", new Assignment(mkAssignment(
+ mkTopicAssignment(fooTopicId, 5, 6),
+ mkTopicAssignment(barTopicId, 5, 6)
+ )));
+
+ assertEquals(expectedAssignment, result.targetAssignment());
+ }
private static <T> void assertUnorderedList(
List<T> expected,
List<T> actual