This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 65a3bd26da5 KAFKA-19926: Apply rebalance delay also for later epochs
if the group is empty (#21011)
65a3bd26da5 is described below
commit 65a3bd26da58cea7037c6754f987bc6f1948f887
Author: Jinhe Zhang <[email protected]>
AuthorDate: Fri Nov 28 12:14:08 2025 -0500
KAFKA-19926: Apply rebalance delay also for later epochs if the group is
empty (#21011)
In https://github.com/apache/kafka/pull/20755 we introduced an initial
rebalance delay for streams groups. This change extends that delay to every
rebalance that starts while the group is empty, to avoid frequent
rebalances within a short time period.
Reviewers: Lucas Brutschy <[email protected]>
---
.../coordinator/group/GroupMetadataManager.java | 7 ++-
.../group/GroupMetadataManagerTest.java | 59 ++++++++++++++++++++++
2 files changed, 62 insertions(+), 4 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 35a5d562034..25873f41219 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -2043,8 +2043,7 @@ public class GroupMetadataManager {
}
// Schedule initial rebalance delay for new streams groups to coalesce
joins.
- boolean isInitialRebalance = (group.groupEpoch() == 0);
- if (isInitialRebalance) {
+ if (group.isEmpty()) {
int initialDelayMs = streamsGroupInitialRebalanceDelayMs(groupId);
if (initialDelayMs > 0) {
timer.scheduleIfAbsent(
@@ -2064,9 +2063,9 @@ public class GroupMetadataManager {
TasksTuple targetAssignment;
if (groupEpoch > group.assignmentEpoch()) {
boolean initialDelayActive =
timer.isScheduled(streamsInitialRebalanceKey(groupId));
- if (initialDelayActive && group.assignmentEpoch() == 0) {
+ if (initialDelayActive) {
// During initial rebalance delay, return empty assignment to
first joining members.
- targetAssignmentEpoch = 1;
+ targetAssignmentEpoch = Math.max(1, group.assignmentEpoch());
targetAssignment = TasksTuple.EMPTY;
} else {
targetAssignment = updateStreamsTargetAssignment(
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index ef81022dcee..3d5e8e0f4e3 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -16732,6 +16732,7 @@ public class GroupMetadataManagerTest {
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
.withStreamsGroupTaskAssignors(List.of(assignor))
.withMetadataImage(metadataImage)
+
.withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
0)
.withStreamsGroup(
new StreamsGroupBuilder(groupId, 10)
.withTopology(StreamsTopology.fromHeartbeatRequest(topology1))
@@ -17536,6 +17537,64 @@ public class GroupMetadataManagerTest {
);
}
+ @Test
+ public void
testStreamsRebalanceDelayWhenJoiningEmptyGroupWithNonZeroEpoch() {
+ String groupId = "fooup";
+ String memberId = Uuid.randomUuid().toString();
+ String subtopology1 = "subtopology1";
+ String fooTopicName = "foo";
+ Uuid fooTopicId = Uuid.randomUuid();
+ Topology topology = new Topology().setSubtopologies(List.of(
+ new
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+ ));
+
+ MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withStreamsGroupTaskAssignors(List.of(assignor))
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 2)
+ .buildCoordinatorMetadataImage())
+
.withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
1000)
+ .withStreamsGroup(new StreamsGroupBuilder(groupId, 10))
+ .build();
+
+ StreamsGroup group =
context.groupMetadataManager.streamsGroup(groupId);
+ assertTrue(group.isEmpty());
+ assertEquals(10, group.groupEpoch());
+
+ assignor.prepareGroupAssignment(
+ Map.of(memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1))));
+
+ CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord>
result;
+
+ result = context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(10000)
+ .setTopology(topology)
+ .setActiveTasks(List.of())
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of()));
+
+ int memberEpoch = result.response().data().memberEpoch();
+ assertTrue(result.response().data().activeTasks().isEmpty());
+
+ context.sleep(2000);
+
+ result = context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(memberEpoch)
+ .setActiveTasks(List.of())
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of()));
+
+ assertFalse(result.response().data().activeTasks().isEmpty());
+ }
+
@Test
public void testStreamsReconciliationProcess() {
String groupId = "fooup";