This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new af012e1ec24 KAFKA-18961: Time-based refresh for server-side RE2J regex 
(#19904)
af012e1ec24 is described below

commit af012e1ec247cb9aceca2cc658f13618c4e30e47
Author: Dongnuo Lyu <[email protected]>
AuthorDate: Thu Jun 12 07:54:39 2025 -0400

    KAFKA-18961: Time-based refresh for server-side RE2J regex (#19904)
    
    Consumers can subscribe to an RE2J SubscriptionPattern that will be
    resolved and maintained on the server-side (KIP-848). Currently, those
    regexes are refreshed on the coordinator when a consumer subscribes to a
    new regex, or if there is a new topic metadata image (to ensure regex
    resolution stays up-to-date with existing topics)
    
    But with
    [KAFKA-18813](https://issues.apache.org/jira/browse/KAFKA-18813), the
    topics matching a regex are filtered based on ACLs. This generates a new
    situation, as regexes resolution do not stay up-to-date as topics become
    visible (ACLs added/delete).
    
    This patch introduces time-based refresh for the subscribed regex by
    - Adding internal `group.consumer.regex.batch.refresh.max.interval.ms`
    config
    that controls the refresh interval.
    - Schedule a regex refresh when updating regex subscription if the
    latest refresh is older than the max interval.
    
    Reviewers: David Jacot <[email protected]>
---
 .../api/AbstractAuthorizerIntegrationTest.scala    |   1 +
 .../kafka/api/AuthorizerIntegrationTest.scala      |  26 ++-
 .../coordinator/group/GroupCoordinatorConfig.java  |  16 ++
 .../coordinator/group/GroupMetadataManager.java    |  25 ++-
 .../group/GroupCoordinatorConfigTest.java          |   2 +
 .../group/GroupMetadataManagerTest.java            | 231 ++++++++++++++++++++-
 6 files changed, 289 insertions(+), 12 deletions(-)

diff --git 
a/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala
 
b/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala
index 54f6d71a278..0281d43a947 100644
--- 
a/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala
@@ -111,6 +111,7 @@ class AbstractAuthorizerIntegrationTest extends 
BaseRequestTest {
 
     properties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "1")
     
properties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, 
"1")
+    
properties.put(GroupCoordinatorConfig.CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_CONFIG,
 "10000")
     properties.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 
"1")
     
properties.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
 "1")
     properties.put(TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, "1")
diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 2c7189b5a70..85f89c1b647 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -3076,6 +3076,29 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
     sendAndReceiveRegexHeartbeat(response, listenerName, None)
   }
 
+  @Test
+  def 
testConsumerGroupHeartbeatWithRegexWithTopicDescribeAclAddedAndRemoved(): Unit 
= {
+    createTopicWithBrokerPrincipal(topic)
+    val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, 
WILDCARD_HOST, ALL, ALLOW)
+    addAndVerifyAcls(Set(allowAllOpsAcl), groupResource)
+
+    val memberId = Uuid.randomUuid.toString;
+    var response = sendAndReceiveFirstRegexHeartbeat(memberId, listenerName)
+    TestUtils.tryUntilNoAssertionError() {
+      response = sendAndReceiveRegexHeartbeat(response, listenerName, Some(0), 
true)
+    }
+
+    addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource)
+    TestUtils.tryUntilNoAssertionError(waitTime = 25000) {
+      response = sendAndReceiveRegexHeartbeat(response, listenerName, Some(1))
+    }
+
+    removeAndVerifyAcls(topicDescribeAcl(topicResource), topicResource)
+    TestUtils.tryUntilNoAssertionError(waitTime = 25000) {
+      response = sendAndReceiveRegexHeartbeat(response, listenerName, Some(0))
+    }
+  }
+
   @Test
   def testConsumerGroupHeartbeatWithRegexWithDifferentMemberAcls(): Unit = {
     createTopicWithBrokerPrincipal(topic, numPartitions = 2)
@@ -3093,7 +3116,7 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
     // member permissions while computing assignments.
     var member2Response = 
sendAndReceiveFirstRegexHeartbeat("memberWithLimitedAccess", listenerName)
     member1Response = sendAndReceiveRegexHeartbeat(member1Response, 
interBrokerListenerName, Some(1))
-    member1Response = sendAndReceiveRegexHeartbeat(member1Response, 
interBrokerListenerName, None, fullRequest = true)
+    member1Response = sendAndReceiveRegexHeartbeat(member1Response, 
interBrokerListenerName, Some(1), fullRequest = true)
     member2Response = sendAndReceiveRegexHeartbeat(member2Response, 
listenerName, Some(1))
 
     // Create another topic and send heartbeats on member1 to trigger regex 
refresh
@@ -3624,6 +3647,7 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
       data = data
         .setTopicPartitions(partitions.asJava)
         .setSubscribedTopicRegex("^top.*")
+        .setRebalanceTimeoutMs(5 * 60 * 1000)
     }
     val request = new ConsumerGroupHeartbeatRequest.Builder(data).build()
     val resource = Set[ResourceType](GROUP, TOPIC)
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
index 5c72eba5071..c4650145ebc 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -207,6 +207,11 @@ public class GroupCoordinatorConfig {
         ConsumerGroupMigrationPolicy.DOWNGRADE + ": only downgrade from 
consumer group to classic group is enabled, " +
         ConsumerGroupMigrationPolicy.DISABLED + ": neither upgrade nor 
downgrade is enabled.";
 
+    public static final String CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_CONFIG 
= "group.consumer.regex.refresh.interval.ms";
+    public static final String CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_DOC = 
"The interval at which the group coordinator will refresh " +
+        "the topics matching the group subscribed regexes. This is only 
applicable to consumer groups using the consumer group protocol. ";
+    public static final int CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_DEFAULT = 
10 * 60 * 1000; // 10 minutes
+
     ///
     /// Share group configs
     ///
@@ -318,6 +323,8 @@ public class GroupCoordinatorConfig {
         .define(CONSUMER_GROUP_MAX_SIZE_CONFIG, INT, 
CONSUMER_GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM, 
CONSUMER_GROUP_MAX_SIZE_DOC)
         .define(CONSUMER_GROUP_ASSIGNORS_CONFIG, LIST, 
CONSUMER_GROUP_ASSIGNORS_DEFAULT, null, MEDIUM, CONSUMER_GROUP_ASSIGNORS_DOC)
         .define(CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, STRING, 
CONSUMER_GROUP_MIGRATION_POLICY_DEFAULT, 
ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(ConsumerGroupMigrationPolicy.class)),
 MEDIUM, CONSUMER_GROUP_MIGRATION_POLICY_DOC)
+        // Interval config used for testing purposes.
+        .defineInternal(CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_CONFIG, INT, 
CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_DEFAULT, atLeast(10 * 1000), MEDIUM, 
CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_DOC)
 
         // Share group configs
         .define(SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT, 
SHARE_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, 
SHARE_GROUP_SESSION_TIMEOUT_MS_DOC)
@@ -370,6 +377,7 @@ public class GroupCoordinatorConfig {
     private final int consumerGroupMaxSessionTimeoutMs;
     private final int consumerGroupMinHeartbeatIntervalMs;
     private final int consumerGroupMaxHeartbeatIntervalMs;
+    private final int consumerGroupRegexRefreshIntervalMs;
     // Share group configurations
     private final int shareGroupMaxSize;
     private final int shareGroupSessionTimeoutMs;
@@ -419,6 +427,7 @@ public class GroupCoordinatorConfig {
         this.consumerGroupMaxSessionTimeoutMs = 
config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG);
         this.consumerGroupMinHeartbeatIntervalMs = 
config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
         this.consumerGroupMaxHeartbeatIntervalMs = 
config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG);
+        this.consumerGroupRegexRefreshIntervalMs = 
config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_CONFIG);
         // Share group configurations
         this.shareGroupSessionTimeoutMs = 
config.getInt(GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG);
         this.shareGroupMinSessionTimeoutMs = 
config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG);
@@ -810,6 +819,13 @@ public class GroupCoordinatorConfig {
         return consumerGroupMaxHeartbeatIntervalMs;
     }
 
+    /**
+     * The consumer group regex batch refresh max interval in milliseconds.
+     */
+    public int consumerGroupRegexRefreshIntervalMs() {
+        return consumerGroupRegexRefreshIntervalMs;
+    }
+
     /**
      * The share group session timeout in milliseconds.
      */
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 3d293a1ddca..20b9337fb41 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -402,8 +402,10 @@ public class GroupMetadataManager {
     /**
      * The minimum amount of time between two consecutive refreshes of
      * the regular expressions within a single group.
+     *
+     * Package private for setting the lower limit of the refresh interval.
      */
-    private static final long REGEX_BATCH_REFRESH_INTERVAL_MS = 10_000L;
+    static final long REGEX_BATCH_REFRESH_MIN_INTERVAL_MS = 10_000L;
 
     /**
      * The log context.
@@ -3076,6 +3078,7 @@ public class GroupMetadataManager {
         ConsumerGroupMember updatedMember,
         List<CoordinatorRecord> records
     ) {
+        final long currentTimeMs = time.milliseconds();
         String groupId = group.groupId();
         String memberId = updatedMember.memberId();
         String oldSubscribedTopicRegex = member.subscribedTopicRegex();
@@ -3113,11 +3116,12 @@ public class GroupMetadataManager {
         }
 
         // Conditions to trigger a refresh:
-        // 0. The group is subscribed to regular expressions.
-        // 1. There is no ongoing refresh for the group.
-        // 2. The last refresh is older than 10s.
-        // 3. The group has unresolved regular expressions.
-        // 4. The metadata image has new topics.
+        // 0.   The group is subscribed to regular expressions.
+        // 1.   There is no ongoing refresh for the group.
+        // 2.   The last refresh is older than 10s.
+        // 3.1  The group has unresolved regular expressions.
+        // 3.2  Or the metadata image has new topics.
+        // 3.3  Or the last refresh is older than the batch refresh max 
interval.
 
         // 0. The group is subscribed to regular expressions. We also take the 
one
         //    that the current may have just introduced.
@@ -3134,11 +3138,11 @@ public class GroupMetadataManager {
         // 2. The last refresh is older than 10s. If the group does not have 
any regular
         //    expressions but the current member just brought a new one, we 
should continue.
         long lastRefreshTimeMs = 
group.lastResolvedRegularExpressionRefreshTimeMs();
-        if (time.milliseconds() <= lastRefreshTimeMs + 
REGEX_BATCH_REFRESH_INTERVAL_MS) {
+        if (currentTimeMs <= lastRefreshTimeMs + 
REGEX_BATCH_REFRESH_MIN_INTERVAL_MS) {
             return bumpGroupEpoch;
         }
 
-        // 3. The group has unresolved regular expressions.
+        // 3.1 The group has unresolved regular expressions.
         Map<String, Integer> subscribedRegularExpressions = new 
HashMap<>(group.subscribedRegularExpressions());
         if (isNotEmpty(oldSubscribedTopicRegex)) {
             subscribedRegularExpressions.compute(oldSubscribedTopicRegex, 
Utils::decValue);
@@ -3149,9 +3153,12 @@ public class GroupMetadataManager {
 
         requireRefresh |= subscribedRegularExpressions.size() != 
group.numResolvedRegularExpressions();
 
-        // 4. The metadata has new topics that we must consider.
+        // 3.2 The metadata has new topics that we must consider.
         requireRefresh |= group.lastResolvedRegularExpressionVersion() < 
lastMetadataImageWithNewTopics;
 
+        // 3.3 The last refresh is older than the batch refresh max interval.
+        requireRefresh |= currentTimeMs > lastRefreshTimeMs + 
config.consumerGroupRegexRefreshIntervalMs();
+
         if (requireRefresh && !subscribedRegularExpressions.isEmpty()) {
             Set<String> regexes = 
Collections.unmodifiableSet(subscribedRegularExpressions.keySet());
             executor.schedule(
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
index 7c5c7a5b8da..84bd38ae60d 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
@@ -198,6 +198,7 @@ public class GroupCoordinatorConfigTest {
         
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG,
 666);
         
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG,
 111);
         
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG,
 222);
+        
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_CONFIG,
 15 * 60 * 1000);
 
         GroupCoordinatorConfig config = createConfig(configs);
 
@@ -226,6 +227,7 @@ public class GroupCoordinatorConfigTest {
         assertEquals(666, config.consumerGroupMaxSessionTimeoutMs());
         assertEquals(111, config.consumerGroupMinHeartbeatIntervalMs());
         assertEquals(222, config.consumerGroupMaxHeartbeatIntervalMs());
+        assertEquals(15 * 60 * 1000, 
config.consumerGroupRegexRefreshIntervalMs());
     }
 
     @Test
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index dcc2a4ca5f3..3f47b8e70d8 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -20804,7 +20804,7 @@ public class GroupMetadataManagerTest {
                 .withMetadataHash(computeGroupHash(Map.of(fooTopicName, 
fooTopicHash))))
             .build();
 
-        // sleep for more than REGEX_BATCH_REFRESH_INTERVAL_MS
+        // sleep for more than REGEX_BATCH_REFRESH_MIN_INTERVAL_MS
         context.time.sleep(10001L);
 
         Map<String, AuthorizationResult> acls = new HashMap<>();
@@ -20887,7 +20887,7 @@ public class GroupMetadataManagerTest {
             context.processTasks()
         );
 
-        // sleep for more than REGEX_BATCH_REFRESH_INTERVAL_MS
+        // sleep for more than REGEX_BATCH_REFRESH_MIN_INTERVAL_MS
         context.time.sleep(10001L);
 
         // Access to the bar topic is granted.
@@ -20972,6 +20972,233 @@ public class GroupMetadataManagerTest {
         );
     }
 
+    @Test
+    public void testConsumerGroupMemberJoinsRefreshTopicAuthorization() {
+        String groupId = "fooup";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        Uuid barTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        String barTopicName = "bar";
+
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .addTopic(barTopicId, barTopicName, 3)
+            .build(12345L);
+        long fooTopicHash = computeTopicHash(fooTopicName, metadataImage);
+        long barTopicHash = computeTopicHash(barTopicName, metadataImage);
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        Authorizer authorizer = mock(Authorizer.class);
+        Plugin<Authorizer> authorizerPlugin = Plugin.wrapInstance(authorizer, 
null, "authorizer.class.name");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, 
List.of(assignor))
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_CONFIG,
 60000)
+            .withMetadataImage(metadataImage)
+            .withAuthorizerPlugin(authorizerPlugin)
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId1)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .setClientId(DEFAULT_CLIENT_ID)
+                    .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicNames(List.of("foo"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                    .build())
+                .withMember(new ConsumerGroupMember.Builder(memberId2)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .setClientId(DEFAULT_CLIENT_ID)
+                    .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicRegex("foo*")
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 3, 4, 5)))
+                    .build())
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                .withAssignment(memberId2, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5)))
+                .withResolvedRegularExpression("foo*", new 
ResolvedRegularExpression(
+                    Set.of(fooTopicName), 0L, 0L))
+                .withAssignmentEpoch(10)
+                .withMetadataHash(computeGroupHash(Map.of(fooTopicName, 
fooTopicHash))))
+            .build();
+
+        // sleep for more than REGEX_BATCH_REFRESH_MIN_INTERVAL_MS
+        context.time.sleep(10001L);
+
+        Map<String, AuthorizationResult> acls = new HashMap<>();
+        acls.put(fooTopicName, AuthorizationResult.ALLOWED);
+        acls.put(barTopicName, AuthorizationResult.DENIED);
+        when(authorizer.authorize(any(), any())).thenAnswer(invocation -> {
+            List<Action> actions = invocation.getArgument(1);
+            return actions.stream()
+                .map(action -> 
acls.getOrDefault(action.resourcePattern().name(), AuthorizationResult.DENIED))
+                .collect(Collectors.toList());
+        });
+
+        // Member 2 heartbeats with a different regular expression.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, 
CoordinatorRecord> result1 = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId2)
+                .setMemberEpoch(10)
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicRegex("foo*|bar*")
+                .setServerAssignor("range")
+                .setTopicPartitions(List.of()),
+            ApiKeys.CONSUMER_GROUP_HEARTBEAT.latestVersion()
+        );
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId2)
+                .setMemberEpoch(10)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setTopicPartitions(List.of(
+                        new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(List.of(3, 4, 5))))),
+            result1.response()
+        );
+
+        ConsumerGroupMember expectedMember2 = new 
ConsumerGroupMember.Builder(memberId2)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(10)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicRegex("foo*|bar*")
+            .setServerAssignorName("range")
+            .build();
+
+        assertRecordsEquals(
+            List.of(
+                
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, 
expectedMember2),
+                
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId,
 "foo*")
+            ),
+            result1.records()
+        );
+
+        // Execute pending tasks.
+        assertEquals(
+            List.of(
+                new MockCoordinatorExecutor.ExecutorResult<>(
+                    groupId + "-regex",
+                    new CoordinatorResult<>(List.of(
+                        // The resolution of the new regex is persisted.
+                        
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionRecord(
+                            groupId,
+                            "foo*|bar*",
+                            new ResolvedRegularExpression(
+                                Set.of("foo"),
+                                12345L,
+                                context.time.milliseconds()
+                            )
+                        ),
+                        
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 
computeGroupHash(Map.of(
+                            fooTopicName, fooTopicHash
+                        )))
+                    ))
+                )
+            ),
+            context.processTasks()
+        );
+
+        // sleep for more than REGEX_REFRESH_INTERVAL_MS
+        context.time.sleep(60001L);
+
+        // Access to the bar topic is granted.
+        acls.put(barTopicName, AuthorizationResult.ALLOWED);
+        assignor.prepareGroupAssignment(new GroupAssignment(Map.of(
+            memberId1, new MemberAssignmentImpl(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2)
+            )),
+            memberId2, new MemberAssignmentImpl(mkAssignment(
+                mkTopicAssignment(fooTopicId, 3, 4, 5)
+            ))
+        )));
+
+        // Member 2 heartbeats again with the same regex.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, 
CoordinatorRecord> result2 = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId2)
+                .setMemberEpoch(10)
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicRegex("foo*|bar*")
+                .setServerAssignor("range")
+                .setTopicPartitions(List.of()),
+            ApiKeys.CONSUMER_GROUP_HEARTBEAT.latestVersion()
+        );
+
+        expectedMember2 = new ConsumerGroupMember.Builder(memberId2)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicRegex("foo*|bar*")
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 3, 4, 5)))
+            .build();
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId2)
+                .setMemberEpoch(11)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setTopicPartitions(List.of(
+                        new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(List.of(3, 4, 5))))),
+            result2.response()
+        );
+
+        assertRecordsEquals(
+            List.of(
+                
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 11),
+                
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedMember2)
+            ),
+            result2.records()
+        );
+
+        // A regex refresh is triggered and the bar topic is included.
+        assertRecordsEquals(
+            List.of(
+                // The resolution of the new regex is persisted.
+                
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionRecord(
+                    groupId,
+                    "foo*|bar*",
+                    new ResolvedRegularExpression(
+                        Set.of("foo", "bar"),
+                        12345L,
+                        context.time.milliseconds()
+                    )
+                ),
+                
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 12, 
computeGroupHash(Map.of(
+                    fooTopicName, fooTopicHash,
+                    barTopicName, barTopicHash
+                )))
+            ),
+            context.processTasks().get(0).result.records()
+        );
+    }
+
     @Test
     public void 
testResolvedRegularExpressionsRemovedWhenMembersLeaveOrFenced() {
         String groupId = "fooup";

Reply via email to