This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new af012e1ec24 KAFKA-18961: Time-based refresh for server-side RE2J regex
(#19904)
af012e1ec24 is described below
commit af012e1ec247cb9aceca2cc658f13618c4e30e47
Author: Dongnuo Lyu <[email protected]>
AuthorDate: Thu Jun 12 07:54:39 2025 -0400
KAFKA-18961: Time-based refresh for server-side RE2J regex (#19904)
Consumers can subscribe to an RE2J SubscriptionPattern that will be
resolved and maintained on the server-side (KIP-848). Currently, those
regexes are refreshed on the coordinator when a consumer subscribes to a
new regex, or if there is a new topic metadata image (to ensure regex
resolution stays up-to-date with existing topics).
But with
[KAFKA-18813](https://issues.apache.org/jira/browse/KAFKA-18813), the
topics matching a regex are filtered based on ACLs. This creates a new
situation: regex resolution does not stay up-to-date as topics become
visible or hidden (ACLs added/deleted).
This patch introduces time-based refresh for the subscribed regex by:
- Adding the internal `group.consumer.regex.refresh.interval.ms` config
that controls the refresh interval.
- Scheduling a regex refresh when updating regex subscription if the
latest refresh is older than that interval.
Reviewers: David Jacot <[email protected]>
---
.../api/AbstractAuthorizerIntegrationTest.scala | 1 +
.../kafka/api/AuthorizerIntegrationTest.scala | 26 ++-
.../coordinator/group/GroupCoordinatorConfig.java | 16 ++
.../coordinator/group/GroupMetadataManager.java | 25 ++-
.../group/GroupCoordinatorConfigTest.java | 2 +
.../group/GroupMetadataManagerTest.java | 231 ++++++++++++++++++++-
6 files changed, 289 insertions(+), 12 deletions(-)
diff --git
a/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala
index 54f6d71a278..0281d43a947 100644
---
a/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala
@@ -111,6 +111,7 @@ class AbstractAuthorizerIntegrationTest extends
BaseRequestTest {
properties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "1")
properties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG,
"1")
+
properties.put(GroupCoordinatorConfig.CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_CONFIG,
"10000")
properties.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG,
"1")
properties.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
"1")
properties.put(TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, "1")
diff --git
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 2c7189b5a70..85f89c1b647 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -3076,6 +3076,29 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
sendAndReceiveRegexHeartbeat(response, listenerName, None)
}
+ @Test
+ def
testConsumerGroupHeartbeatWithRegexWithTopicDescribeAclAddedAndRemoved(): Unit
= {
+ createTopicWithBrokerPrincipal(topic)
+ val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString,
WILDCARD_HOST, ALL, ALLOW)
+ addAndVerifyAcls(Set(allowAllOpsAcl), groupResource)
+
+ val memberId = Uuid.randomUuid.toString;
+ var response = sendAndReceiveFirstRegexHeartbeat(memberId, listenerName)
+ TestUtils.tryUntilNoAssertionError() {
+ response = sendAndReceiveRegexHeartbeat(response, listenerName, Some(0),
true)
+ }
+
+ addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource)
+ TestUtils.tryUntilNoAssertionError(waitTime = 25000) {
+ response = sendAndReceiveRegexHeartbeat(response, listenerName, Some(1))
+ }
+
+ removeAndVerifyAcls(topicDescribeAcl(topicResource), topicResource)
+ TestUtils.tryUntilNoAssertionError(waitTime = 25000) {
+ response = sendAndReceiveRegexHeartbeat(response, listenerName, Some(0))
+ }
+ }
+
@Test
def testConsumerGroupHeartbeatWithRegexWithDifferentMemberAcls(): Unit = {
createTopicWithBrokerPrincipal(topic, numPartitions = 2)
@@ -3093,7 +3116,7 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
// member permissions while computing assignments.
var member2Response =
sendAndReceiveFirstRegexHeartbeat("memberWithLimitedAccess", listenerName)
member1Response = sendAndReceiveRegexHeartbeat(member1Response,
interBrokerListenerName, Some(1))
- member1Response = sendAndReceiveRegexHeartbeat(member1Response,
interBrokerListenerName, None, fullRequest = true)
+ member1Response = sendAndReceiveRegexHeartbeat(member1Response,
interBrokerListenerName, Some(1), fullRequest = true)
member2Response = sendAndReceiveRegexHeartbeat(member2Response,
listenerName, Some(1))
// Create another topic and send heartbeats on member1 to trigger regex
refresh
@@ -3624,6 +3647,7 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
data = data
.setTopicPartitions(partitions.asJava)
.setSubscribedTopicRegex("^top.*")
+ .setRebalanceTimeoutMs(5 * 60 * 1000)
}
val request = new ConsumerGroupHeartbeatRequest.Builder(data).build()
val resource = Set[ResourceType](GROUP, TOPIC)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
index 5c72eba5071..c4650145ebc 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -207,6 +207,11 @@ public class GroupCoordinatorConfig {
ConsumerGroupMigrationPolicy.DOWNGRADE + ": only downgrade from
consumer group to classic group is enabled, " +
ConsumerGroupMigrationPolicy.DISABLED + ": neither upgrade nor
downgrade is enabled.";
+ public static final String CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_CONFIG
= "group.consumer.regex.refresh.interval.ms";
+ public static final String CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_DOC =
"The interval at which the group coordinator will refresh " +
+ "the topics matching the group subscribed regexes. This is only
applicable to consumer groups using the consumer group protocol. ";
+ public static final int CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_DEFAULT =
10 * 60 * 1000; // 10 minutes
+
///
/// Share group configs
///
@@ -318,6 +323,8 @@ public class GroupCoordinatorConfig {
.define(CONSUMER_GROUP_MAX_SIZE_CONFIG, INT,
CONSUMER_GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM,
CONSUMER_GROUP_MAX_SIZE_DOC)
.define(CONSUMER_GROUP_ASSIGNORS_CONFIG, LIST,
CONSUMER_GROUP_ASSIGNORS_DEFAULT, null, MEDIUM, CONSUMER_GROUP_ASSIGNORS_DOC)
.define(CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, STRING,
CONSUMER_GROUP_MIGRATION_POLICY_DEFAULT,
ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(ConsumerGroupMigrationPolicy.class)),
MEDIUM, CONSUMER_GROUP_MIGRATION_POLICY_DOC)
+ // Interval config used for testing purposes.
+ .defineInternal(CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_CONFIG, INT,
CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_DEFAULT, atLeast(10 * 1000), MEDIUM,
CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_DOC)
// Share group configs
.define(SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT,
SHARE_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM,
SHARE_GROUP_SESSION_TIMEOUT_MS_DOC)
@@ -370,6 +377,7 @@ public class GroupCoordinatorConfig {
private final int consumerGroupMaxSessionTimeoutMs;
private final int consumerGroupMinHeartbeatIntervalMs;
private final int consumerGroupMaxHeartbeatIntervalMs;
+ private final int consumerGroupRegexRefreshIntervalMs;
// Share group configurations
private final int shareGroupMaxSize;
private final int shareGroupSessionTimeoutMs;
@@ -419,6 +427,7 @@ public class GroupCoordinatorConfig {
this.consumerGroupMaxSessionTimeoutMs =
config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG);
this.consumerGroupMinHeartbeatIntervalMs =
config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
this.consumerGroupMaxHeartbeatIntervalMs =
config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG);
+ this.consumerGroupRegexRefreshIntervalMs =
config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_CONFIG);
// Share group configurations
this.shareGroupSessionTimeoutMs =
config.getInt(GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG);
this.shareGroupMinSessionTimeoutMs =
config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG);
@@ -810,6 +819,13 @@ public class GroupCoordinatorConfig {
return consumerGroupMaxHeartbeatIntervalMs;
}
+ /**
+ * The consumer group regex batch refresh max interval in milliseconds.
+ */
+ public int consumerGroupRegexRefreshIntervalMs() {
+ return consumerGroupRegexRefreshIntervalMs;
+ }
+
/**
* The share group session timeout in milliseconds.
*/
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 3d293a1ddca..20b9337fb41 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -402,8 +402,10 @@ public class GroupMetadataManager {
/**
* The minimum amount of time between two consecutive refreshes of
* the regular expressions within a single group.
+ *
+ * Package private for setting the lower limit of the refresh interval.
*/
- private static final long REGEX_BATCH_REFRESH_INTERVAL_MS = 10_000L;
+ static final long REGEX_BATCH_REFRESH_MIN_INTERVAL_MS = 10_000L;
/**
* The log context.
@@ -3076,6 +3078,7 @@ public class GroupMetadataManager {
ConsumerGroupMember updatedMember,
List<CoordinatorRecord> records
) {
+ final long currentTimeMs = time.milliseconds();
String groupId = group.groupId();
String memberId = updatedMember.memberId();
String oldSubscribedTopicRegex = member.subscribedTopicRegex();
@@ -3113,11 +3116,12 @@ public class GroupMetadataManager {
}
// Conditions to trigger a refresh:
- // 0. The group is subscribed to regular expressions.
- // 1. There is no ongoing refresh for the group.
- // 2. The last refresh is older than 10s.
- // 3. The group has unresolved regular expressions.
- // 4. The metadata image has new topics.
+ // 0. The group is subscribed to regular expressions.
+ // 1. There is no ongoing refresh for the group.
+ // 2. The last refresh is older than 10s.
+ // 3.1 The group has unresolved regular expressions.
+ // 3.2 Or the metadata image has new topics.
+ // 3.3 Or the last refresh is older than the batch refresh max
interval.
// 0. The group is subscribed to regular expressions. We also take the
one
// that the current may have just introduced.
@@ -3134,11 +3138,11 @@ public class GroupMetadataManager {
// 2. The last refresh is older than 10s. If the group does not have
any regular
// expressions but the current member just brought a new one, we
should continue.
long lastRefreshTimeMs =
group.lastResolvedRegularExpressionRefreshTimeMs();
- if (time.milliseconds() <= lastRefreshTimeMs +
REGEX_BATCH_REFRESH_INTERVAL_MS) {
+ if (currentTimeMs <= lastRefreshTimeMs +
REGEX_BATCH_REFRESH_MIN_INTERVAL_MS) {
return bumpGroupEpoch;
}
- // 3. The group has unresolved regular expressions.
+ // 3.1 The group has unresolved regular expressions.
Map<String, Integer> subscribedRegularExpressions = new
HashMap<>(group.subscribedRegularExpressions());
if (isNotEmpty(oldSubscribedTopicRegex)) {
subscribedRegularExpressions.compute(oldSubscribedTopicRegex,
Utils::decValue);
@@ -3149,9 +3153,12 @@ public class GroupMetadataManager {
requireRefresh |= subscribedRegularExpressions.size() !=
group.numResolvedRegularExpressions();
- // 4. The metadata has new topics that we must consider.
+ // 3.2 The metadata has new topics that we must consider.
requireRefresh |= group.lastResolvedRegularExpressionVersion() <
lastMetadataImageWithNewTopics;
+ // 3.3 The last refresh is older than the batch refresh max interval.
+ requireRefresh |= currentTimeMs > lastRefreshTimeMs +
config.consumerGroupRegexRefreshIntervalMs();
+
if (requireRefresh && !subscribedRegularExpressions.isEmpty()) {
Set<String> regexes =
Collections.unmodifiableSet(subscribedRegularExpressions.keySet());
executor.schedule(
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
index 7c5c7a5b8da..84bd38ae60d 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
@@ -198,6 +198,7 @@ public class GroupCoordinatorConfigTest {
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG,
666);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG,
111);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG,
222);
+
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_CONFIG,
15 * 60 * 1000);
GroupCoordinatorConfig config = createConfig(configs);
@@ -226,6 +227,7 @@ public class GroupCoordinatorConfigTest {
assertEquals(666, config.consumerGroupMaxSessionTimeoutMs());
assertEquals(111, config.consumerGroupMinHeartbeatIntervalMs());
assertEquals(222, config.consumerGroupMaxHeartbeatIntervalMs());
+ assertEquals(15 * 60 * 1000,
config.consumerGroupRegexRefreshIntervalMs());
}
@Test
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index dcc2a4ca5f3..3f47b8e70d8 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -20804,7 +20804,7 @@ public class GroupMetadataManagerTest {
.withMetadataHash(computeGroupHash(Map.of(fooTopicName,
fooTopicHash))))
.build();
- // sleep for more than REGEX_BATCH_REFRESH_INTERVAL_MS
+ // sleep for more than REGEX_BATCH_REFRESH_MIN_INTERVAL_MS
context.time.sleep(10001L);
Map<String, AuthorizationResult> acls = new HashMap<>();
@@ -20887,7 +20887,7 @@ public class GroupMetadataManagerTest {
context.processTasks()
);
- // sleep for more than REGEX_BATCH_REFRESH_INTERVAL_MS
+ // sleep for more than REGEX_BATCH_REFRESH_MIN_INTERVAL_MS
context.time.sleep(10001L);
// Access to the bar topic is granted.
@@ -20972,6 +20972,233 @@ public class GroupMetadataManagerTest {
);
}
+ @Test
+ public void testConsumerGroupMemberJoinsRefreshTopicAuthorization() {
+ String groupId = "fooup";
+ String memberId1 = Uuid.randomUuid().toString();
+ String memberId2 = Uuid.randomUuid().toString();
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ Uuid barTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+ String barTopicName = "bar";
+
+ MetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .addTopic(barTopicId, barTopicName, 3)
+ .build(12345L);
+ long fooTopicHash = computeTopicHash(fooTopicName, metadataImage);
+ long barTopicHash = computeTopicHash(barTopicName, metadataImage);
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ Authorizer authorizer = mock(Authorizer.class);
+ Plugin<Authorizer> authorizerPlugin = Plugin.wrapInstance(authorizer,
null, "authorizer.class.name");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(assignor))
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_CONFIG,
60000)
+ .withMetadataImage(metadataImage)
+ .withAuthorizerPlugin(authorizerPlugin)
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(new ConsumerGroupMember.Builder(memberId1)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(List.of("foo"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2)))
+ .build())
+ .withMember(new ConsumerGroupMember.Builder(memberId2)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicRegex("foo*")
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4, 5)))
+ .build())
+ .withAssignment(memberId1, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2)))
+ .withAssignment(memberId2, mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4, 5)))
+ .withResolvedRegularExpression("foo*", new
ResolvedRegularExpression(
+ Set.of(fooTopicName), 0L, 0L))
+ .withAssignmentEpoch(10)
+ .withMetadataHash(computeGroupHash(Map.of(fooTopicName,
fooTopicHash))))
+ .build();
+
+ // sleep for more than REGEX_BATCH_REFRESH_MIN_INTERVAL_MS
+ context.time.sleep(10001L);
+
+ Map<String, AuthorizationResult> acls = new HashMap<>();
+ acls.put(fooTopicName, AuthorizationResult.ALLOWED);
+ acls.put(barTopicName, AuthorizationResult.DENIED);
+ when(authorizer.authorize(any(), any())).thenAnswer(invocation -> {
+ List<Action> actions = invocation.getArgument(1);
+ return actions.stream()
+ .map(action ->
acls.getOrDefault(action.resourcePattern().name(), AuthorizationResult.DENIED))
+ .collect(Collectors.toList());
+ });
+
+ // Member 2 heartbeats with a different regular expression.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result1 = context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId2)
+ .setMemberEpoch(10)
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicRegex("foo*|bar*")
+ .setServerAssignor("range")
+ .setTopicPartitions(List.of()),
+ ApiKeys.CONSUMER_GROUP_HEARTBEAT.latestVersion()
+ );
+
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(memberId2)
+ .setMemberEpoch(10)
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List.of(
+ new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(List.of(3, 4, 5))))),
+ result1.response()
+ );
+
+ ConsumerGroupMember expectedMember2 = new
ConsumerGroupMember.Builder(memberId2)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicRegex("foo*|bar*")
+ .setServerAssignorName("range")
+ .build();
+
+ assertRecordsEquals(
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedMember2),
+
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId,
"foo*")
+ ),
+ result1.records()
+ );
+
+ // Execute pending tasks.
+ assertEquals(
+ List.of(
+ new MockCoordinatorExecutor.ExecutorResult<>(
+ groupId + "-regex",
+ new CoordinatorResult<>(List.of(
+ // The resolution of the new regex is persisted.
+
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionRecord(
+ groupId,
+ "foo*|bar*",
+ new ResolvedRegularExpression(
+ Set.of("foo"),
+ 12345L,
+ context.time.milliseconds()
+ )
+ ),
+
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11,
computeGroupHash(Map.of(
+ fooTopicName, fooTopicHash
+ )))
+ ))
+ )
+ ),
+ context.processTasks()
+ );
+
+ // sleep for more than REGEX_REFRESH_INTERVAL_MS
+ context.time.sleep(60001L);
+
+ // Access to the bar topic is granted.
+ acls.put(barTopicName, AuthorizationResult.ALLOWED);
+ assignor.prepareGroupAssignment(new GroupAssignment(Map.of(
+ memberId1, new MemberAssignmentImpl(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2)
+ )),
+ memberId2, new MemberAssignmentImpl(mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4, 5)
+ ))
+ )));
+
+ // Member 2 heartbeats again with the same regex.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result2 = context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId2)
+ .setMemberEpoch(10)
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicRegex("foo*|bar*")
+ .setServerAssignor("range")
+ .setTopicPartitions(List.of()),
+ ApiKeys.CONSUMER_GROUP_HEARTBEAT.latestVersion()
+ );
+
+ expectedMember2 = new ConsumerGroupMember.Builder(memberId2)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicRegex("foo*|bar*")
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4, 5)))
+ .build();
+
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(memberId2)
+ .setMemberEpoch(11)
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List.of(
+ new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(List.of(3, 4, 5))))),
+ result2.response()
+ );
+
+ assertRecordsEquals(
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
11),
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember2)
+ ),
+ result2.records()
+ );
+
+ // A regex refresh is triggered and the bar topic is included.
+ assertRecordsEquals(
+ List.of(
+ // The resolution of the new regex is persisted.
+
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionRecord(
+ groupId,
+ "foo*|bar*",
+ new ResolvedRegularExpression(
+ Set.of("foo", "bar"),
+ 12345L,
+ context.time.milliseconds()
+ )
+ ),
+
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 12,
computeGroupHash(Map.of(
+ fooTopicName, fooTopicHash,
+ barTopicName, barTopicHash
+ )))
+ ),
+ context.processTasks().get(0).result.records()
+ );
+ }
+
@Test
public void
testResolvedRegularExpressionsRemovedWhenMembersLeaveOrFenced() {
String groupId = "fooup";