This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8864cba0e85 MINOR: Update full request condition in 
ConsumerGroupHeartbeat request handling (#18061)
8864cba0e85 is described below

commit 8864cba0e85ed076d21d1632847a40269f6aacf3
Author: David Jacot <[email protected]>
AuthorDate: Fri Dec 6 08:05:22 2024 +0100

    MINOR: Update full request condition in ConsumerGroupHeartbeat request 
handling (#18061)
    
    With the addition of the SubscribedTopicRegex field to the 
ConsumerGroupHeartbeat request, we need to update the definition of a full 
request. This patch does so.
    
    Reviewers: Lianet Magrans <[email protected]>
---
 .../coordinator/group/GroupMetadataManager.java    |  8 ++--
 .../group/GroupMetadataManagerTest.java            | 46 +++++++++++++++++-----
 2 files changed, 40 insertions(+), 14 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 95fff558d5e..69e7955d0bf 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -1768,11 +1768,11 @@ public class GroupMetadataManager {
         // The assignment is only provided in the following cases:
         // 1. The member sent a full request. It does so when joining or 
rejoining the group with zero
         //    as the member epoch; or on any errors (e.g. timeout). We use all 
the non-optional fields
-        //    (rebalanceTimeoutMs, subscribedTopicNames and 
ownedTopicPartitions) to detect a full request
-        //    as those must be set in a full request.
+        //    (rebalanceTimeoutMs, (subscribedTopicNames or 
subscribedTopicRegex) and ownedTopicPartitions)
+        //    to detect a full request as those must be set in a full request.
         // 2. The member's assignment has been updated.
-        boolean isFullRequest = memberEpoch == 0 || (rebalanceTimeoutMs != -1 
&& subscribedTopicNames != null && ownedTopicPartitions != null);
-        if (isFullRequest || hasAssignedPartitionsChanged(member, 
updatedMember)) {
+        boolean isFullRequest = rebalanceTimeoutMs != -1 && 
(subscribedTopicNames != null || subscribedTopicRegex != null) && 
ownedTopicPartitions != null;
+        if (memberEpoch == 0 || isFullRequest || 
hasAssignedPartitionsChanged(member, updatedMember)) {
             
response.setAssignment(createConsumerGroupResponseAssignment(updatedMember));
         }
 
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index d9609e0f7a5..f6391b5bd61 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -1967,7 +1967,7 @@ public class GroupMetadataManagerTest {
         );
 
         // A full response should be sent back when the member sends
-        // a full request again.
+        // a full request again with topic names set.
         result = context.consumerGroupHeartbeat(
             new ConsumerGroupHeartbeatRequestData()
                 .setGroupId(groupId)
@@ -1990,6 +1990,31 @@ public class GroupMetadataManagerTest {
                             .setPartitions(List.of(0, 1))))),
             result.response()
         );
+
+        // A full response should be sent back when the member sends
+        // a full request again with regex set.
+        result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(result.response().memberEpoch())
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicRegex("foo.*")
+                .setServerAssignor("range")
+                .setTopicPartitions(Collections.emptyList()));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(1)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setTopicPartitions(List.of(
+                        new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(List.of(0, 1))))),
+            result.response()
+        );
     }
 
     @Test
@@ -15310,7 +15335,14 @@ public class GroupMetadataManagerTest {
             new ConsumerGroupHeartbeatResponseData()
                 .setMemberId(memberId1)
                 .setMemberEpoch(10)
-                .setHeartbeatIntervalMs(5000),
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setTopicPartitions(List.of(
+                        new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(List.of(0, 1, 2, 3, 4, 5))
+                    ))
+                ),
             result.response()
         );
 
@@ -15442,10 +15474,7 @@ public class GroupMetadataManagerTest {
                 .setGroupId(groupId)
                 .setMemberId(memberId1)
                 .setMemberEpoch(1)
-                .setRebalanceTimeoutMs(5000)
-                .setSubscribedTopicRegex("foo*|bar*")
-                .setServerAssignor("range")
-                .setTopicPartitions(Collections.emptyList()));
+                .setSubscribedTopicRegex("foo*|bar*"));
 
         assertResponseEquals(
             new ConsumerGroupHeartbeatResponseData()
@@ -15494,10 +15523,7 @@ public class GroupMetadataManagerTest {
                 .setGroupId(groupId)
                 .setMemberId(memberId1)
                 .setMemberEpoch(1)
-                .setRebalanceTimeoutMs(5000)
-                .setSubscribedTopicRegex("foo*|bar*")
-                .setServerAssignor("range")
-                .setTopicPartitions(Collections.emptyList()));
+                .setSubscribedTopicRegex("foo*|bar*"));
 
         assertResponseEquals(
             new ConsumerGroupHeartbeatResponseData()

Reply via email to