This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 65a3bd26da5 KAFKA-19926: Apply rebalance delay also for later epochs if the group is empty (#21011)
65a3bd26da5 is described below

commit 65a3bd26da58cea7037c6754f987bc6f1948f887
Author: Jinhe Zhang <[email protected]>
AuthorDate: Fri Nov 28 12:14:08 2025 -0500

    KAFKA-19926: Apply rebalance delay also for later epochs if the group is empty (#21011)
    
    In https://github.com/apache/kafka/pull/20755 we introduced an initial
    rebalance delay for streams groups. This change extends that delay to
    every rebalance that starts while the group is empty, to avoid frequent
    rebalances within a short time period.
    
    Reviewers: Lucas Brutschy <[email protected]>
---
 .../coordinator/group/GroupMetadataManager.java    |  7 ++-
 .../group/GroupMetadataManagerTest.java            | 59 ++++++++++++++++++++++
 2 files changed, 62 insertions(+), 4 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 35a5d562034..25873f41219 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -2043,8 +2043,7 @@ public class GroupMetadataManager {
         }
 
         // Schedule initial rebalance delay for new streams groups to coalesce 
joins.
-        boolean isInitialRebalance = (group.groupEpoch() == 0);
-        if (isInitialRebalance) {
+        if (group.isEmpty()) {
             int initialDelayMs = streamsGroupInitialRebalanceDelayMs(groupId);
             if (initialDelayMs > 0) {
                 timer.scheduleIfAbsent(
@@ -2064,9 +2063,9 @@ public class GroupMetadataManager {
         TasksTuple targetAssignment;
         if (groupEpoch > group.assignmentEpoch()) {
             boolean initialDelayActive = 
timer.isScheduled(streamsInitialRebalanceKey(groupId));
-            if (initialDelayActive && group.assignmentEpoch() == 0) {
+            if (initialDelayActive) {
                 // During initial rebalance delay, return empty assignment to 
first joining members.
-                targetAssignmentEpoch = 1;
+                targetAssignmentEpoch = Math.max(1, group.assignmentEpoch());
                 targetAssignment = TasksTuple.EMPTY;
             } else {
                 targetAssignment = updateStreamsTargetAssignment(
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index ef81022dcee..3d5e8e0f4e3 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -16732,6 +16732,7 @@ public class GroupMetadataManagerTest {
         GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
             .withStreamsGroupTaskAssignors(List.of(assignor))
             .withMetadataImage(metadataImage)
+            
.withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
 0)
             .withStreamsGroup(
                 new StreamsGroupBuilder(groupId, 10)
                     
.withTopology(StreamsTopology.fromHeartbeatRequest(topology1))
@@ -17536,6 +17537,64 @@ public class GroupMetadataManagerTest {
         );
     }
 
+    @Test
+    public void 
testStreamsRebalanceDelayWhenJoiningEmptyGroupWithNonZeroEpoch() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new 
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+        ));
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 2)
+                .buildCoordinatorMetadataImage())
+            
.withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
 1000)
+            .withStreamsGroup(new StreamsGroupBuilder(groupId, 10))
+            .build();
+
+        StreamsGroup group = 
context.groupMetadataManager.streamsGroup(groupId);
+        assertTrue(group.isEmpty());
+        assertEquals(10, group.groupEpoch());
+
+        assignor.prepareGroupAssignment(
+            Map.of(memberId, 
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, 
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1))));
+
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> 
result;
+
+        result = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(10000)
+                .setTopology(topology)
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()));
+
+        int memberEpoch = result.response().data().memberEpoch();
+        assertTrue(result.response().data().activeTasks().isEmpty());
+
+        context.sleep(2000);
+
+        result = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(memberEpoch)
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()));
+
+        assertFalse(result.response().data().activeTasks().isEmpty());
+    }
+
     @Test
     public void testStreamsReconciliationProcess() {
         String groupId = "fooup";

Reply via email to