This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 275b995bf2c KAFKA-18095; Allow a member to join without subscription
under new consumer protocol (#18003)
275b995bf2c is described below
commit 275b995bf2c8a26977ea72fa2b4831b930276654
Author: David Jacot <[email protected]>
AuthorDate: Tue Dec 3 11:11:36 2024 +0100
KAFKA-18095; Allow a member to join without subscription under new
consumer protocol (#18003)
This patch relaxes requiring non-empty subscribed names and regex in the
full heartbeat request. Without this, a consumer using client side regexes may
not be able to join the group when the regex does not match any topics yet and
this is inconsistent with the old protocol. Relaxing the subscribed regex is
not strictly required but it seems better to keep it consistent.
Reviewers: Lianet Magrans <[email protected]>
---
.../server/ConsumerGroupHeartbeatRequestTest.scala | 55 ++++++++++++++++++++++
.../coordinator/group/GroupMetadataManager.java | 9 ++--
.../group/GroupMetadataManagerTest.java | 4 +-
3 files changed, 62 insertions(+), 6 deletions(-)
diff --git
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
index 59d4e05a01f..e597d1cf7c2 100644
---
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
@@ -276,6 +276,61 @@ class ConsumerGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
}
}
+ @ClusterTest
+ def testConsumerGroupHeartbeatWithEmptySubscription(): Unit = {
+ val admin = cluster.admin()
+
+ // Creates the __consumer_offsets topics because it won't be created
automatically
+ // in this test because it does not use FindCoordinator API.
+ try {
+ TestUtils.createOffsetsTopicWithAdmin(
+ admin = admin,
+ brokers = cluster.brokers.values().asScala.toSeq,
+ controllers = cluster.controllers().values().asScala.toSeq
+ )
+
+ // Heartbeat request to join the group.
+ var consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("grp")
+ .setMemberId(Uuid.randomUuid().toString)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5 * 60 * 1000)
+ .setSubscribedTopicRegex("")
+ .setTopicPartitions(List.empty.asJava)
+ ).build()
+
+ // Send the request until receiving a successful response. There is a
delay
+ // here because the group coordinator is loaded in the background.
+ var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
+ TestUtils.waitUntilTrue(() => {
+ consumerGroupHeartbeatResponse =
connectAndReceive(consumerGroupHeartbeatRequest)
+ consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
+ }, msg = s"Did not receive the expected successful response. Last
response $consumerGroupHeartbeatResponse.")
+
+ // Heartbeat request to join the group.
+ consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("grp")
+ .setMemberId(Uuid.randomUuid().toString)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5 * 60 * 1000)
+ .setSubscribedTopicNames(List.empty.asJava)
+ .setTopicPartitions(List.empty.asJava)
+ ).build()
+
+ // Send the request until receiving a successful response. There is a
delay
+ // here because the group coordinator is loaded in the background.
+ consumerGroupHeartbeatResponse = null
+ TestUtils.waitUntilTrue(() => {
+ consumerGroupHeartbeatResponse =
connectAndReceive(consumerGroupHeartbeatRequest)
+ consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
+ }, msg = s"Did not receive the expected successful response. Last
response $consumerGroupHeartbeatResponse.")
+ } finally {
+ admin.close()
+ }
+ }
+
@ClusterTest
def
testRejoiningStaticMemberGetsAssignmentsBackWhenNewGroupCoordinatorIsEnabled():
Unit = {
val admin = cluster.admin()
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index e1fa86ec907..6aabc76ec25 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -1172,10 +1172,11 @@ public class GroupMetadataManager {
if (request.topicPartitions() == null ||
!request.topicPartitions().isEmpty()) {
throw new InvalidRequestException("TopicPartitions must be
empty when (re-)joining.");
}
- boolean hasSubscribedTopicNames = request.subscribedTopicNames()
!= null && !request.subscribedTopicNames().isEmpty();
- boolean hasSubscribedTopicRegex = request.subscribedTopicRegex()
!= null && !request.subscribedTopicRegex().isEmpty();
- if (!hasSubscribedTopicNames && !hasSubscribedTopicRegex) {
- throw new InvalidRequestException("SubscribedTopicNames or
SubscribedTopicRegex must be set in first request.");
+ // We accept members joining with an empty list of names or an
empty regex. It basically
+ // means that they are not subscribed to any topics, but they are
part of the group.
+ if (request.subscribedTopicNames() == null &&
request.subscribedTopicRegex() == null) {
+ throw new InvalidRequestException("Either SubscribedTopicNames
or SubscribedTopicRegex must" +
+ " be non-null when (re-)joining.");
}
} else if (request.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
throwIfNull(request.instanceId(), "InstanceId can't be null.");
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 4cddd7db91e..98fa8452c88 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -207,7 +207,7 @@ public class GroupMetadataManagerTest {
.setRebalanceTimeoutMs(5000)));
assertEquals("TopicPartitions must be empty when (re-)joining.",
ex.getMessage());
- // SubscribedTopicNames or SubscribedTopicRegex must be present and
non-empty in the first request (epoch == 0).
+ // SubscribedTopicNames or SubscribedTopicRegex must be present in the
first request (epoch == 0).
ex = assertThrows(InvalidRequestException.class, () ->
context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setMemberId(memberId)
@@ -215,7 +215,7 @@ public class GroupMetadataManagerTest {
.setMemberEpoch(0)
.setRebalanceTimeoutMs(5000)
.setTopicPartitions(Collections.emptyList())));
- assertEquals("SubscribedTopicNames or SubscribedTopicRegex must be set
in first request.", ex.getMessage());
+ assertEquals("Either SubscribedTopicNames or SubscribedTopicRegex must
be non-null when (re-)joining.", ex.getMessage());
// InstanceId must be non-empty if provided in all requests.
ex = assertThrows(InvalidRequestException.class, () ->
context.consumerGroupHeartbeat(