This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.2 by this push:
new a01611c6cb1 MINOR: update method name from
`createGroupTombstoneRecords` to `createGroupTombStoneRecordsAndCancelTimers`
(#20949)
a01611c6cb1 is described below
commit a01611c6cb1bc765c204c391624c4506effad54d
Author: Jinhe Zhang <[email protected]>
AuthorDate: Tue Nov 25 08:51:16 2025 -0500
MINOR: update method name from `createGroupTombstoneRecords` to
`createGroupTombStoneRecordsAndCancelTimers` (#20949)
Since now we are canceling timers in the method, we need to update the
method name
Reviewers: Lucas Brutschy <[email protected]>, Chia-Ping Tsai
<[email protected]>, David Jacot <[email protected]>
---
.../java/org/apache/kafka/coordinator/group/Group.java | 8 ++++++++
.../kafka/coordinator/group/GroupCoordinatorShard.java | 2 +-
.../kafka/coordinator/group/GroupMetadataManager.java | 18 +++++++++---------
.../kafka/coordinator/group/streams/StreamsGroup.java | 16 ++++++++++++++++
.../coordinator/group/GroupCoordinatorShardTest.java | 8 ++++----
.../coordinator/group/GroupMetadataManagerTest.java | 8 ++++----
.../coordinator/group/streams/StreamsGroupTest.java | 12 ++++++++++++
7 files changed, 54 insertions(+), 18 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
index 359c1fddd81..501c048352f 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
@@ -19,6 +19,7 @@ package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorTimer;
import java.util.Arrays;
import java.util.List;
@@ -160,6 +161,13 @@ public interface Group {
*/
void createGroupTombstoneRecords(List<CoordinatorRecord> records);
+ /**
+ * Cancel any timers associated with the group.
+ *
+ * @param timer The coordinator timer.
+ */
+ default void cancelTimers(CoordinatorTimer<Void, CoordinatorRecord> timer)
{}
+
/**
* @return Whether the group is in Empty state.
*/
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index 3165d10d8f4..d4c1ba1d3cd 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -620,7 +620,7 @@ public class GroupCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
try {
groupMetadataManager.validateDeleteGroup(groupId);
numDeletedOffsets +=
offsetMetadataManager.deleteAllOffsets(groupId, records);
- groupMetadataManager.createGroupTombstoneRecords(groupId,
records);
+
groupMetadataManager.createGroupTombstoneRecordsAndCancelTimers(groupId,
records);
deletedGroups.add(groupId);
resultCollection.add(
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 946fe7f0f24..35a5d562034 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -8251,12 +8251,12 @@ public class GroupMetadataManager {
* @param groupId The id of the group to be deleted. It has been checked
in {@link GroupMetadataManager#validateDeleteGroup}.
* @param records The record list to populate.
*/
- public void createGroupTombstoneRecords(
+ public void createGroupTombstoneRecordsAndCancelTimers(
String groupId,
List<CoordinatorRecord> records
) {
// At this point, we have already validated the group id, so we know
that the group exists and that no exception will be thrown.
- createGroupTombstoneRecords(group(groupId), records);
+ createGroupTombstoneRecordsAndCancelTimers(group(groupId), records);
}
/**
@@ -8266,12 +8266,12 @@ public class GroupMetadataManager {
* @param group The group to be deleted.
* @param records The record list to populate.
*/
- public void createGroupTombstoneRecords(
+ public void createGroupTombstoneRecordsAndCancelTimers(
Group group,
List<CoordinatorRecord> records
) {
group.createGroupTombstoneRecords(records);
- timer.cancel(streamsInitialRebalanceKey(group.groupId()));
+ group.cancelTimers(timer);
}
/**
@@ -8666,7 +8666,7 @@ public class GroupMetadataManager {
public void maybeDeleteGroup(String groupId, List<CoordinatorRecord>
records) {
Group group = groups.get(groupId);
if (group != null && group.isEmpty()) {
- createGroupTombstoneRecords(groupId, records);
+ createGroupTombstoneRecordsAndCancelTimers(group, records);
}
}
@@ -8703,7 +8703,7 @@ public class GroupMetadataManager {
if (isEmptyClassicGroup(group)) {
// Delete the classic group by adding tombstones.
// There's no need to remove the group as the replay of tombstones
removes it.
- createGroupTombstoneRecords(group, records);
+ createGroupTombstoneRecordsAndCancelTimers(group, records);
return true;
}
return false;
@@ -8722,7 +8722,7 @@ public class GroupMetadataManager {
if (isEmptyConsumerGroup(group)) {
// Add tombstones for the previous consumer group. The tombstones
won't actually be
// replayed because its coordinator result has a non-null
appendFuture.
- createGroupTombstoneRecords(group, records);
+ createGroupTombstoneRecordsAndCancelTimers(group, records);
removeGroup(groupId);
return true;
}
@@ -8742,7 +8742,7 @@ public class GroupMetadataManager {
if (isEmptyStreamsGroup(group)) {
// Add tombstones for the previous streams group. The tombstones
won't actually be
// replayed because its coordinator result has a non-null
appendFuture.
- createGroupTombstoneRecords(group, records);
+ createGroupTombstoneRecordsAndCancelTimers(group, records);
removeGroup(groupId);
return true;
}
@@ -8908,7 +8908,7 @@ public class GroupMetadataManager {
* @return the initial rebalance key.
*/
static String streamsInitialRebalanceKey(String groupId) {
- return "initial-rebalance-timeout-" + groupId;
+ return StreamsGroup.initialRebalanceTimeoutKey(groupId);
}
/**
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
index 5b8fa258cb3..82dd871b3cd 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorTimer;
import org.apache.kafka.coordinator.group.CommitPartitionValidator;
import org.apache.kafka.coordinator.group.Group;
import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
@@ -823,6 +824,21 @@ public class StreamsGroup implements Group {
records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecordTombstone(groupId()));
}
+ /**
+ * Generate an initial rebalance key for the timer.
+ *
+ * @param groupId The group id.
+ * @return The initial rebalance key.
+ */
+ public static String initialRebalanceTimeoutKey(String groupId) {
+ return "initial-rebalance-timeout-" + groupId;
+ }
+
+ @Override
+ public void cancelTimers(CoordinatorTimer<Void, CoordinatorRecord> timer) {
+ timer.cancel(initialRebalanceTimeoutKey(groupId));
+ }
+
@Override
public boolean isEmpty() {
return state() == StreamsGroupState.EMPTY;
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
index 145b7613f1f..4dc0c8ff78c 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
@@ -306,14 +306,14 @@ public class GroupCoordinatorShardTest {
List<CoordinatorRecord> records = invocation.getArgument(1);
records.add(GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord(groupId));
return null;
- }).when(groupMetadataManager).createGroupTombstoneRecords(anyString(),
anyList());
+
}).when(groupMetadataManager).createGroupTombstoneRecordsAndCancelTimers(anyString(),
anyList());
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection,
CoordinatorRecord> coordinatorResult =
coordinator.deleteGroups(context, groupIds);
for (String groupId : groupIds) {
verify(groupMetadataManager,
times(1)).validateDeleteGroup(ArgumentMatchers.eq(groupId));
- verify(groupMetadataManager,
times(1)).createGroupTombstoneRecords(ArgumentMatchers.eq(groupId), anyList());
+ verify(groupMetadataManager,
times(1)).createGroupTombstoneRecordsAndCancelTimers(ArgumentMatchers.eq(groupId),
anyList());
verify(offsetMetadataManager,
times(1)).deleteAllOffsets(ArgumentMatchers.eq(groupId), anyList());
}
assertEquals(expectedResult, coordinatorResult);
@@ -372,7 +372,7 @@ public class GroupCoordinatorShardTest {
List<CoordinatorRecord> records = invocation.getArgument(1);
records.add(GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord(groupId));
return null;
- }).when(groupMetadataManager).createGroupTombstoneRecords(anyString(),
anyList());
+
}).when(groupMetadataManager).createGroupTombstoneRecordsAndCancelTimers(anyString(),
anyList());
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection,
CoordinatorRecord> coordinatorResult =
coordinator.deleteGroups(context, groupIds);
@@ -380,7 +380,7 @@ public class GroupCoordinatorShardTest {
for (String groupId : groupIds) {
verify(groupMetadataManager,
times(1)).validateDeleteGroup(eq(groupId));
if (!groupId.equals("group-id-2")) {
- verify(groupMetadataManager,
times(1)).createGroupTombstoneRecords(eq(groupId), anyList());
+ verify(groupMetadataManager,
times(1)).createGroupTombstoneRecordsAndCancelTimers(eq(groupId), anyList());
verify(offsetMetadataManager,
times(1)).deleteAllOffsets(eq(groupId), anyList());
}
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index ef90ba0b631..ef81022dcee 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -10460,7 +10460,7 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords =
List.of(GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord("group-id"));
List<CoordinatorRecord> records = new ArrayList<>();
- context.groupMetadataManager.createGroupTombstoneRecords("group-id",
records);
+
context.groupMetadataManager.createGroupTombstoneRecordsAndCancelTimers("group-id",
records);
assertEquals(expectedRecords, records);
}
@@ -10498,7 +10498,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord(groupId)
);
List<CoordinatorRecord> records = new ArrayList<>();
- context.groupMetadataManager.createGroupTombstoneRecords("group-id",
records);
+
context.groupMetadataManager.createGroupTombstoneRecordsAndCancelTimers("group-id",
records);
assertEquals(expectedRecords, records);
}
@@ -10540,7 +10540,7 @@ public class GroupMetadataManagerTest {
assertTrue(context.timer.isScheduled(timerKey), "Timer should be
scheduled after first member joins");
List<CoordinatorRecord> records = new ArrayList<>();
- context.groupMetadataManager.createGroupTombstoneRecords(groupId,
records);
+
context.groupMetadataManager.createGroupTombstoneRecordsAndCancelTimers(groupId,
records);
assertFalse(context.timer.isScheduled(timerKey), "Timer should be
cancelled after group deletion");
@@ -15952,7 +15952,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newShareGroupEpochTombstoneRecord(groupId)
);
List<CoordinatorRecord> records = new ArrayList<>();
-
context.groupMetadataManager.createGroupTombstoneRecords("share-group-id",
records);
+
context.groupMetadataManager.createGroupTombstoneRecordsAndCancelTimers("share-group-id",
records);
assertEquals(expectedRecords, records);
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
index de1bf2d82c8..ced7d7ee4e7 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorTimer;
import
org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataImage;
import org.apache.kafka.coordinator.common.runtime.MetadataImageBuilder;
import org.apache.kafka.coordinator.group.CommitPartitionValidator;
@@ -77,6 +78,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class StreamsGroupTest {
@@ -1228,4 +1230,14 @@ public class StreamsGroupTest {
assertEquals("sub-1",
describedGroup.topology().subtopologies().get(0).subtopologyId());
assertEquals(List.of("fallback-topic"),
describedGroup.topology().subtopologies().get(0).sourceTopics());
}
+
+ @Test
+ public void testCancelTimers() {
+ StreamsGroup streamsGroup = createStreamsGroup("test-group");
+ CoordinatorTimer<Void, CoordinatorRecord> timer =
mock(CoordinatorTimer.class);
+
+ streamsGroup.cancelTimers(timer);
+
+ verify(timer).cancel("initial-rebalance-timeout-test-group");
+ }
}