This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 437337cf370 MINOR: update method name from 
`createGroupTombstoneRecords` to `createGroupTombstoneRecordsAndCancelTimers` 
(#20949)
437337cf370 is described below

commit 437337cf370b5204e7ddff3da4550730c1334617
Author: Jinhe Zhang <[email protected]>
AuthorDate: Tue Nov 25 08:51:16 2025 -0500

    MINOR: update method name from `createGroupTombstoneRecords` to 
`createGroupTombstoneRecordsAndCancelTimers` (#20949)
    
    Since the method now also cancels the group's timers, rename it so
    that the name reflects both responsibilities.
    
    Reviewers: Lucas Brutschy <[email protected]>, Chia-Ping Tsai
     <[email protected]>, David Jacot <[email protected]>
---
 .../java/org/apache/kafka/coordinator/group/Group.java |  8 ++++++++
 .../kafka/coordinator/group/GroupCoordinatorShard.java |  2 +-
 .../kafka/coordinator/group/GroupMetadataManager.java  | 18 +++++++++---------
 .../kafka/coordinator/group/streams/StreamsGroup.java  | 16 ++++++++++++++++
 .../coordinator/group/GroupCoordinatorShardTest.java   |  8 ++++----
 .../coordinator/group/GroupMetadataManagerTest.java    |  8 ++++----
 .../coordinator/group/streams/StreamsGroupTest.java    | 12 ++++++++++++
 7 files changed, 54 insertions(+), 18 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
index 359c1fddd81..501c048352f 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
@@ -19,6 +19,7 @@ package org.apache.kafka.coordinator.group;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.message.ListGroupsResponseData;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorTimer;
 
 import java.util.Arrays;
 import java.util.List;
@@ -160,6 +161,13 @@ public interface Group {
      */
     void createGroupTombstoneRecords(List<CoordinatorRecord> records);
 
+    /**
+     * Cancel any timers associated with the group.
+     *
+     * @param timer The coordinator timer.
+     */
+    default void cancelTimers(CoordinatorTimer<Void, CoordinatorRecord> timer) 
{}
+
     /**
      * @return Whether the group is in Empty state.
      */
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index 3165d10d8f4..d4c1ba1d3cd 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -620,7 +620,7 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
             try {
                 groupMetadataManager.validateDeleteGroup(groupId);
                 numDeletedOffsets += 
offsetMetadataManager.deleteAllOffsets(groupId, records);
-                groupMetadataManager.createGroupTombstoneRecords(groupId, 
records);
+                
groupMetadataManager.createGroupTombstoneRecordsAndCancelTimers(groupId, 
records);
                 deletedGroups.add(groupId);
 
                 resultCollection.add(
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 946fe7f0f24..35a5d562034 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -8251,12 +8251,12 @@ public class GroupMetadataManager {
      * @param groupId The id of the group to be deleted. It has been checked 
in {@link GroupMetadataManager#validateDeleteGroup}.
      * @param records The record list to populate.
      */
-    public void createGroupTombstoneRecords(
+    public void createGroupTombstoneRecordsAndCancelTimers(
         String groupId,
         List<CoordinatorRecord> records
     ) {
         // At this point, we have already validated the group id, so we know 
that the group exists and that no exception will be thrown.
-        createGroupTombstoneRecords(group(groupId), records);
+        createGroupTombstoneRecordsAndCancelTimers(group(groupId), records);
     }
 
     /**
@@ -8266,12 +8266,12 @@ public class GroupMetadataManager {
      * @param group The group to be deleted.
      * @param records The record list to populate.
      */
-    public void createGroupTombstoneRecords(
+    public void createGroupTombstoneRecordsAndCancelTimers(
         Group group,
         List<CoordinatorRecord> records
     ) {
         group.createGroupTombstoneRecords(records);
-        timer.cancel(streamsInitialRebalanceKey(group.groupId()));
+        group.cancelTimers(timer);
     }
 
     /**
@@ -8666,7 +8666,7 @@ public class GroupMetadataManager {
     public void maybeDeleteGroup(String groupId, List<CoordinatorRecord> 
records) {
         Group group = groups.get(groupId);
         if (group != null && group.isEmpty()) {
-            createGroupTombstoneRecords(groupId, records);
+            createGroupTombstoneRecordsAndCancelTimers(group, records);
         }
     }
 
@@ -8703,7 +8703,7 @@ public class GroupMetadataManager {
         if (isEmptyClassicGroup(group)) {
             // Delete the classic group by adding tombstones.
             // There's no need to remove the group as the replay of tombstones 
removes it.
-            createGroupTombstoneRecords(group, records);
+            createGroupTombstoneRecordsAndCancelTimers(group, records);
             return true;
         }
         return false;
@@ -8722,7 +8722,7 @@ public class GroupMetadataManager {
         if (isEmptyConsumerGroup(group)) {
             // Add tombstones for the previous consumer group. The tombstones 
won't actually be
             // replayed because its coordinator result has a non-null 
appendFuture.
-            createGroupTombstoneRecords(group, records);
+            createGroupTombstoneRecordsAndCancelTimers(group, records);
             removeGroup(groupId);
             return true;
         }
@@ -8742,7 +8742,7 @@ public class GroupMetadataManager {
         if (isEmptyStreamsGroup(group)) {
             // Add tombstones for the previous streams group. The tombstones 
won't actually be
             // replayed because its coordinator result has a non-null 
appendFuture.
-            createGroupTombstoneRecords(group, records);
+            createGroupTombstoneRecordsAndCancelTimers(group, records);
             removeGroup(groupId);
             return true;
         }
@@ -8908,7 +8908,7 @@ public class GroupMetadataManager {
      * @return the initial rebalance key.
      */
     static String streamsInitialRebalanceKey(String groupId) {
-        return "initial-rebalance-timeout-" + groupId;
+        return StreamsGroup.initialRebalanceTimeoutKey(groupId);
     }
 
     /**
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
index 5b8fa258cb3..82dd871b3cd 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.requests.JoinGroupRequest;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorTimer;
 import org.apache.kafka.coordinator.group.CommitPartitionValidator;
 import org.apache.kafka.coordinator.group.Group;
 import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
@@ -823,6 +824,21 @@ public class StreamsGroup implements Group {
         
records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecordTombstone(groupId()));
     }
 
+    /**
+     * Generate an initial rebalance key for the timer.
+     *
+     * @param groupId The group id.
+     * @return The initial rebalance key.
+     */
+    public static String initialRebalanceTimeoutKey(String groupId) {
+        return "initial-rebalance-timeout-" + groupId;
+    }
+
+    @Override
+    public void cancelTimers(CoordinatorTimer<Void, CoordinatorRecord> timer) {
+        timer.cancel(initialRebalanceTimeoutKey(groupId));
+    }
+
     @Override
     public boolean isEmpty() {
         return state() == StreamsGroupState.EMPTY;
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
index 145b7613f1f..4dc0c8ff78c 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
@@ -306,14 +306,14 @@ public class GroupCoordinatorShardTest {
             List<CoordinatorRecord> records = invocation.getArgument(1);
             
records.add(GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord(groupId));
             return null;
-        }).when(groupMetadataManager).createGroupTombstoneRecords(anyString(), 
anyList());
+        
}).when(groupMetadataManager).createGroupTombstoneRecordsAndCancelTimers(anyString(),
 anyList());
 
         
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, 
CoordinatorRecord> coordinatorResult =
             coordinator.deleteGroups(context, groupIds);
 
         for (String groupId : groupIds) {
             verify(groupMetadataManager, 
times(1)).validateDeleteGroup(ArgumentMatchers.eq(groupId));
-            verify(groupMetadataManager, 
times(1)).createGroupTombstoneRecords(ArgumentMatchers.eq(groupId), anyList());
+            verify(groupMetadataManager, 
times(1)).createGroupTombstoneRecordsAndCancelTimers(ArgumentMatchers.eq(groupId),
 anyList());
             verify(offsetMetadataManager, 
times(1)).deleteAllOffsets(ArgumentMatchers.eq(groupId), anyList());
         }
         assertEquals(expectedResult, coordinatorResult);
@@ -372,7 +372,7 @@ public class GroupCoordinatorShardTest {
             List<CoordinatorRecord> records = invocation.getArgument(1);
             
records.add(GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord(groupId));
             return null;
-        }).when(groupMetadataManager).createGroupTombstoneRecords(anyString(), 
anyList());
+        
}).when(groupMetadataManager).createGroupTombstoneRecordsAndCancelTimers(anyString(),
 anyList());
 
         
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, 
CoordinatorRecord> coordinatorResult =
             coordinator.deleteGroups(context, groupIds);
@@ -380,7 +380,7 @@ public class GroupCoordinatorShardTest {
         for (String groupId : groupIds) {
             verify(groupMetadataManager, 
times(1)).validateDeleteGroup(eq(groupId));
             if (!groupId.equals("group-id-2")) {
-                verify(groupMetadataManager, 
times(1)).createGroupTombstoneRecords(eq(groupId), anyList());
+                verify(groupMetadataManager, 
times(1)).createGroupTombstoneRecordsAndCancelTimers(eq(groupId), anyList());
                 verify(offsetMetadataManager, 
times(1)).deleteAllOffsets(eq(groupId), anyList());
             }
         }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index ef90ba0b631..ef81022dcee 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -10460,7 +10460,7 @@ public class GroupMetadataManagerTest {
 
         List<CoordinatorRecord> expectedRecords = 
List.of(GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord("group-id"));
         List<CoordinatorRecord> records = new ArrayList<>();
-        context.groupMetadataManager.createGroupTombstoneRecords("group-id", 
records);
+        
context.groupMetadataManager.createGroupTombstoneRecordsAndCancelTimers("group-id",
 records);
         assertEquals(expectedRecords, records);
     }
 
@@ -10498,7 +10498,7 @@ public class GroupMetadataManagerTest {
             
GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord(groupId)
         );
         List<CoordinatorRecord> records = new ArrayList<>();
-        context.groupMetadataManager.createGroupTombstoneRecords("group-id", 
records);
+        
context.groupMetadataManager.createGroupTombstoneRecordsAndCancelTimers("group-id",
 records);
         assertEquals(expectedRecords, records);
     }
 
@@ -10540,7 +10540,7 @@ public class GroupMetadataManagerTest {
         assertTrue(context.timer.isScheduled(timerKey), "Timer should be 
scheduled after first member joins");
 
         List<CoordinatorRecord> records = new ArrayList<>();
-        context.groupMetadataManager.createGroupTombstoneRecords(groupId, 
records);
+        
context.groupMetadataManager.createGroupTombstoneRecordsAndCancelTimers(groupId,
 records);
 
         assertFalse(context.timer.isScheduled(timerKey), "Timer should be 
cancelled after group deletion");
 
@@ -15952,7 +15952,7 @@ public class GroupMetadataManagerTest {
             
GroupCoordinatorRecordHelpers.newShareGroupEpochTombstoneRecord(groupId)
         );
         List<CoordinatorRecord> records = new ArrayList<>();
-        
context.groupMetadataManager.createGroupTombstoneRecords("share-group-id", 
records);
+        
context.groupMetadataManager.createGroupTombstoneRecordsAndCancelTimers("share-group-id",
 records);
         assertEquals(expectedRecords, records);
     }
 
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
index de1bf2d82c8..ced7d7ee4e7 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorTimer;
 import 
org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataImage;
 import org.apache.kafka.coordinator.common.runtime.MetadataImageBuilder;
 import org.apache.kafka.coordinator.group.CommitPartitionValidator;
@@ -77,6 +78,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class StreamsGroupTest {
@@ -1228,4 +1230,14 @@ public class StreamsGroupTest {
         assertEquals("sub-1", 
describedGroup.topology().subtopologies().get(0).subtopologyId());
         assertEquals(List.of("fallback-topic"), 
describedGroup.topology().subtopologies().get(0).sourceTopics());
     }
+
+    @Test
+    public void testCancelTimers() {
+        StreamsGroup streamsGroup = createStreamsGroup("test-group");
+        CoordinatorTimer<Void, CoordinatorRecord> timer = 
mock(CoordinatorTimer.class);
+
+        streamsGroup.cancelTimers(timer);
+
+        verify(timer).cancel("initial-rebalance-timeout-test-group");
+    }
 }

Reply via email to