This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e99561e1f30 KAFKA-17593; [10/N] Remove resolved regular expressions
when unsubscribed (#17976)
e99561e1f30 is described below
commit e99561e1f30a30f9b151669a78d340f286a3ead6
Author: David Jacot <[email protected]>
AuthorDate: Thu Dec 5 08:41:37 2024 +0100
KAFKA-17593; [10/N] Remove resolved regular expressions when unsubscribed
(#17976)
This patch does a few things:
1) It cleans up resolved regular expressions when they are unsubscribed
from. It covers the regular leave/fenced paths for the new protocol and it also
covers the LeaveGroup API as new members could be removed via the admin API.
2) It ensures that tombstones for resolved regular expressions are
generated on the conversion patch from consumer to classic group.
3) It fixes
[KAFKA-18116](https://issues.apache.org/jira/browse/KAFKA-18116) because I
faced the same issue while working on the LeaveGroup API. It adds an
integration test for this case too.
Reviewers: Dongnuo Lyu <[email protected]>, Jeff Kim <[email protected]>
---
.../server/GroupCoordinatorBaseRequestTest.scala | 29 +-
.../unit/kafka/server/LeaveGroupRequestTest.scala | 68 +++-
.../coordinator/group/GroupMetadataManager.java | 81 ++++-
.../group/modern/consumer/ConsumerGroup.java | 107 +++++-
.../apache/kafka/coordinator/group/Assertions.java | 33 ++
.../group/GroupMetadataManagerTest.java | 256 +++++++++++++
.../group/GroupMetadataManagerTestContext.java | 4 +
.../group/modern/consumer/ConsumerGroupTest.java | 396 ++++++++++++++++++---
8 files changed, 880 insertions(+), 94 deletions(-)
diff --git
a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
index 3b375c879c4..7db5258b8e4 100644
---
a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
@@ -654,7 +654,7 @@ class GroupCoordinatorBaseRequestTest(cluster:
ClusterInstance) {
protected def consumerGroupDescribe(
groupIds: List[String],
- includeAuthorizedOperations: Boolean,
+ includeAuthorizedOperations: Boolean = false,
version: Short =
ApiKeys.CONSUMER_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled)
): List[ConsumerGroupDescribeResponseData.DescribedGroup] = {
val consumerGroupDescribeRequest = new
ConsumerGroupDescribeRequest.Builder(
@@ -782,14 +782,12 @@ class GroupCoordinatorBaseRequestTest(cluster:
ClusterInstance) {
)
}
- protected def leaveGroupWithOldProtocol(
+ protected def classicLeaveGroup(
groupId: String,
memberIds: List[String],
groupInstanceIds: List[String] = null,
- expectedLeaveGroupError: Errors,
- expectedMemberErrors: List[Errors],
version: Short = ApiKeys.LEAVE_GROUP.latestVersion(isUnstableApiEnabled)
- ): Unit = {
+ ): LeaveGroupResponseData = {
val leaveGroupRequest = new LeaveGroupRequest.Builder(
groupId,
List.tabulate(memberIds.length) { i =>
@@ -799,6 +797,24 @@ class GroupCoordinatorBaseRequestTest(cluster:
ClusterInstance) {
}.asJava
).build(version)
+ connectAndReceive[LeaveGroupResponse](leaveGroupRequest).data
+ }
+
+ protected def leaveGroupWithOldProtocol(
+ groupId: String,
+ memberIds: List[String],
+ groupInstanceIds: List[String] = null,
+ expectedLeaveGroupError: Errors,
+ expectedMemberErrors: List[Errors],
+ version: Short = ApiKeys.LEAVE_GROUP.latestVersion(isUnstableApiEnabled)
+ ): Unit = {
+ val leaveGroupResponse = classicLeaveGroup(
+ groupId,
+ memberIds,
+ groupInstanceIds,
+ version
+ )
+
val expectedResponseData = new LeaveGroupResponseData()
if (expectedLeaveGroupError != Errors.NONE) {
expectedResponseData.setErrorCode(expectedLeaveGroupError.code)
@@ -812,8 +828,7 @@ class GroupCoordinatorBaseRequestTest(cluster:
ClusterInstance) {
}.asJava)
}
- val leaveGroupResponse =
connectAndReceive[LeaveGroupResponse](leaveGroupRequest)
- assertEquals(expectedResponseData, leaveGroupResponse.data)
+ assertEquals(expectedResponseData, leaveGroupResponse)
}
protected def leaveGroup(
diff --git a/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala
b/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala
index 54164c064e8..84e609fcd92 100644
--- a/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala
@@ -16,6 +16,8 @@
*/
package kafka.server
+import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.message.LeaveGroupResponseData
import org.apache.kafka.common.test.api.ClusterInstance
import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest,
ClusterTestDefaults, Type}
import org.apache.kafka.common.test.api.ClusterTestExtensions
@@ -23,26 +25,70 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.JoinGroupRequest
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.group.classic.ClassicGroupState
+import
org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup.ConsumerGroupState
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.extension.ExtendWith
+import scala.jdk.CollectionConverters._
+
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(types = Array(Type.KRAFT))
+@ClusterTestDefaults(types = Array(Type.KRAFT), serverProperties = Array(
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+))
class LeaveGroupRequestTest(cluster: ClusterInstance) extends
GroupCoordinatorBaseRequestTest(cluster) {
- @ClusterTest(serverProperties = Array(
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
- ))
+ @ClusterTest
+ def testLeaveGroupWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit
= {
+ // Creates the __consumer_offsets topics because it won't be created
automatically
+ // in this test because it does not use FindCoordinator API.
+ createOffsetsTopic()
+
+ // Create the topic.
+ createTopic(
+ topic = "foo",
+ numPartitions = 3
+ )
+
+ for (version <- 3 to
ApiKeys.LEAVE_GROUP.latestVersion(isUnstableApiEnabled)) {
+ val memberId = Uuid.randomUuid().toString
+ assertEquals(Errors.NONE.code, consumerGroupHeartbeat(
+ groupId = "group",
+ memberId = memberId,
+ memberEpoch = 0,
+ instanceId = "instance-id",
+ rebalanceTimeoutMs = 5 * 60 * 1000,
+ subscribedTopicNames = List("foo"),
+ topicPartitions = List.empty,
+ ).errorCode)
+
+ assertEquals(
+ new LeaveGroupResponseData()
+ .setMembers(List(
+ new LeaveGroupResponseData.MemberResponse()
+ .setMemberId(memberId)
+ .setGroupInstanceId("instance-id")
+ ).asJava),
+ classicLeaveGroup(
+ groupId = "group",
+ memberIds = List(JoinGroupRequest.UNKNOWN_MEMBER_ID),
+ groupInstanceIds = List("instance-id"),
+ version = version.toShort
+ )
+ )
+
+ assertEquals(
+ ConsumerGroupState.EMPTY.toString,
+ consumerGroupDescribe(List("group")).head.groupState
+ )
+ }
+ }
+
+ @ClusterTest
def testLeaveGroupWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit
= {
testLeaveGroup()
}
- @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties =
Array(
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
- ))
+ @ClusterTest
def testLeaveGroupWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit
= {
testLeaveGroup()
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 6aabc76ec25..95fff558d5e 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -163,6 +163,7 @@ import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.n
import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupCurrentAssignmentRecord;
@@ -2220,7 +2221,8 @@ public class GroupMetadataManager {
.setPreviousMemberEpoch(0)
.build();
- // Generate the records to replace the member.
+ // Generate the records to replace the member. We don't care
about the regular expression
+ // here because it is taken care of later after the static
membership replacement.
replaceMember(records, group, existingStaticMemberOrNull,
newMember);
log.info("[GroupId {}] Static member with instance id {}
re-joins the consumer group " +
@@ -2332,7 +2334,7 @@ public class GroupMetadataManager {
if (isNotEmpty(oldSubscribedTopicRegex) &&
group.numSubscribedMembers(oldSubscribedTopicRegex) == 1) {
// If the member was the last one subscribed to the regex, we
delete the
// resolved regular expression.
-
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(
+ records.add(newConsumerGroupRegularExpressionTombstone(
groupId,
oldSubscribedTopicRegex
));
@@ -2937,6 +2939,7 @@ public class GroupMetadataManager {
return new CoordinatorResult<>(records, response, null, false);
} else {
removeMember(records, group.groupId(), member.memberId());
+ maybeDeleteResolvedRegularExpression(records, group, member);
// We update the subscription metadata without the leaving member.
Map<String, TopicMetadata> subscriptionMetadata =
group.computeSubscriptionMetadata(
@@ -2956,6 +2959,7 @@ public class GroupMetadataManager {
// We bump the group epoch.
int groupEpoch = group.groupEpoch() + 1;
records.add(newConsumerGroupEpochRecord(group.groupId(),
groupEpoch));
+ log.info("[GroupId {}] Bumped group epoch to {}.",
group.groupId(), groupEpoch);
cancelTimers(group.groupId(), member.memberId());
@@ -3039,6 +3043,59 @@ public class GroupMetadataManager {
));
}
+ /**
+ * Maybe delete the resolved regular expression associated to the provided
member if
+ * it was the last subscribed member to it.
+ *
+ * @param records The record accumulator.
+ * @param group The group.
+ * @param member The member removed from the group.
+ */
+ private void maybeDeleteResolvedRegularExpression(
+ List<CoordinatorRecord> records,
+ ConsumerGroup group,
+ ConsumerGroupMember member
+ ) {
+ if (isNotEmpty(member.subscribedTopicRegex()) &&
group.numSubscribedMembers(member.subscribedTopicRegex()) == 1) {
+ records.add(newConsumerGroupRegularExpressionTombstone(
+ group.groupId(),
+ member.subscribedTopicRegex()
+ ));
+ }
+ }
+
+ /**
+ * Maybe delete the resolved regular expressions associated with the
provided members
+ * if they were the last ones subscribed to them.
+ *
+ * @param records The record accumulator.
+ * @param group The group.
+ * @param members The member removed from the group.
+ * @return The set of deleted regular expressions.
+ */
+ private Set<String> maybeDeleteResolvedRegularExpressions(
+ List<CoordinatorRecord> records,
+ ConsumerGroup group,
+ Set<ConsumerGroupMember> members
+ ) {
+ Map<String, Integer> counts = new HashMap<>();
+ members.forEach(member -> {
+ if (isNotEmpty(member.subscribedTopicRegex())) {
+ counts.compute(member.subscribedTopicRegex(), Utils::incValue);
+ }
+ });
+
+ Set<String> deletedRegexes = new HashSet<>();
+ counts.forEach((regex, count) -> {
+ if (group.numSubscribedMembers(regex) == count) {
+
records.add(newConsumerGroupRegularExpressionTombstone(group.groupId(), regex));
+ deletedRegexes.add(regex);
+ }
+ });
+
+ return deletedRegexes;
+ }
+
/**
* Write tombstones for the member. The order matters here.
*
@@ -5897,7 +5954,7 @@ public class GroupMetadataManager {
if (group.type() == CLASSIC) {
return classicGroupLeaveToClassicGroup((ClassicGroup) group,
context, request);
} else if (group.type() == CONSUMER) {
- return classicGroupLeaveToConsumerGroup((ConsumerGroup) group,
context, request);
+ return classicGroupLeaveToConsumerGroup((ConsumerGroup) group,
request);
} else {
throw new UnknownMemberIdException(String.format("Group %s not
found.", request.groupId()));
}
@@ -5907,14 +5964,12 @@ public class GroupMetadataManager {
* Handle a classic LeaveGroupRequest to a ConsumerGroup.
*
* @param group The ConsumerGroup.
- * @param context The request context.
* @param request The actual LeaveGroup request.
*
* @return The LeaveGroup response and the records to append.
*/
private CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord>
classicGroupLeaveToConsumerGroup(
ConsumerGroup group,
- RequestContext context,
LeaveGroupRequestData request
) throws UnknownMemberIdException {
String groupId = group.groupId();
@@ -5933,7 +5988,7 @@ public class GroupMetadataManager {
member = group.getOrMaybeCreateMember(memberId, false);
throwIfMemberDoesNotUseClassicProtocol(member);
- log.info("[Group {}] Dynamic member {} has left group " +
+ log.info("[GroupId {}] Dynamic member {} has left group " +
"through explicit `LeaveGroup` request; client
reason: {}",
groupId, memberId, reason);
} else {
@@ -5943,11 +5998,11 @@ public class GroupMetadataManager {
// in which case we expect the MemberId to be undefined.
if (!UNKNOWN_MEMBER_ID.equals(memberId)) {
throwIfInstanceIdIsFenced(member, groupId, memberId,
instanceId);
+ throwIfMemberDoesNotUseClassicProtocol(member);
}
- throwIfMemberDoesNotUseClassicProtocol(member);
memberId = member.memberId();
- log.info("[Group {}] Static member {} with instance id {}
has left group " +
+ log.info("[GroupId {}] Static member {} with instance id
{} has left group " +
"through explicit `LeaveGroup` request; client
reason: {}",
groupId, memberId, instanceId, reason);
}
@@ -5971,9 +6026,16 @@ public class GroupMetadataManager {
}
if (!records.isEmpty()) {
+ // Check whether resolved regular expressions could be deleted.
+ Set<String> deletedRegexes = maybeDeleteResolvedRegularExpressions(
+ records,
+ group,
+ validLeaveGroupMembers
+ );
+
// Maybe update the subscription metadata.
Map<String, TopicMetadata> subscriptionMetadata =
group.computeSubscriptionMetadata(
- group.computeSubscribedTopicNames(validLeaveGroupMembers),
+
group.computeSubscribedTopicNamesWithoutDeletedMembers(validLeaveGroupMembers,
deletedRegexes),
metadataImage.topics(),
metadataImage.cluster()
);
@@ -5986,6 +6048,7 @@ public class GroupMetadataManager {
// Bump the group epoch.
records.add(newConsumerGroupEpochRecord(groupId,
group.groupEpoch() + 1));
+ log.info("[GroupId {}] Bumped group epoch to {}.", groupId,
group.groupEpoch() + 1);
}
return new CoordinatorResult<>(records, new
LeaveGroupResponseData().setMembers(memberResponses));
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
index ed05f636189..1fdfe5ae87e 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
@@ -352,6 +352,69 @@ public class ConsumerGroup extends
ModernGroup<ConsumerGroupMember> {
}
}
+ /**
+ * Updates the subscription count.
+ *
+ * @param oldMember The old member.
+ * @param newMember The new member.
+ *
+ * @return Copy of the map of topics to the count of number of subscribers.
+ */
+ public Map<String, Integer> computeSubscribedTopicNames(
+ ConsumerGroupMember oldMember,
+ ConsumerGroupMember newMember
+ ) {
+ Map<String, Integer> subscribedTopicsNames =
super.computeSubscribedTopicNames(oldMember, newMember);
+
+ String oldSubscribedTopicRegex = null;
+ if (oldMember != null && oldMember.subscribedTopicRegex() != null &&
!oldMember.subscribedTopicRegex().isEmpty()) {
+ oldSubscribedTopicRegex = oldMember.subscribedTopicRegex();
+ }
+
+ if (oldSubscribedTopicRegex != null) {
+ String newSubscribedTopicRegex = null;
+ if (newMember != null && newMember.subscribedTopicRegex() != null
&& !newMember.subscribedTopicRegex().isEmpty()) {
+ newSubscribedTopicRegex = newMember.subscribedTopicRegex();
+ }
+
+ // If the old member was the last one subscribed to the regex and
the new member
+ // is not subscribed to it, we must remove it from the subscribed
topic names.
+ if (!oldSubscribedTopicRegex.equals(newSubscribedTopicRegex) &&
numSubscribedMembers(oldSubscribedTopicRegex) == 1) {
+
resolvedRegularExpression(oldSubscribedTopicRegex).ifPresent(resolvedRegularExpression
->
+ resolvedRegularExpression.topics.forEach(topic ->
subscribedTopicsNames.compute(topic, Utils::decValue))
+ );
+ }
+ }
+
+ return subscribedTopicsNames;
+ }
+
+ /**
+ * Computes an updated copy of the subscribed topic names without the
provided
+ * removed members and removed regular expressions.
+ *
+ * @param removedMembers The set of removed members.
+ * @param removedRegexes The set of removed regular expressions.
+ *
+ * @return Copy of the map of topics to the count of number of subscribers.
+ */
+ public Map<String, Integer>
computeSubscribedTopicNamesWithoutDeletedMembers(
+ Set<ConsumerGroupMember> removedMembers,
+ Set<String> removedRegexes
+ ) {
+ Map<String, Integer> subscribedTopicsNames =
super.computeSubscribedTopicNames(removedMembers);
+
+ removedRegexes.forEach(regex ->
+
resolvedRegularExpression(regex).ifPresent(resolvedRegularExpression ->
+ resolvedRegularExpression.topics.forEach(topic ->
+ subscribedTopicsNames.compute(topic, Utils::decValue)
+ )
+ )
+ );
+
+ return subscribedTopicsNames;
+ }
+
/**
* Update the resolved regular expression.
*
@@ -631,21 +694,25 @@ public class ConsumerGroup extends
ModernGroup<ConsumerGroupMember> {
*/
@Override
public void createGroupTombstoneRecords(List<CoordinatorRecord> records) {
- members().forEach((memberId, member) ->
-
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId(),
memberId))
+ members.keySet().forEach(memberId ->
+
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
memberId))
);
- members().forEach((memberId, member) ->
-
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId(),
memberId))
+ members.keySet().forEach(memberId ->
+
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
memberId))
);
-
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord(groupId()));
+
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord(groupId));
- members().forEach((memberId, member) ->
-
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId(),
memberId))
+ members.keySet().forEach(memberId ->
+
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId))
);
-
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId()));
-
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord(groupId()));
+ resolvedRegularExpressions.keySet().forEach(regex ->
+
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId,
regex))
+ );
+
+
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId));
+
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord(groupId));
}
/**
@@ -662,24 +729,28 @@ public class ConsumerGroup extends
ModernGroup<ConsumerGroupMember> {
String leavingMemberId,
String joiningMemberId
) {
- members().forEach((memberId, __) -> {
+ members.keySet().forEach(memberId -> {
String removedMemberId = memberId.equals(leavingMemberId) ?
joiningMemberId : memberId;
-
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId(),
removedMemberId));
+
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
removedMemberId));
});
- members().forEach((memberId, __) -> {
+ members.keySet().forEach(memberId -> {
String removedMemberId = memberId.equals(leavingMemberId) ?
joiningMemberId : memberId;
-
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId(),
removedMemberId));
+
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
removedMemberId));
});
-
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord(groupId()));
+
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord(groupId));
- members().forEach((memberId, __) -> {
+ members.keySet().forEach(memberId -> {
String removedMemberId = memberId.equals(leavingMemberId) ?
joiningMemberId : memberId;
-
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId(),
removedMemberId));
+
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
removedMemberId));
});
-
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId()));
-
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord(groupId()));
+ resolvedRegularExpressions.keySet().forEach(regex ->
+
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId,
regex))
+ );
+
+
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId));
+
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord(groupId));
}
@Override
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java
index 12cd0c1ab9f..15d3cd03f05 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java
@@ -42,6 +42,7 @@ import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
import static org.junit.jupiter.api.AssertionFailureBuilder.assertionFailure;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -100,6 +101,38 @@ public class Assertions {
}
}
+ public static void assertUnorderedRecordsEquals(
+ List<List<CoordinatorRecord>> expectedRecords,
+ List<CoordinatorRecord> actualRecords
+ ) {
+ try {
+ int i = 0, j = 0;
+ while (i < expectedRecords.size()) {
+ List<CoordinatorRecord> slice = expectedRecords.get(i);
+ assertRecordsEquals(
+ slice
+ .stream()
+ .sorted(Comparator.comparing(Object::toString))
+ .collect(Collectors.toList()),
+ actualRecords
+ .subList(j, j + slice.size())
+ .stream()
+ .sorted(Comparator.comparing(Object::toString))
+ .collect(Collectors.toList())
+ );
+
+ j += slice.size();
+ i++;
+ }
+ assertEquals(j, actualRecords.size());
+ } catch (AssertionFailedError e) {
+ assertionFailure()
+ .expected(expectedRecords)
+ .actual(actualRecords)
+ .buildAndThrow();
+ }
+ }
+
public static void assertRecordEquals(
CoordinatorRecord expected,
CoordinatorRecord actual
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 98fa8452c88..d9609e0f7a5 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -15683,6 +15683,262 @@ public class GroupMetadataManagerTest {
assertRecordsEquals(expectedRecords.subList(2, 4),
task.result.records().subList(2, 4));
}
+ @Test
+ public void
testResolvedRegularExpressionsRemovedWhenMembersLeaveOrFenced() {
+ String groupId = "fooup";
+ String memberId1 = Uuid.randomUuid().toString();
+ String memberId2 = Uuid.randomUuid().toString();
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+ Uuid barTopicId = Uuid.randomUuid();
+ String barTopicName = "bar";
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ assignor.prepareGroupAssignment(new
GroupAssignment(Collections.emptyMap()));
+
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
Collections.singletonList(assignor))
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .addTopic(barTopicId, barTopicName, 3)
+ .build(1L))
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(new ConsumerGroupMember.Builder(memberId1)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicRegex("foo*")
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+ .build())
+ .withMember(new ConsumerGroupMember.Builder(memberId2)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicRegex("bar*")
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(barTopicId, 0, 1, 2)))
+ .build())
+ .withResolvedRegularExpression("foo*", new
ResolvedRegularExpression(
+ Set.of(fooTopicName), 0L, 0L))
+ .withResolvedRegularExpression("bar*", new
ResolvedRegularExpression(
+ Set.of(barTopicName), 0L, 0L))
+ .withSubscriptionMetadata(Map.of(
+ fooTopicName, new TopicMetadata(fooTopicId, fooTopicName,
6),
+ barTopicName, new TopicMetadata(barTopicId, barTopicName,
3)))
+ .withAssignment(memberId1, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+ .withAssignment(memberId2, mkAssignment(
+ mkTopicAssignment(barTopicId, 0, 1, 2)))
+ .withAssignmentEpoch(10))
+ .build();
+
+ // Setup the timers.
+ context.onLoaded();
+
+ // Member 1 leaves the group.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result = context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId1)
+ .setMemberEpoch(-1));
+
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(memberId1)
+ .setMemberEpoch(-1),
+ result.response()
+ );
+
+ List<CoordinatorRecord> expectedRecords = List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
memberId1),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
memberId1),
+
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId1),
+
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId,
"foo*"),
+
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
+ Map.of(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 3))
+ ),
+ GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
11)
+ );
+
+ assertRecordsEquals(expectedRecords, result.records());
+
+ // Member 2 is fenced due to reaching the session timeout.
+ context.assertSessionTimeout(groupId, memberId2, 45000);
+ List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts =
context.sleep(45000 + 1);
+
+ // Verify the expired timeout.
+ assertEquals(
+ Collections.singletonList(new ExpiredTimeout<Void,
CoordinatorRecord>(
+ groupSessionTimeoutKey(groupId, memberId2),
+ new CoordinatorResult<>(
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
memberId2),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
memberId2),
+
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId2),
+
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId,
"bar*"),
+
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
Collections.emptyMap()),
+
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 12)
+ )
+ )
+ )),
+ timeouts
+ );
+ }
+
+ @Test
+ public void
testResolvedRegularExpressionsRemovedWhenConsumerMembersRemovedByAdminApi() {
+ String groupId = "fooup";
+ String memberId1 = Uuid.randomUuid().toString();
+ String memberId2 = Uuid.randomUuid().toString();
+ String memberId3 = Uuid.randomUuid().toString();
+ String memberId4 = Uuid.randomUuid().toString();
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+ Uuid barTopicId = Uuid.randomUuid();
+ String barTopicName = "bar";
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ assignor.prepareGroupAssignment(new
GroupAssignment(Collections.emptyMap()));
+
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
Collections.singletonList(assignor))
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .addTopic(barTopicId, barTopicName, 3)
+ .build(1L))
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(new ConsumerGroupMember.Builder(memberId1)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setInstanceId(memberId1)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicRegex("foo*")
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2)))
+ .build())
+ .withMember(new ConsumerGroupMember.Builder(memberId2)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setInstanceId(memberId2)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicRegex("foo*")
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4, 5)))
+ .build())
+ .withMember(new ConsumerGroupMember.Builder(memberId3)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setInstanceId(memberId3)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicRegex("bar*")
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(barTopicId, 0, 1)))
+ .build())
+ .withMember(new ConsumerGroupMember.Builder(memberId4)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setInstanceId(memberId4)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicRegex("bar*")
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(barTopicId, 2)))
+ .build())
+ .withResolvedRegularExpression("foo*", new
ResolvedRegularExpression(
+ Set.of(fooTopicName), 0L, 0L))
+ .withResolvedRegularExpression("bar*", new
ResolvedRegularExpression(
+ Set.of(barTopicName), 0L, 0L))
+ .withSubscriptionMetadata(Map.of(
+ fooTopicName, new TopicMetadata(fooTopicId, fooTopicName,
6),
+ barTopicName, new TopicMetadata(barTopicId, barTopicName,
3)
+ ))
+ .withAssignment(memberId1, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+ .withAssignment(memberId2, mkAssignment(
+ mkTopicAssignment(barTopicId, 0, 1, 2)))
+ .withAssignmentEpoch(10))
+ .build();
+
+ // Remove members.
+ CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord> result =
context.sendClassicGroupLeave(
+ new LeaveGroupRequestData()
+ .setGroupId(groupId)
+ .setMembers(List.of(
+ new MemberIdentity().setGroupInstanceId(memberId1),
+ new MemberIdentity().setGroupInstanceId(memberId2),
+ new MemberIdentity().setGroupInstanceId(memberId3)
+ ))
+ );
+
+ assertEquals(
+ new LeaveGroupResponseData()
+ .setMembers(List.of(
+ new LeaveGroupResponseData.MemberResponse()
+ .setMemberId(memberId1)
+ .setGroupInstanceId(memberId1),
+ new LeaveGroupResponseData.MemberResponse()
+ .setMemberId(memberId2)
+ .setGroupInstanceId(memberId2),
+ new LeaveGroupResponseData.MemberResponse()
+ .setMemberId(memberId3)
+ .setGroupInstanceId(memberId3)
+ )),
+ result.response()
+ );
+
+ assertRecordsEquals(
+ List.of(
+ // Remove member 1.
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
memberId1),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
memberId1),
+
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId1),
+ // Remove member 2.
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
memberId2),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
memberId2),
+
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId2),
+ // Remove member 3.
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
memberId3),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
memberId3),
+
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId3),
+ // Remove regex.
+
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId,
"foo*"),
+ // Updated subscription metadata.
+
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
Map.of(
+ barTopicName, new TopicMetadata(barTopicId, barTopicName,
3)
+ )),
+ // Bumped epoch.
+
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)
+ ),
+ result.records()
+ );
+ }
+
private static void checkJoinGroupResponse(
JoinGroupResponseData expectedResponse,
JoinGroupResponseData actualResponse,
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
index 13ced4b6dab..1caa54325e4 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
@@ -1577,6 +1577,10 @@ public class GroupMetadataManagerTestContext {
snapshotRegistry.idempotentCreateSnapshot(lastWrittenOffset);
}
+ void onLoaded() {
+ groupMetadataManager.onLoaded();
+ }
+
void onUnloaded() {
groupMetadataManager.onUnloaded();
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
index 9c829630f09..eacb4fca4d5 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
@@ -66,9 +66,7 @@ import java.util.Set;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
-import static org.apache.kafka.coordinator.group.Assertions.assertRecordEquals;
-import static
org.apache.kafka.coordinator.group.Assertions.assertRecordsEquals;
-import static
org.apache.kafka.coordinator.group.Assertions.assertUnorderedListEquals;
+import static
org.apache.kafka.coordinator.group.Assertions.assertUnorderedRecordsEquals;
import static
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
import static
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
import static
org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HETEROGENEOUS;
@@ -1457,52 +1455,6 @@ public class ConsumerGroupTest {
assertEquals(1, consumerGroup.numClassicProtocolMembers());
}
- @Test
- public void testCreateGroupTombstoneRecordsWithReplacedMember() {
- String groupId = "group";
- String memberId1 = "member-1";
- String memberId2 = "member-2";
- String newMemberId2 = "new-member-2";
-
- ConsumerGroup consumerGroup = createConsumerGroup(groupId);
- List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols = new
ArrayList<>();
- protocols.add(new ConsumerGroupMemberMetadataValue.ClassicProtocol()
- .setName("range")
- .setMetadata(new byte[0]));
-
- ConsumerGroupMember member1 = new
ConsumerGroupMember.Builder(memberId1)
- .setClassicMemberMetadata(new
ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
- .setSupportedProtocols(protocols))
- .build();
- consumerGroup.updateMember(member1);
-
- ConsumerGroupMember member2 = new
ConsumerGroupMember.Builder(memberId2)
- .setInstanceId("instance-id-2")
- .build();
- consumerGroup.updateMember(member2);
-
- List<CoordinatorRecord> records = new ArrayList<>();
- consumerGroup.createGroupTombstoneRecordsWithReplacedMember(records,
memberId2, newMemberId2);
-
- List<CoordinatorRecord> expectedRecords = Arrays.asList(
-
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
memberId1),
-
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
newMemberId2),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
memberId1),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
newMemberId2),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord(groupId),
-
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId1),
-
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
newMemberId2),
-
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId),
-
GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord(groupId)
- );
- assertEquals(expectedRecords.size(), records.size());
- assertUnorderedListEquals(expectedRecords.subList(0, 2),
records.subList(0, 2));
- assertUnorderedListEquals(expectedRecords.subList(2, 4),
records.subList(2, 4));
- assertRecordEquals(expectedRecords.get(4), records.get(4));
- assertUnorderedListEquals(expectedRecords.subList(5, 7),
records.subList(5, 7));
- assertRecordsEquals(expectedRecords.subList(7, 9), records.subList(7,
9));
- }
-
@Test
public void testFromClassicGroup() {
MockTime time = new MockTime();
@@ -1799,4 +1751,350 @@ public class ConsumerGroupTest {
consumerGroup.subscribedTopicNames()
);
}
+
+ @Test
+ public void testComputeSubscribedTopicNamesWithoutDeletedMembers() {
+ ConsumerGroup consumerGroup = createConsumerGroup("foo");
+
+ ConsumerGroupMember member1 = new
ConsumerGroupMember.Builder("member1")
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar", "zar"))
+ .build();
+ consumerGroup.updateMember(member1);
+
+ ConsumerGroupMember member2 = new
ConsumerGroupMember.Builder("member2")
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .build();
+ consumerGroup.updateMember(member2);
+
+ ConsumerGroupMember member3 = new
ConsumerGroupMember.Builder("member3")
+ .setSubscribedTopicRegex("foo*")
+ .build();
+ consumerGroup.updateMember(member3);
+
+ ConsumerGroupMember member4 = new
ConsumerGroupMember.Builder("member4")
+ .setSubscribedTopicRegex("foo*")
+ .build();
+ consumerGroup.updateMember(member4);
+
+ ConsumerGroupMember member5 = new
ConsumerGroupMember.Builder("member5")
+ .setSubscribedTopicRegex("bar*")
+ .build();
+ consumerGroup.updateMember(member5);
+
+ ConsumerGroupMember member6 = new
ConsumerGroupMember.Builder("member6")
+ .setSubscribedTopicRegex("bar*")
+ .build();
+ consumerGroup.updateMember(member6);
+
+ consumerGroup.updateResolvedRegularExpression(
+ "foo*",
+ new ResolvedRegularExpression(
+ Set.of("foo", "fooo"),
+ 10L,
+ 12345L
+ )
+ );
+
+ consumerGroup.updateResolvedRegularExpression(
+ "bar*",
+ new ResolvedRegularExpression(
+ Set.of("bar", "barr"),
+ 10L,
+ 12345L
+ )
+ );
+
+ // Verify initial state.
+ assertEquals(
+ Map.of(
+ "foo", 3,
+ "fooo", 1,
+ "bar", 3,
+ "barr", 1,
+ "zar", 1
+ ),
+ consumerGroup.subscribedTopicNames()
+ );
+
+ // Compute with removed members and regexes.
+ assertEquals(
+ Map.of(
+ "foo", 1,
+ "bar", 2,
+ "barr", 1,
+ "zar", 1
+ ),
+ consumerGroup.computeSubscribedTopicNamesWithoutDeletedMembers(
+ Set.of(member2, member3, member4, member5),
+ Set.of("foo*")
+ )
+ );
+ }
+
+ @Test
+ public void testComputeSubscribedTopicNames() {
+ ConsumerGroup consumerGroup = createConsumerGroup("foo");
+
+ ConsumerGroupMember member1 = new
ConsumerGroupMember.Builder("member1")
+ .setSubscribedTopicNames(List.of("foo", "bar", "zar"))
+ .build();
+ consumerGroup.updateMember(member1);
+
+ ConsumerGroupMember member2 = new
ConsumerGroupMember.Builder("member2")
+ .setSubscribedTopicNames(List.of("foo", "bar"))
+ .build();
+ consumerGroup.updateMember(member2);
+
+ ConsumerGroupMember member3 = new
ConsumerGroupMember.Builder("member3")
+ .setSubscribedTopicNames(List.of("foo"))
+ .setSubscribedTopicRegex("foo*")
+ .build();
+ consumerGroup.updateMember(member3);
+
+ consumerGroup.updateResolvedRegularExpression(
+ "foo*",
+ new ResolvedRegularExpression(
+ Set.of("foo", "fooo"),
+ 10L,
+ 12345L
+ )
+ );
+
+ // Verify initial state.
+ assertEquals(
+ Map.of(
+ "foo", 4,
+ "fooo", 1,
+ "bar", 2,
+ "zar", 1
+ ),
+ consumerGroup.subscribedTopicNames()
+ );
+
+ // Compute subscribed topic names without changing the regex.
+ assertEquals(
+ Map.of(
+ "foo", 4,
+ "fooo", 1,
+ "bar", 2,
+ "zar", 1
+ ),
+ consumerGroup.computeSubscribedTopicNames(member3, member3)
+ );
+
+ // Compute subscribed topic names with removing the regex.
+ assertEquals(
+ Map.of(
+ "foo", 3,
+ "bar", 2,
+ "zar", 1
+ ),
+ consumerGroup.computeSubscribedTopicNames(
+ member3,
+ new ConsumerGroupMember.Builder(member3)
+ .setSubscribedTopicRegex("")
+ .build()
+ )
+ );
+
+ // Compute subscribed topic names with removing the names.
+ assertEquals(
+ Map.of(
+ "foo", 3,
+ "fooo", 1,
+ "bar", 2,
+ "zar", 1
+ ),
+ consumerGroup.computeSubscribedTopicNames(
+ member3,
+ new ConsumerGroupMember.Builder(member3)
+ .setSubscribedTopicNames(Collections.emptyList())
+ .build()
+ )
+ );
+
+ // Compute subscribed topic names with removing both.
+ assertEquals(
+ Map.of(
+ "foo", 2,
+ "bar", 2,
+ "zar", 1
+ ),
+ consumerGroup.computeSubscribedTopicNames(
+ member3,
+ new ConsumerGroupMember.Builder(member3)
+ .setSubscribedTopicNames(Collections.emptyList())
+ .setSubscribedTopicRegex("")
+ .build()
+ )
+ );
+ }
+
+ @Test
+ public void testCreateGroupTombstoneRecords() {
+ ConsumerGroup consumerGroup = createConsumerGroup("foo");
+ consumerGroup.setGroupEpoch(10);
+
+ ConsumerGroupMember member1 = new
ConsumerGroupMember.Builder("member1")
+ .setMemberEpoch(10)
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar", "zar"))
+
.setAssignedPartitions(mkAssignment(mkTopicAssignment(Uuid.randomUuid(), 0, 1,
2)))
+ .build();
+ consumerGroup.updateMember(member1);
+
+ ConsumerGroupMember member2 = new
ConsumerGroupMember.Builder("member2")
+ .setMemberEpoch(10)
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+
.setAssignedPartitions(mkAssignment(mkTopicAssignment(Uuid.randomUuid(), 0, 1,
2)))
+ .build();
+ consumerGroup.updateMember(member2);
+
+ ConsumerGroupMember member3 = new
ConsumerGroupMember.Builder("member3")
+ .setMemberEpoch(10)
+ .setSubscribedTopicRegex("foo*")
+
.setAssignedPartitions(mkAssignment(mkTopicAssignment(Uuid.randomUuid(), 0, 1,
2)))
+ .build();
+ consumerGroup.updateMember(member3);
+
+ consumerGroup.updateResolvedRegularExpression(
+ "foo*",
+ new ResolvedRegularExpression(
+ Set.of("foo", "fooo"),
+ 10L,
+ 12345L
+ )
+ );
+
+ consumerGroup.updateTargetAssignment("member1", new
Assignment(mkAssignment(
+ mkTopicAssignment(Uuid.randomUuid(), 0, 1, 2))
+ ));
+
+ consumerGroup.updateTargetAssignment("member2", new
Assignment(mkAssignment(
+ mkTopicAssignment(Uuid.randomUuid(), 0, 1, 2))
+ ));
+
+ consumerGroup.updateTargetAssignment("member3", new
Assignment(mkAssignment(
+ mkTopicAssignment(Uuid.randomUuid(), 0, 1, 2))
+ ));
+
+ List<CoordinatorRecord> records = new ArrayList<>();
+ consumerGroup.createGroupTombstoneRecords(records);
+
+ assertUnorderedRecordsEquals(
+ List.of(
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord("foo",
"member1"),
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord("foo",
"member2"),
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord("foo",
"member3")
+ ),
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord("foo",
"member1"),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord("foo",
"member2"),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord("foo",
"member3")
+ ),
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord("foo")
+ ),
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord("foo",
"member1"),
+
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord("foo",
"member2"),
+
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord("foo",
"member3")
+ ),
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone("foo",
"foo*")
+ ),
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord("foo")
+ ),
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord("foo")
+ )
+ ),
+ records
+ );
+ }
+
+ @Test
+ public void testCreateGroupTombstoneRecordsWithReplacedMember() {
+ ConsumerGroup consumerGroup = createConsumerGroup("foo");
+ consumerGroup.setGroupEpoch(10);
+
+ ConsumerGroupMember member1 = new
ConsumerGroupMember.Builder("member1")
+ .setMemberEpoch(10)
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar", "zar"))
+
.setAssignedPartitions(mkAssignment(mkTopicAssignment(Uuid.randomUuid(), 0, 1,
2)))
+ .build();
+ consumerGroup.updateMember(member1);
+
+ ConsumerGroupMember member2 = new
ConsumerGroupMember.Builder("member2")
+ .setMemberEpoch(10)
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+
.setAssignedPartitions(mkAssignment(mkTopicAssignment(Uuid.randomUuid(), 0, 1,
2)))
+ .build();
+ consumerGroup.updateMember(member2);
+
+ ConsumerGroupMember member3 = new
ConsumerGroupMember.Builder("member3")
+ .setMemberEpoch(10)
+ .setSubscribedTopicRegex("foo*")
+
.setAssignedPartitions(mkAssignment(mkTopicAssignment(Uuid.randomUuid(), 0, 1,
2)))
+ .build();
+ consumerGroup.updateMember(member3);
+
+ consumerGroup.updateResolvedRegularExpression(
+ "foo*",
+ new ResolvedRegularExpression(
+ Set.of("foo", "fooo"),
+ 10L,
+ 12345L
+ )
+ );
+
+ consumerGroup.updateTargetAssignment("member1", new
Assignment(mkAssignment(
+ mkTopicAssignment(Uuid.randomUuid(), 0, 1, 2))
+ ));
+
+ consumerGroup.updateTargetAssignment("member2", new
Assignment(mkAssignment(
+ mkTopicAssignment(Uuid.randomUuid(), 0, 1, 2))
+ ));
+
+ consumerGroup.updateTargetAssignment("member3", new
Assignment(mkAssignment(
+ mkTopicAssignment(Uuid.randomUuid(), 0, 1, 2))
+ ));
+
+ List<CoordinatorRecord> records = new ArrayList<>();
+ consumerGroup.createGroupTombstoneRecordsWithReplacedMember(records,
"member3", "member4");
+
+ assertUnorderedRecordsEquals(
+ List.of(
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord("foo",
"member1"),
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord("foo",
"member2"),
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord("foo",
"member4")
+ ),
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord("foo",
"member1"),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord("foo",
"member2"),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord("foo",
"member4")
+ ),
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord("foo")
+ ),
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord("foo",
"member1"),
+
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord("foo",
"member2"),
+
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord("foo",
"member4")
+ ),
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone("foo",
"foo*")
+ ),
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord("foo")
+ ),
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord("foo")
+ )
+ ),
+ records
+ );
+ }
}