This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 275b995bf2c KAFKA-18095; Allow a  member to join without subscription 
under new consumer protocol (#18003)
275b995bf2c is described below

commit 275b995bf2c8a26977ea72fa2b4831b930276654
Author: David Jacot <[email protected]>
AuthorDate: Tue Dec 3 11:11:36 2024 +0100

    KAFKA-18095; Allow a  member to join without subscription under new 
consumer protocol (#18003)
    
    This patch relaxes requiring non-empty subscribed names and regex in the 
full heartbeat request. Without this, a consumer using client side regexes may 
not be able to join the group when the regex does not match any topics yet and 
this is inconsistent with the old protocol. Relaxing the subscribed regex is 
not strictly required but it seems better to keep it consistent.
    
    Reviewers: Lianet Magrans <[email protected]>
---
 .../server/ConsumerGroupHeartbeatRequestTest.scala | 55 ++++++++++++++++++++++
 .../coordinator/group/GroupMetadataManager.java    |  9 ++--
 .../group/GroupMetadataManagerTest.java            |  4 +-
 3 files changed, 62 insertions(+), 6 deletions(-)

diff --git 
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
index 59d4e05a01f..e597d1cf7c2 100644
--- 
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
@@ -276,6 +276,61 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
     }
   }
 
+  @ClusterTest
+  def testConsumerGroupHeartbeatWithEmptySubscription(): Unit = {
+    val admin = cluster.admin()
+
+    // Creates the __consumer_offsets topics because it won't be created 
automatically
+    // in this test because it does not use FindCoordinator API.
+    try {
+      TestUtils.createOffsetsTopicWithAdmin(
+        admin = admin,
+        brokers = cluster.brokers.values().asScala.toSeq,
+        controllers = cluster.controllers().values().asScala.toSeq
+      )
+
+      // Heartbeat request to join the group.
+      var consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequest.Builder(
+        new ConsumerGroupHeartbeatRequestData()
+          .setGroupId("grp")
+          .setMemberId(Uuid.randomUuid().toString)
+          .setMemberEpoch(0)
+          .setRebalanceTimeoutMs(5 * 60 * 1000)
+          .setSubscribedTopicRegex("")
+          .setTopicPartitions(List.empty.asJava)
+      ).build()
+
+      // Send the request until receiving a successful response. There is a 
delay
+      // here because the group coordinator is loaded in the background.
+      var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
+      TestUtils.waitUntilTrue(() => {
+        consumerGroupHeartbeatResponse = 
connectAndReceive(consumerGroupHeartbeatRequest)
+        consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
+      }, msg = s"Did not receive the expected successful response. Last 
response $consumerGroupHeartbeatResponse.")
+
+      // Heartbeat request to join the group.
+      consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequest.Builder(
+        new ConsumerGroupHeartbeatRequestData()
+          .setGroupId("grp")
+          .setMemberId(Uuid.randomUuid().toString)
+          .setMemberEpoch(0)
+          .setRebalanceTimeoutMs(5 * 60 * 1000)
+          .setSubscribedTopicNames(List.empty.asJava)
+          .setTopicPartitions(List.empty.asJava)
+      ).build()
+
+      // Send the request until receiving a successful response. There is a 
delay
+      // here because the group coordinator is loaded in the background.
+      consumerGroupHeartbeatResponse = null
+      TestUtils.waitUntilTrue(() => {
+        consumerGroupHeartbeatResponse = 
connectAndReceive(consumerGroupHeartbeatRequest)
+        consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
+      }, msg = s"Did not receive the expected successful response. Last 
response $consumerGroupHeartbeatResponse.")
+    } finally {
+      admin.close()
+    }
+  }
+
   @ClusterTest
   def 
testRejoiningStaticMemberGetsAssignmentsBackWhenNewGroupCoordinatorIsEnabled(): 
Unit = {
     val admin = cluster.admin()
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index e1fa86ec907..6aabc76ec25 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -1172,10 +1172,11 @@ public class GroupMetadataManager {
             if (request.topicPartitions() == null || 
!request.topicPartitions().isEmpty()) {
                 throw new InvalidRequestException("TopicPartitions must be 
empty when (re-)joining.");
             }
-            boolean hasSubscribedTopicNames = request.subscribedTopicNames() 
!= null && !request.subscribedTopicNames().isEmpty();
-            boolean hasSubscribedTopicRegex = request.subscribedTopicRegex() 
!= null && !request.subscribedTopicRegex().isEmpty();
-            if (!hasSubscribedTopicNames && !hasSubscribedTopicRegex) {
-                throw new InvalidRequestException("SubscribedTopicNames or 
SubscribedTopicRegex must be set in first request.");
+            // We accept members joining with an empty list of names or an 
empty regex. It basically
+            // means that they are not subscribed to any topics, but they are 
part of the group.
+            if (request.subscribedTopicNames() == null && 
request.subscribedTopicRegex() == null) {
+                throw new InvalidRequestException("Either SubscribedTopicNames 
or SubscribedTopicRegex must" +
+                    " be non-null when (re-)joining.");
             }
         } else if (request.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
             throwIfNull(request.instanceId(), "InstanceId can't be null.");
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 4cddd7db91e..98fa8452c88 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -207,7 +207,7 @@ public class GroupMetadataManagerTest {
                 .setRebalanceTimeoutMs(5000)));
         assertEquals("TopicPartitions must be empty when (re-)joining.", 
ex.getMessage());
 
-        // SubscribedTopicNames or SubscribedTopicRegex must be present and 
non-empty in the first request (epoch == 0).
+        // SubscribedTopicNames or SubscribedTopicRegex must be present in the 
first request (epoch == 0).
         ex = assertThrows(InvalidRequestException.class, () -> 
context.consumerGroupHeartbeat(
             new ConsumerGroupHeartbeatRequestData()
                 .setMemberId(memberId)
@@ -215,7 +215,7 @@ public class GroupMetadataManagerTest {
                 .setMemberEpoch(0)
                 .setRebalanceTimeoutMs(5000)
                 .setTopicPartitions(Collections.emptyList())));
-        assertEquals("SubscribedTopicNames or SubscribedTopicRegex must be set 
in first request.", ex.getMessage());
+        assertEquals("Either SubscribedTopicNames or SubscribedTopicRegex must 
be non-null when (re-)joining.", ex.getMessage());
 
         // InstanceId must be non-empty if provided in all requests.
         ex = assertThrows(InvalidRequestException.class, () -> 
context.consumerGroupHeartbeat(

Reply via email to