This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8864cba0e85 MINOR: Update full request condition in
ConsumerGroupHeartbeat request handling (#18061)
8864cba0e85 is described below
commit 8864cba0e85ed076d21d1632847a40269f6aacf3
Author: David Jacot <[email protected]>
AuthorDate: Fri Dec 6 08:05:22 2024 +0100
MINOR: Update full request condition in ConsumerGroupHeartbeat request
handling (#18061)
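With the addition of the SubscribedTopicRegex field to the
ConsumerGroupHeartbeat request, we need to update the definition of a full
request. This patch does so: a request is now considered full when
rebalanceTimeoutMs and ownedTopicPartitions are set together with either
subscribedTopicNames or subscribedTopicRegex. The memberEpoch == 0 check is
no longer part of that definition, but it still causes the assignment to be
returned in the response.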
Reviewers: Lianet Magrans <[email protected]>
---
.../coordinator/group/GroupMetadataManager.java | 8 ++--
.../group/GroupMetadataManagerTest.java | 46 +++++++++++++++++-----
2 files changed, 40 insertions(+), 14 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 95fff558d5e..69e7955d0bf 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -1768,11 +1768,11 @@ public class GroupMetadataManager {
// The assignment is only provided in the following cases:
// 1. The member sent a full request. It does so when joining or
rejoining the group with zero
// as the member epoch; or on any errors (e.g. timeout). We use all
the non-optional fields
- // (rebalanceTimeoutMs, subscribedTopicNames and
ownedTopicPartitions) to detect a full request
- // as those must be set in a full request.
+ // (rebalanceTimeoutMs, (subscribedTopicNames or
subscribedTopicRegex) and ownedTopicPartitions)
+ // to detect a full request as those must be set in a full request.
// 2. The member's assignment has been updated.
- boolean isFullRequest = memberEpoch == 0 || (rebalanceTimeoutMs != -1
&& subscribedTopicNames != null && ownedTopicPartitions != null);
- if (isFullRequest || hasAssignedPartitionsChanged(member,
updatedMember)) {
+ boolean isFullRequest = rebalanceTimeoutMs != -1 &&
(subscribedTopicNames != null || subscribedTopicRegex != null) &&
ownedTopicPartitions != null;
+ if (memberEpoch == 0 || isFullRequest ||
hasAssignedPartitionsChanged(member, updatedMember)) {
response.setAssignment(createConsumerGroupResponseAssignment(updatedMember));
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index d9609e0f7a5..f6391b5bd61 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -1967,7 +1967,7 @@ public class GroupMetadataManagerTest {
);
// A full response should be sent back when the member sends
- // a full request again.
+ // a full request again with topic names set.
result = context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
@@ -1990,6 +1990,31 @@ public class GroupMetadataManagerTest {
.setPartitions(List.of(0, 1))))),
result.response()
);
+
+ // A full response should be sent back when the member sends
+ // a full request again with regex set.
+ result = context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(result.response().memberEpoch())
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicRegex("foo.*")
+ .setServerAssignor("range")
+ .setTopicPartitions(Collections.emptyList()));
+
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(memberId)
+ .setMemberEpoch(1)
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List.of(
+ new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(List.of(0, 1))))),
+ result.response()
+ );
}
@Test
@@ -15310,7 +15335,14 @@ public class GroupMetadataManagerTest {
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId1)
.setMemberEpoch(10)
- .setHeartbeatIntervalMs(5000),
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List.of(
+ new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(List.of(0, 1, 2, 3, 4, 5))
+ ))
+ ),
result.response()
);
@@ -15442,10 +15474,7 @@ public class GroupMetadataManagerTest {
.setGroupId(groupId)
.setMemberId(memberId1)
.setMemberEpoch(1)
- .setRebalanceTimeoutMs(5000)
- .setSubscribedTopicRegex("foo*|bar*")
- .setServerAssignor("range")
- .setTopicPartitions(Collections.emptyList()));
+ .setSubscribedTopicRegex("foo*|bar*"));
assertResponseEquals(
new ConsumerGroupHeartbeatResponseData()
@@ -15494,10 +15523,7 @@ public class GroupMetadataManagerTest {
.setGroupId(groupId)
.setMemberId(memberId1)
.setMemberEpoch(1)
- .setRebalanceTimeoutMs(5000)
- .setSubscribedTopicRegex("foo*|bar*")
- .setServerAssignor("range")
- .setTopicPartitions(Collections.emptyList()));
+ .setSubscribedTopicRegex("foo*|bar*"));
assertResponseEquals(
new ConsumerGroupHeartbeatResponseData()