This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 849bf6c7a18 MINOR: Remove unused type parameter T from CoordinatorTimer (#21360)
849bf6c7a18 is described below

commit 849bf6c7a18cb4969f2674150386274e18bef913
Author: David Jacot <[email protected]>
AuthorDate: Mon Jan 26 18:19:48 2026 +0100

    MINOR: Remove unused type parameter T from CoordinatorTimer (#21360)
    
    The type parameter T in CoordinatorTimer<T, U> was always Void in all
    usages. This simplifies the interface to CoordinatorTimer<U> and updates
    all references across coordinator-common, group-coordinator, and
    share-coordinator modules.
    
    Reviewers: Sean Quah <[email protected]>, Lianet Magrans
     <[email protected]>
---
 .../common/runtime/CoordinatorShardBuilder.java    |  2 +-
 .../common/runtime/CoordinatorTimer.java           | 16 ++--
 .../common/runtime/CoordinatorTimerImpl.java       |  8 +-
 .../common/runtime/MockCoordinatorShard.java       |  8 +-
 .../runtime/MockCoordinatorShardBuilder.java       |  4 +-
 .../common/runtime/MockCoordinatorTimer.java       | 28 +++----
 .../org/apache/kafka/coordinator/group/Group.java  |  2 +-
 .../coordinator/group/GroupCoordinatorShard.java   |  8 +-
 .../coordinator/group/GroupMetadataManager.java    |  8 +-
 .../coordinator/group/streams/StreamsGroup.java    |  2 +-
 .../group/GroupCoordinatorShardTest.java           | 12 +--
 .../group/GroupMetadataManagerTest.java            | 94 +++++++++++-----------
 .../group/GroupMetadataManagerTestContext.java     | 38 ++++-----
 .../group/OffsetMetadataManagerTest.java           | 12 +--
 .../group/streams/StreamsGroupTest.java            |  2 +-
 .../coordinator/share/ShareCoordinatorShard.java   |  2 +-
 16 files changed, 124 insertions(+), 122 deletions(-)

diff --git 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorShardBuilder.java
 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorShardBuilder.java
index 62092999c39..83583fb302c 100644
--- 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorShardBuilder.java
+++ 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorShardBuilder.java
@@ -72,7 +72,7 @@ public interface CoordinatorShardBuilder<S extends 
CoordinatorShard<U>, U> {
      * @return The builder.
      */
     CoordinatorShardBuilder<S, U> withTimer(
-        CoordinatorTimer<Void, U> timer
+        CoordinatorTimer<U> timer
     );
 
     /**
diff --git 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimer.java
 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimer.java
index 2f288df3026..17d9e10ecdd 100644
--- 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimer.java
+++ 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimer.java
@@ -22,8 +22,10 @@ import java.util.concurrent.TimeUnit;
 
 /**
  * An interface to schedule and cancel operations.
+ *
+ * @param <U> The record type.
  */
-public interface CoordinatorTimer<T, U> {
+public interface CoordinatorTimer<U> {
     /**
      * Generates the records needed to implement this timeout write operation. 
In general,
      * this operation should not modify the hard state of the coordinator. 
That modifications
@@ -33,10 +35,10 @@ public interface CoordinatorTimer<T, U> {
      * A typical example of timeout operation is the session timeout used by 
the consumer
      * group rebalance protocol.
      *
-     * @param <T> The record type.
+     * @param <U> The record type.
      */
-    interface TimeoutOperation<T, U> {
-        CoordinatorResult<T, U> generateRecords() throws KafkaException;
+    interface TimeoutOperation<U> {
+        CoordinatorResult<Void, U> generateRecords() throws KafkaException;
     }
 
     /**
@@ -50,7 +52,7 @@ public interface CoordinatorTimer<T, U> {
      *                    be retried on failure.
      * @param operation   The operation to perform upon expiration.
      */
-    void schedule(String key, long delay, TimeUnit unit, boolean retry, 
TimeoutOperation<T, U> operation);
+    void schedule(String key, long delay, TimeUnit unit, boolean retry, 
TimeoutOperation<U> operation);
 
     /**
      * Add an operation to the timer. If an operation with the same key
@@ -64,7 +66,7 @@ public interface CoordinatorTimer<T, U> {
      * @param retryBackoff  The delay when rescheduled on retry. The same unit 
is used.
      * @param operation     The operation to perform upon expiration.
      */
-    void schedule(String key, long delay, TimeUnit unit, boolean retry, long 
retryBackoff, TimeoutOperation<T, U> operation);
+    void schedule(String key, long delay, TimeUnit unit, boolean retry, long 
retryBackoff, TimeoutOperation<U> operation);
 
     /**
      * Add an operation to the timer if there's no operation with the same key.
@@ -76,7 +78,7 @@ public interface CoordinatorTimer<T, U> {
      *                    be retried on failure.
      * @param operation   The operation to perform upon expiration.
      */
-    void scheduleIfAbsent(String key, long delay, TimeUnit unit, boolean 
retry, TimeoutOperation<T, U> operation);
+    void scheduleIfAbsent(String key, long delay, TimeUnit unit, boolean 
retry, TimeoutOperation<U> operation);
 
     /**
      * Remove an operation corresponding to a given key.
diff --git 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimerImpl.java
 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimerImpl.java
index 0c66c99c25f..b99e396da77 100644
--- 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimerImpl.java
+++ 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimerImpl.java
@@ -38,7 +38,7 @@ import java.util.concurrent.TimeUnit;
  *
  * When a timer fails with an unexpected exception, the timer is rescheduled 
with a backoff.
  */
-public class CoordinatorTimerImpl<U> implements CoordinatorTimer<Void, U> {
+public class CoordinatorTimerImpl<U> implements CoordinatorTimer<U> {
     private final Logger log;
     private final Timer timer;
     private final CoordinatorShardScheduler<U> scheduler;
@@ -60,7 +60,7 @@ public class CoordinatorTimerImpl<U> implements 
CoordinatorTimer<Void, U> {
         long delay,
         TimeUnit unit,
         boolean retry,
-        TimeoutOperation<Void, U> operation
+        TimeoutOperation<U> operation
     ) {
         schedule(key, delay, unit, retry, 500, operation);
     }
@@ -72,7 +72,7 @@ public class CoordinatorTimerImpl<U> implements 
CoordinatorTimer<Void, U> {
         TimeUnit unit,
         boolean retry,
         long retryBackoff,
-        TimeoutOperation<Void, U> operation
+        TimeoutOperation<U> operation
     ) {
         // The TimerTask wraps the TimeoutOperation into a write operation. 
When the TimerTask
         // expires, the operation is scheduled through the scheduler to be 
executed. This
@@ -141,7 +141,7 @@ public class CoordinatorTimerImpl<U> implements 
CoordinatorTimer<Void, U> {
         long delay,
         TimeUnit unit,
         boolean retry,
-        TimeoutOperation<Void, U> operation
+        TimeoutOperation<U> operation
     ) {
         if (!tasks.containsKey(key)) {
             schedule(key, delay, unit, retry, 500, operation);
diff --git 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorShard.java
 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorShard.java
index 28167504f57..7214b869ca8 100644
--- 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorShard.java
+++ 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorShard.java
@@ -53,19 +53,19 @@ public class MockCoordinatorShard implements 
CoordinatorShard<String> {
     private final SnapshotRegistry snapshotRegistry;
     private final TimelineHashSet<RecordAndMetadata> records;
     private final TimelineHashMap<Long, TimelineHashSet<RecordAndMetadata>> 
pendingRecords;
-    private final CoordinatorTimer<Void, String> timer;
+    private final CoordinatorTimer<String> timer;
     private final CoordinatorExecutor<String> executor;
 
     MockCoordinatorShard(
         SnapshotRegistry snapshotRegistry,
-        CoordinatorTimer<Void, String> timer
+        CoordinatorTimer<String> timer
     ) {
         this(snapshotRegistry, timer, null);
     }
 
     MockCoordinatorShard(
         SnapshotRegistry snapshotRegistry,
-        CoordinatorTimer<Void, String> timer,
+        CoordinatorTimer<String> timer,
         CoordinatorExecutor<String> executor
     ) {
         this.snapshotRegistry = snapshotRegistry;
@@ -130,7 +130,7 @@ public class MockCoordinatorShard implements 
CoordinatorShard<String> {
             .toList();
     }
 
-    CoordinatorTimer<Void, String> timer() {
+    CoordinatorTimer<String> timer() {
         return timer;
     }
 
diff --git 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorShardBuilder.java
 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorShardBuilder.java
index dea2dcc1c17..13b0c3b13d1 100644
--- 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorShardBuilder.java
+++ 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorShardBuilder.java
@@ -28,7 +28,7 @@ import java.util.Objects;
  */
 public class MockCoordinatorShardBuilder implements 
CoordinatorShardBuilder<MockCoordinatorShard, String> {
     private SnapshotRegistry snapshotRegistry;
-    private CoordinatorTimer<Void, String> timer;
+    private CoordinatorTimer<String> timer;
     private CoordinatorExecutor<String> executor;
 
     @Override
@@ -63,7 +63,7 @@ public class MockCoordinatorShardBuilder implements 
CoordinatorShardBuilder<Mock
 
     @Override
     public CoordinatorShardBuilder<MockCoordinatorShard, String> withTimer(
-        CoordinatorTimer<Void, String> timer
+        CoordinatorTimer<String> timer
     ) {
         this.timer = timer;
         return this;
diff --git 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorTimer.java
 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorTimer.java
index 69e3954a0a6..aa09d0face5 100644
--- 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorTimer.java
+++ 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorTimer.java
@@ -31,23 +31,23 @@ import java.util.concurrent.TimeUnit;
  * expire timeouts. They are only expired when {@link 
MockCoordinatorTimer#poll()}
  * is called.
  */
-public class MockCoordinatorTimer<T, U> implements CoordinatorTimer<T, U> {
+public class MockCoordinatorTimer<U> implements CoordinatorTimer<U> {
     /**
      * Represents a scheduled timeout.
      */
-    public record ScheduledTimeout<T, U>(String key, long deadlineMs, 
TimeoutOperation<T, U> operation) {
+    public record ScheduledTimeout<U>(String key, long deadlineMs, 
TimeoutOperation<U> operation) {
     }
 
     /**
      * Represents an expired timeout.
      */
-    public record ExpiredTimeout<T, U>(String key, CoordinatorResult<T, U> 
result) {
+    public record ExpiredTimeout<U>(String key, CoordinatorResult<Void, U> 
result) {
     }
 
     private final Time time;
 
-    private final Map<String, ScheduledTimeout<T, U>> timeoutMap = new 
HashMap<>();
-    private final PriorityQueue<ScheduledTimeout<T, U>> timeoutQueue = new 
PriorityQueue<>(
+    private final Map<String, ScheduledTimeout<U>> timeoutMap = new 
HashMap<>();
+    private final PriorityQueue<ScheduledTimeout<U>> timeoutQueue = new 
PriorityQueue<>(
         Comparator.comparingLong(entry -> entry.deadlineMs)
     );
 
@@ -65,12 +65,12 @@ public class MockCoordinatorTimer<T, U> implements 
CoordinatorTimer<T, U> {
         TimeUnit unit,
         boolean retry,
         long retryBackoff,
-        TimeoutOperation<T, U> operation
+        TimeoutOperation<U> operation
     ) {
         cancel(key);
 
         long deadlineMs = time.milliseconds() + unit.toMillis(delay);
-        ScheduledTimeout<T, U> timeout = new ScheduledTimeout<>(key, 
deadlineMs, operation);
+        ScheduledTimeout<U> timeout = new ScheduledTimeout<>(key, deadlineMs, 
operation);
         timeoutQueue.add(timeout);
         timeoutMap.put(key, timeout);
     }
@@ -81,7 +81,7 @@ public class MockCoordinatorTimer<T, U> implements 
CoordinatorTimer<T, U> {
         long delay,
         TimeUnit unit,
         boolean retry,
-        TimeoutOperation<T, U> operation
+        TimeoutOperation<U> operation
     ) {
         schedule(key, delay, unit, retry, 500L, operation);
     }
@@ -92,7 +92,7 @@ public class MockCoordinatorTimer<T, U> implements 
CoordinatorTimer<T, U> {
         long delay,
         TimeUnit unit,
         boolean retry,
-        TimeoutOperation<T, U> operation
+        TimeoutOperation<U> operation
     ) {
         if (!timeoutMap.containsKey(key)) {
             schedule(key, delay, unit, retry, 500L, operation);
@@ -104,7 +104,7 @@ public class MockCoordinatorTimer<T, U> implements 
CoordinatorTimer<T, U> {
      */
     @Override
     public void cancel(String key) {
-        ScheduledTimeout<T, U> timeout = timeoutMap.remove(key);
+        ScheduledTimeout<U> timeout = timeoutMap.remove(key);
         if (timeout != null) {
             timeoutQueue.remove(timeout);
         }
@@ -128,7 +128,7 @@ public class MockCoordinatorTimer<T, U> implements 
CoordinatorTimer<T, U> {
     /**
      * @return The scheduled timeout for the key; null otherwise.
      */
-    public ScheduledTimeout<T, U> timeout(String key) {
+    public ScheduledTimeout<U> timeout(String key) {
         return timeoutMap.get(key);
     }
 
@@ -142,10 +142,10 @@ public class MockCoordinatorTimer<T, U> implements 
CoordinatorTimer<T, U> {
     /**
      * @return A list of expired timeouts based on the current time.
      */
-    public List<ExpiredTimeout<T, U>> poll() {
-        List<ExpiredTimeout<T, U>> results = new ArrayList<>();
+    public List<ExpiredTimeout<U>> poll() {
+        List<ExpiredTimeout<U>> results = new ArrayList<>();
 
-        ScheduledTimeout<T, U> timeout = timeoutQueue.peek();
+        ScheduledTimeout<U> timeout = timeoutQueue.peek();
         while (timeout != null && timeout.deadlineMs <= time.milliseconds()) {
             timeoutQueue.poll();
             timeoutMap.remove(timeout.key, timeout);
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
index 501c048352f..e0bff1d51c4 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
@@ -166,7 +166,7 @@ public interface Group {
      *
      * @param timer The coordinator timer.
      */
-    default void cancelTimers(CoordinatorTimer<Void, CoordinatorRecord> timer) 
{}
+    default void cancelTimers(CoordinatorTimer<CoordinatorRecord> timer) {}
 
     /**
      * @return Whether the group is in Empty state.
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index 15eecf0fe7f..8937df57f32 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -159,7 +159,7 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
         private LogContext logContext;
         private SnapshotRegistry snapshotRegistry;
         private Time time;
-        private CoordinatorTimer<Void, CoordinatorRecord> timer;
+        private CoordinatorTimer<CoordinatorRecord> timer;
         private CoordinatorExecutor<CoordinatorRecord> executor;
         private CoordinatorMetrics coordinatorMetrics;
         private TopicPartition topicPartition;
@@ -191,7 +191,7 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
 
         @Override
         public CoordinatorShardBuilder<GroupCoordinatorShard, 
CoordinatorRecord> withTimer(
-            CoordinatorTimer<Void, CoordinatorRecord> timer
+            CoordinatorTimer<CoordinatorRecord> timer
         ) {
             this.timer = timer;
             return this;
@@ -410,7 +410,7 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
     /**
      * The coordinator timer.
      */
-    private final CoordinatorTimer<Void, CoordinatorRecord> timer;
+    private final CoordinatorTimer<CoordinatorRecord> timer;
 
     /**
      * The group coordinator config.
@@ -441,7 +441,7 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
         GroupMetadataManager groupMetadataManager,
         OffsetMetadataManager offsetMetadataManager,
         Time time,
-        CoordinatorTimer<Void, CoordinatorRecord> timer,
+        CoordinatorTimer<CoordinatorRecord> timer,
         GroupCoordinatorConfig config,
         CoordinatorMetrics coordinatorMetrics,
         CoordinatorMetricsShard metricsShard
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 205c9ee2d6e..af80cbea4df 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -281,7 +281,7 @@ public class GroupMetadataManager {
         private LogContext logContext = null;
         private SnapshotRegistry snapshotRegistry = null;
         private Time time = null;
-        private CoordinatorTimer<Void, CoordinatorRecord> timer = null;
+        private CoordinatorTimer<CoordinatorRecord> timer = null;
         private CoordinatorExecutor<CoordinatorRecord> executor = null;
         private GroupCoordinatorConfig config = null;
         private GroupConfigManager groupConfigManager = null;
@@ -306,7 +306,7 @@ public class GroupMetadataManager {
             return this;
         }
 
-        Builder withTimer(CoordinatorTimer<Void, CoordinatorRecord> timer) {
+        Builder withTimer(CoordinatorTimer<CoordinatorRecord> timer) {
             this.timer = timer;
             return this;
         }
@@ -421,7 +421,7 @@ public class GroupMetadataManager {
     /**
      * The system timer.
      */
-    private final CoordinatorTimer<Void, CoordinatorRecord> timer;
+    private final CoordinatorTimer<CoordinatorRecord> timer;
 
     /**
      * The executor to executor asynchronous tasks.
@@ -514,7 +514,7 @@ public class GroupMetadataManager {
         SnapshotRegistry snapshotRegistry,
         LogContext logContext,
         Time time,
-        CoordinatorTimer<Void, CoordinatorRecord> timer,
+        CoordinatorTimer<CoordinatorRecord> timer,
         CoordinatorExecutor<CoordinatorRecord> executor,
         GroupCoordinatorMetricsShard metrics,
         CoordinatorMetadataImage metadataImage,
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
index 621a8f9d1bd..e97bafee55b 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
@@ -835,7 +835,7 @@ public class StreamsGroup implements Group {
     }
 
     @Override
-    public void cancelTimers(CoordinatorTimer<Void, CoordinatorRecord> timer) {
+    public void cancelTimers(CoordinatorTimer<CoordinatorRecord> timer) {
         timer.cancel(initialRebalanceTimeoutKey(groupId));
     }
 
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
index 85a1cbc92ab..11367610e7e 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
@@ -1334,7 +1334,7 @@ public class GroupCoordinatorShardTest {
         GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
         OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
         Time mockTime = new MockTime();
-        MockCoordinatorTimer<Void, CoordinatorRecord> timer = new 
MockCoordinatorTimer<>(mockTime);
+        MockCoordinatorTimer<CoordinatorRecord> timer = new 
MockCoordinatorTimer<>(mockTime);
         GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
             new LogContext(),
             groupMetadataManager,
@@ -1353,7 +1353,7 @@ public class GroupCoordinatorShardTest {
 
         // Confirm that it is rescheduled after completion.
         mockTime.sleep(1000L);
-        List<MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord>> 
tasks = timer.poll();
+        List<MockCoordinatorTimer.ExpiredTimeout<CoordinatorRecord>> tasks = 
timer.poll();
         assertEquals(1, tasks.size());
         assertTrue(timer.contains(GROUP_EXPIRATION_KEY));
 
@@ -1366,7 +1366,7 @@ public class GroupCoordinatorShardTest {
         GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
         OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
         Time mockTime = new MockTime();
-        MockCoordinatorTimer<Void, CoordinatorRecord> timer = new 
MockCoordinatorTimer<>(mockTime);
+        MockCoordinatorTimer<CoordinatorRecord> timer = new 
MockCoordinatorTimer<>(mockTime);
         GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
             new LogContext(),
             groupMetadataManager,
@@ -1426,7 +1426,7 @@ public class GroupCoordinatorShardTest {
         GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
         OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
         Time mockTime = new MockTime();
-        MockCoordinatorTimer<Void, CoordinatorRecord> timer = new 
MockCoordinatorTimer<>(mockTime);
+        MockCoordinatorTimer<CoordinatorRecord> timer = new 
MockCoordinatorTimer<>(mockTime);
         GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
             new LogContext(),
             groupMetadataManager,
@@ -1465,7 +1465,7 @@ public class GroupCoordinatorShardTest {
         CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
         CoordinatorMetricsShard metricsShard = 
mock(CoordinatorMetricsShard.class);
         MockTime time = new MockTime();
-        MockCoordinatorTimer<Void, CoordinatorRecord> timer = new 
MockCoordinatorTimer<>(time);
+        MockCoordinatorTimer<CoordinatorRecord> timer = new 
MockCoordinatorTimer<>(time);
         GroupCoordinatorConfig config = mock(GroupCoordinatorConfig.class);
         when(config.offsetsRetentionCheckIntervalMs()).thenReturn(60 * 60 * 
1000L);
 
@@ -1568,7 +1568,7 @@ public class GroupCoordinatorShardTest {
         GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
         OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
         Time mockTime = new MockTime();
-        MockCoordinatorTimer<Void, CoordinatorRecord> timer = new 
MockCoordinatorTimer<>(mockTime);
+        MockCoordinatorTimer<CoordinatorRecord> timer = new 
MockCoordinatorTimer<>(mockTime);
         GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
             new LogContext(),
             groupMetadataManager,
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 922521ec91b..ba2d9199d72 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -4343,11 +4343,11 @@ public class GroupMetadataManagerTest {
         context.assertSessionTimeout(groupId, memberId, 45000);
 
         // Advance time past the session timeout.
-        List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts = 
context.sleep(45000 + 1);
+        List<ExpiredTimeout<CoordinatorRecord>> timeouts = context.sleep(45000 
+ 1);
 
         // Verify the expired timeout.
         assertEquals(
-            List.of(new ExpiredTimeout<Void, CoordinatorRecord>(
+            List.of(new ExpiredTimeout<CoordinatorRecord>(
                 groupSessionTimeoutKey(groupId, memberId),
                 new CoordinatorResult<>(
                     List.of(
@@ -4407,12 +4407,12 @@ public class GroupMetadataManagerTest {
         assertNotNull(context.timer.timeout(groupSessionTimeoutKey(groupId, 
memberId)));
 
         // Advance time past the session timeout.
-        List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts = 
context.sleep(45000 + 1);
+        List<ExpiredTimeout<CoordinatorRecord>> timeouts = context.sleep(45000 
+ 1);
 
         // Verify the expired timeout.
         assertEquals(
             List.of(
-                new ExpiredTimeout<Void, CoordinatorRecord>(
+                new ExpiredTimeout<CoordinatorRecord>(
                     groupSessionTimeoutKey(groupId, memberId),
                     new CoordinatorResult<>(
                         List.of(
@@ -4469,11 +4469,11 @@ public class GroupMetadataManagerTest {
         context.assertSessionTimeout(groupId, memberId, 45000);
 
         // Advance time past the session timeout.
-        List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts = 
context.sleep(45000 + 1);
+        List<ExpiredTimeout<CoordinatorRecord>> timeouts = context.sleep(45000 
+ 1);
 
         // Verify the expired timeout.
         assertEquals(
-            List.of(new ExpiredTimeout<Void, CoordinatorRecord>(
+            List.of(new ExpiredTimeout<CoordinatorRecord>(
                 groupSessionTimeoutKey(groupId, memberId),
                 new CoordinatorResult<>(
                     List.of(
@@ -4531,12 +4531,12 @@ public class GroupMetadataManagerTest {
         assertNotNull(context.timer.timeout(groupSessionTimeoutKey(groupId, 
memberId)));
 
         // Advance time past the session timeout.
-        List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts = 
context.sleep(45000 + 1);
+        List<ExpiredTimeout<CoordinatorRecord>> timeouts = context.sleep(45000 
+ 1);
 
         // Verify the expired timeout.
         assertEquals(
             List.of(
-                new ExpiredTimeout<Void, CoordinatorRecord>(
+                new ExpiredTimeout<CoordinatorRecord>(
                     groupSessionTimeoutKey(groupId, memberId),
                     new CoordinatorResult<>(
                         List.of(
@@ -4612,11 +4612,11 @@ public class GroupMetadataManagerTest {
         context.assertSessionTimeout(groupId, memberId, 45000);
 
         // Advance time past the session timeout. No static member joined back 
as a replacement
-        List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts = 
context.sleep(45000 + 1);
+        List<ExpiredTimeout<CoordinatorRecord>> timeouts = context.sleep(45000 
+ 1);
 
         // Verify the expired timeout.
         assertEquals(
-            List.of(new ExpiredTimeout<Void, CoordinatorRecord>(
+            List.of(new ExpiredTimeout<CoordinatorRecord>(
                 groupSessionTimeoutKey(groupId, memberId),
                 new CoordinatorResult<>(
                     List.of(
@@ -4746,7 +4746,7 @@ public class GroupMetadataManagerTest {
 
         // Verify that there is a revocation timeout. Keep a reference
         // to the timeout for later.
-        ScheduledTimeout<Void, CoordinatorRecord> scheduledTimeout =
+        ScheduledTimeout<CoordinatorRecord> scheduledTimeout =
             context.assertRebalanceTimeout(groupId, memberId1, 12000);
 
         assertEquals(
@@ -4890,11 +4890,11 @@ public class GroupMetadataManagerTest {
         );
 
         // Advance time past the revocation timeout.
-        List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts = 
context.sleep(10000 + 1);
+        List<ExpiredTimeout<CoordinatorRecord>> timeouts = context.sleep(10000 
+ 1);
 
         // Verify the expired timeout.
         assertEquals(
-            List.of(new ExpiredTimeout<Void, CoordinatorRecord>(
+            List.of(new ExpiredTimeout<CoordinatorRecord>(
                 groupRebalanceTimeoutKey(groupId, memberId1),
                 new CoordinatorResult<>(
                     List.of(
@@ -5418,7 +5418,7 @@ public class GroupMetadataManagerTest {
         context.groupMetadataManager.onLoaded();
 
         IntStream.range(0, 2).forEach(i -> {
-            ScheduledTimeout<Void, CoordinatorRecord> timeout = 
context.timer.timeout(
+            ScheduledTimeout<CoordinatorRecord> timeout = 
context.timer.timeout(
                 classicGroupHeartbeatKey("group-id", "member-1"));
 
             assertNotNull(timeout);
@@ -6304,7 +6304,7 @@ public class GroupMetadataManagerTest {
         String memberId = group.leaderOrNull();
         // Advance clock by new member join timeout. Member should be removed 
from group as heartbeat expires.
         // A group that transitions to Empty after completing join phase will 
generate records.
-        List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts = 
context.sleep(context.classicGroupNewMemberJoinTimeoutMs);
+        List<ExpiredTimeout<CoordinatorRecord>> timeouts = 
context.sleep(context.classicGroupNewMemberJoinTimeoutMs);
 
         assertEquals(1, timeouts.size());
         timeouts.forEach(timeout -> {
@@ -7121,7 +7121,7 @@ public class GroupMetadataManagerTest {
         assertEquals(1, group.numMembers());
 
         // Member should be removed as heartbeat expires. The group is now 
empty.
-        List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts = 
context.sleep(5000);
+        List<ExpiredTimeout<CoordinatorRecord>> timeouts = context.sleep(5000);
         List<CoordinatorRecord> expectedRecords = 
List.of(GroupMetadataManagerTestContext.newGroupMetadataRecord(
             group.groupId(),
             new GroupMetadataValue()
@@ -7370,7 +7370,7 @@ public class GroupMetadataManagerTest {
 
         // Advance clock by rebalance timeout so that the join phase completes 
with duplicate follower.
         // Both heartbeats will expire but only the leader is kicked out.
-        List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts = 
context.sleep(10000);
+        List<ExpiredTimeout<CoordinatorRecord>> timeouts = 
context.sleep(10000);
         assertEquals(2, timeouts.size());
         timeouts.forEach(timeout -> assertEquals(EMPTY_RESULT, 
timeout.result()));
 
@@ -8139,7 +8139,7 @@ public class GroupMetadataManagerTest {
         assertTrue(group.isInState(PREPARING_REBALANCE));
 
         // Advance clock by session timeout to kick leader out and complete 
join phase.
-        List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts = 
context.sleep(5000);
+        List<ExpiredTimeout<CoordinatorRecord>> timeouts = context.sleep(5000);
         // Both leader and follower heartbeat timers may expire. However, the 
follower heartbeat expiration
         // will not kick the follower out because it is awaiting a join 
response.
         assertTrue(timeouts.size() <= 2);
@@ -9002,7 +9002,7 @@ public class GroupMetadataManagerTest {
         // Advance clock by session timeout to expire leader heartbeat and 
prepare rebalance.
         // This should complete follower's sync response. The follower's 
heartbeat expiration will not kick
         // the follower out because it is awaiting sync.
-        List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts = 
context.sleep(10000);
+        List<ExpiredTimeout<CoordinatorRecord>> timeouts = 
context.sleep(10000);
         assertTrue(timeouts.size() <= 2);
         timeouts.forEach(timeout -> 
assertTrue(timeout.result().records().isEmpty()));
 
@@ -9834,9 +9834,9 @@ public class GroupMetadataManagerTest {
 
         // Advance clock by 1/2 rebalance timeout to expire the pending sync. 
Members should be removed.
         // The group becomes empty, generating an empty group metadata record.
-        List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts = 
context.sleep(rebalanceTimeoutMs / 2);
+        List<ExpiredTimeout<CoordinatorRecord>> timeouts = 
context.sleep(rebalanceTimeoutMs / 2);
         assertEquals(1, timeouts.size());
-        ExpiredTimeout<Void, CoordinatorRecord> timeout = timeouts.get(0);
+        ExpiredTimeout<CoordinatorRecord> timeout = timeouts.get(0);
         assertEquals(classicGroupSyncKey("group-id"), timeout.key());
         assertEquals(
             
List.of(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(group, 
group.groupAssignment())),
@@ -9888,9 +9888,9 @@ public class GroupMetadataManagerTest {
         context.verifyHeartbeat(group.groupId(), joinResponses.get(0), 
Errors.NONE);
 
         // Advance clock by 1/2 rebalance timeout to expire the pending sync. 
Followers should be removed.
-        List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts = 
context.sleep(rebalanceTimeoutMs / 2);
+        List<ExpiredTimeout<CoordinatorRecord>> timeouts = 
context.sleep(rebalanceTimeoutMs / 2);
         assertEquals(1, timeouts.size());
-        ExpiredTimeout<Void, CoordinatorRecord> timeout = timeouts.get(0);
+        ExpiredTimeout<CoordinatorRecord> timeout = timeouts.get(0);
         assertEquals(classicGroupSyncKey("group-id"), timeout.key());
         assertTrue(timeout.result().records().isEmpty());
 
@@ -9939,9 +9939,9 @@ public class GroupMetadataManagerTest {
             }).toList();
 
         // Advance clock by 1/2 rebalance timeout to expire the pending sync. 
Leader should be kicked out.
-        List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts = 
context.sleep(rebalanceTimeoutMs / 2);
+        List<ExpiredTimeout<CoordinatorRecord>> timeouts = 
context.sleep(rebalanceTimeoutMs / 2);
         assertEquals(1, timeouts.size());
-        ExpiredTimeout<Void, CoordinatorRecord> timeout = timeouts.get(0);
+        ExpiredTimeout<CoordinatorRecord> timeout = timeouts.get(0);
         assertEquals(classicGroupSyncKey("group-id"), timeout.key());
         assertTrue(timeout.result().records().isEmpty());
 
@@ -12885,12 +12885,12 @@ public class GroupMetadataManagerTest {
         );
 
         // The new classic member 1 has a heartbeat timeout.
-        ScheduledTimeout<Void, CoordinatorRecord> heartbeatTimeout = 
context.timer.timeout(
+        ScheduledTimeout<CoordinatorRecord> heartbeatTimeout = 
context.timer.timeout(
             classicGroupHeartbeatKey(groupId, memberId1)
         );
         assertNotNull(heartbeatTimeout);
         // The new rebalance has a groupJoin timeout.
-        ScheduledTimeout<Void, CoordinatorRecord> groupJoinTimeout = 
context.timer.timeout(
+        ScheduledTimeout<CoordinatorRecord> groupJoinTimeout = 
context.timer.timeout(
             classicGroupJoinKey(groupId)
         );
         assertNotNull(groupJoinTimeout);
@@ -13009,7 +13009,7 @@ public class GroupMetadataManagerTest {
 
         // Advance time past the session timeout.
         // Member 2 should be fenced from the group, thus triggering the 
downgrade.
-        ExpiredTimeout<Void, CoordinatorRecord> timeout = context.sleep(45000 
+ 1).get(0);
+        ExpiredTimeout<CoordinatorRecord> timeout = context.sleep(45000 + 
1).get(0);
         assertEquals(groupSessionTimeoutKey(groupId, memberId2), 
timeout.key());
 
         byte[] assignment = 
Utils.toArray(ConsumerProtocol.serializeAssignment(new 
ConsumerPartitionAssignor.Assignment(List.of(
@@ -13069,12 +13069,12 @@ public class GroupMetadataManagerTest {
         );
 
         // The new classic member 1 has a heartbeat timeout.
-        ScheduledTimeout<Void, CoordinatorRecord> heartbeatTimeout = 
context.timer.timeout(
+        ScheduledTimeout<CoordinatorRecord> heartbeatTimeout = 
context.timer.timeout(
             classicGroupHeartbeatKey(groupId, memberId1)
         );
         assertNotNull(heartbeatTimeout);
         // The new rebalance has a groupJoin timeout.
-        ScheduledTimeout<Void, CoordinatorRecord> groupJoinTimeout = 
context.timer.timeout(
+        ScheduledTimeout<CoordinatorRecord> groupJoinTimeout = 
context.timer.timeout(
             classicGroupJoinKey(groupId)
         );
         assertNotNull(groupJoinTimeout);
@@ -13211,7 +13211,7 @@ public class GroupMetadataManagerTest {
 
         // Advance time past the session timeout.
         // Member 2 should be fenced from the group, thus triggering the 
downgrade.
-        ExpiredTimeout<Void, CoordinatorRecord> timeout = context.sleep(30000 
+ 1).get(0);
+        ExpiredTimeout<CoordinatorRecord> timeout = context.sleep(30000 + 
1).get(0);
         assertEquals(groupRebalanceTimeoutKey(groupId, memberId2), 
timeout.key());
 
         byte[] assignment = 
Utils.toArray(ConsumerProtocol.serializeAssignment(new 
ConsumerPartitionAssignor.Assignment(List.of(
@@ -13271,12 +13271,12 @@ public class GroupMetadataManagerTest {
         );
 
         // The new classic member 1 has a heartbeat timeout.
-        ScheduledTimeout<Void, CoordinatorRecord> heartbeatTimeout = 
context.timer.timeout(
+        ScheduledTimeout<CoordinatorRecord> heartbeatTimeout = 
context.timer.timeout(
             classicGroupHeartbeatKey(groupId, memberId1)
         );
         assertNotNull(heartbeatTimeout);
         // The new rebalance has a groupJoin timeout.
-        ScheduledTimeout<Void, CoordinatorRecord> groupJoinTimeout = 
context.timer.timeout(
+        ScheduledTimeout<CoordinatorRecord> groupJoinTimeout = 
context.timer.timeout(
             classicGroupJoinKey(groupId)
         );
         assertNotNull(groupJoinTimeout);
@@ -13534,7 +13534,7 @@ public class GroupMetadataManagerTest {
         );
 
         // The new classic member 1 has a heartbeat timeout.
-        ScheduledTimeout<Void, CoordinatorRecord> heartbeatTimeout = 
context.timer.timeout(
+        ScheduledTimeout<CoordinatorRecord> heartbeatTimeout = 
context.timer.timeout(
             classicGroupHeartbeatKey(groupId, memberId1)
         );
         assertNotNull(heartbeatTimeout);
@@ -15411,11 +15411,11 @@ public class GroupMetadataManagerTest {
         context.assertSessionTimeout(groupId, memberId, sessionTimeout);
 
         // Advance clock by session timeout + 1.
-        List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts = 
context.sleep(sessionTimeout + 1);
+        List<ExpiredTimeout<CoordinatorRecord>> timeouts = 
context.sleep(sessionTimeout + 1);
 
         // The member is fenced from the group.
         assertEquals(1, timeouts.size());
-        ExpiredTimeout<Void, CoordinatorRecord> timeout = timeouts.get(0);
+        ExpiredTimeout<CoordinatorRecord> timeout = timeouts.get(0);
         assertEquals(groupSessionTimeoutKey(groupId, memberId), timeout.key());
         assertRecordsEquals(
             List.of(
@@ -15476,11 +15476,11 @@ public class GroupMetadataManagerTest {
         context.assertJoinTimeout(groupId, memberId, rebalanceTimeout);
 
         // Advance clock by rebalance timeout + 1.
-        List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts = 
context.sleep(rebalanceTimeout + 1);
+        List<ExpiredTimeout<CoordinatorRecord>> timeouts = 
context.sleep(rebalanceTimeout + 1);
 
         // The member is fenced from the group.
         assertEquals(1, timeouts.size());
-        ExpiredTimeout<Void, CoordinatorRecord> timeout = timeouts.get(0);
+        ExpiredTimeout<CoordinatorRecord> timeout = timeouts.get(0);
         assertEquals(consumerGroupJoinKey(groupId, memberId), timeout.key());
         assertRecordsEquals(
             List.of(
@@ -16075,12 +16075,12 @@ public class GroupMetadataManagerTest {
         );
 
         // The new classic member 1 has a heartbeat timeout.
-        ScheduledTimeout<Void, CoordinatorRecord> heartbeatTimeout = 
context.timer.timeout(
+        ScheduledTimeout<CoordinatorRecord> heartbeatTimeout = 
context.timer.timeout(
             classicGroupHeartbeatKey(groupId, memberId1)
         );
         assertNotNull(heartbeatTimeout);
         // The new rebalance has a groupJoin timeout.
-        ScheduledTimeout<Void, CoordinatorRecord> groupJoinTimeout = 
context.timer.timeout(
+        ScheduledTimeout<CoordinatorRecord> groupJoinTimeout = 
context.timer.timeout(
             classicGroupJoinKey(groupId)
         );
         assertNotNull(groupJoinTimeout);
@@ -19340,11 +19340,11 @@ public class GroupMetadataManagerTest {
         context.assertSessionTimeout(groupId, memberId, 45000);
 
         // Advance time past the session timeout.
-        List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts = 
context.sleep(45000 + 1);
+        List<ExpiredTimeout<CoordinatorRecord>> timeouts = context.sleep(45000 
+ 1);
 
         // Verify the expired timeout.
         assertEquals(
-            List.of(new ExpiredTimeout<Void, CoordinatorRecord>(
+            List.of(new ExpiredTimeout<CoordinatorRecord>(
                 groupSessionTimeoutKey(groupId, memberId),
                 new CoordinatorResult<>(
                     List.of(
@@ -19486,7 +19486,7 @@ public class GroupMetadataManagerTest {
 
         // Verify that there is a revocation timeout. Keep a reference
         // to the timeout for later.
-        ScheduledTimeout<Void, CoordinatorRecord> scheduledTimeout =
+        ScheduledTimeout<CoordinatorRecord> scheduledTimeout =
             context.assertRebalanceTimeout(groupId, memberId1, 12000);
 
         assertEquals(
@@ -19649,11 +19649,11 @@ public class GroupMetadataManagerTest {
         );
 
         // Advance time past the revocation timeout.
-        List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts = 
context.sleep(rebalanceTimeoutMs + 1);
+        List<ExpiredTimeout<CoordinatorRecord>> timeouts = 
context.sleep(rebalanceTimeoutMs + 1);
 
         // Verify the expired timeout.
         assertEquals(
-            List.of(new ExpiredTimeout<Void, CoordinatorRecord>(
+            List.of(new ExpiredTimeout<CoordinatorRecord>(
                 groupRebalanceTimeoutKey(groupId, memberId1),
                 new CoordinatorResult<>(
                     List.of(
@@ -23505,11 +23505,11 @@ public class GroupMetadataManagerTest {
 
         // Member 2 is fenced due to reaching the session timeout.
         context.assertSessionTimeout(groupId, memberId2, 45000);
-        List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts = 
context.sleep(45000 + 1);
+        List<ExpiredTimeout<CoordinatorRecord>> timeouts = context.sleep(45000 
+ 1);
 
         // Verify the expired timeout.
         assertEquals(
-            List.of(new ExpiredTimeout<Void, CoordinatorRecord>(
+            List.of(new ExpiredTimeout<CoordinatorRecord>(
                 groupSessionTimeoutKey(groupId, memberId2),
                 new CoordinatorResult<>(
                     List.of(
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
index b52d3edb3fe..1bbfd86f96a 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
@@ -198,7 +198,7 @@ public class GroupMetadataManagerTestContext {
         }
     }
 
-    public static void 
assertNoOrEmptyResult(List<MockCoordinatorTimer.ExpiredTimeout<Void, 
CoordinatorRecord>> timeouts) {
+    public static void 
assertNoOrEmptyResult(List<MockCoordinatorTimer.ExpiredTimeout<CoordinatorRecord>>
 timeouts) {
         assertTrue(timeouts.size() <= 1);
         timeouts.forEach(timeout -> assertEquals(EMPTY_RESULT, 
timeout.result()));
     }
@@ -458,7 +458,7 @@ public class GroupMetadataManagerTestContext {
 
     public static class Builder {
         private MockTime time = new MockTime(0, 0, 0);
-        private final MockCoordinatorTimer<Void, CoordinatorRecord> timer = 
new MockCoordinatorTimer<>(time);
+        private final MockCoordinatorTimer<CoordinatorRecord> timer = new 
MockCoordinatorTimer<>(time);
         private final MockCoordinatorExecutor<CoordinatorRecord> executor = 
new MockCoordinatorExecutor<>();
         private final LogContext logContext = new LogContext();
         private final SnapshotRegistry snapshotRegistry = new 
SnapshotRegistry(logContext);
@@ -576,7 +576,7 @@ public class GroupMetadataManagerTestContext {
     }
 
     final MockTime time;
-    final MockCoordinatorTimer<Void, CoordinatorRecord> timer;
+    final MockCoordinatorTimer<CoordinatorRecord> timer;
     final MockCoordinatorExecutor<CoordinatorRecord> executor;
     final SnapshotRegistry snapshotRegistry;
     final GroupCoordinatorMetricsShard metrics;
@@ -590,7 +590,7 @@ public class GroupMetadataManagerTestContext {
 
     public GroupMetadataManagerTestContext(
         MockTime time,
-        MockCoordinatorTimer<Void, CoordinatorRecord> timer,
+        MockCoordinatorTimer<CoordinatorRecord> timer,
         MockCoordinatorExecutor<CoordinatorRecord> executor,
         SnapshotRegistry snapshotRegistry,
         GroupCoordinatorMetricsShard metrics,
@@ -761,9 +761,9 @@ public class GroupMetadataManagerTestContext {
         return result;
     }
 
-    public List<MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord>> 
sleep(long ms) {
+    public List<MockCoordinatorTimer.ExpiredTimeout<CoordinatorRecord>> 
sleep(long ms) {
         time.sleep(ms);
-        List<MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord>> 
timeouts = timer.poll();
+        List<MockCoordinatorTimer.ExpiredTimeout<CoordinatorRecord>> timeouts 
= timer.poll();
         timeouts.forEach(timeout -> {
             if (timeout.result().replayRecords()) {
                 timeout.result().records().forEach(this::replay);
@@ -787,7 +787,7 @@ public class GroupMetadataManagerTestContext {
         String memberId,
         long delayMs
     ) {
-        MockCoordinatorTimer.ScheduledTimeout<Void, CoordinatorRecord> timeout 
=
+        MockCoordinatorTimer.ScheduledTimeout<CoordinatorRecord> timeout =
             timer.timeout(groupSessionTimeoutKey(groupId, memberId));
         assertNotNull(timeout);
         assertEquals(time.milliseconds() + delayMs, timeout.deadlineMs());
@@ -797,17 +797,17 @@ public class GroupMetadataManagerTestContext {
         String groupId,
         String memberId
     ) {
-        MockCoordinatorTimer.ScheduledTimeout<Void, CoordinatorRecord> timeout 
=
+        MockCoordinatorTimer.ScheduledTimeout<CoordinatorRecord> timeout =
             timer.timeout(groupSessionTimeoutKey(groupId, memberId));
         assertNull(timeout);
     }
 
-    public MockCoordinatorTimer.ScheduledTimeout<Void, CoordinatorRecord> 
assertRebalanceTimeout(
+    public MockCoordinatorTimer.ScheduledTimeout<CoordinatorRecord> 
assertRebalanceTimeout(
         String groupId,
         String memberId,
         long delayMs
     ) {
-        MockCoordinatorTimer.ScheduledTimeout<Void, CoordinatorRecord> timeout 
=
+        MockCoordinatorTimer.ScheduledTimeout<CoordinatorRecord> timeout =
             timer.timeout(groupRebalanceTimeoutKey(groupId, memberId));
         assertNotNull(timeout);
         assertEquals(time.milliseconds() + delayMs, timeout.deadlineMs());
@@ -818,17 +818,17 @@ public class GroupMetadataManagerTestContext {
         String groupId,
         String memberId
     ) {
-        MockCoordinatorTimer.ScheduledTimeout<Void, CoordinatorRecord> timeout 
=
+        MockCoordinatorTimer.ScheduledTimeout<CoordinatorRecord> timeout =
             timer.timeout(groupRebalanceTimeoutKey(groupId, memberId));
         assertNull(timeout);
     }
 
-    public MockCoordinatorTimer.ScheduledTimeout<Void, CoordinatorRecord> 
assertJoinTimeout(
+    public MockCoordinatorTimer.ScheduledTimeout<CoordinatorRecord> 
assertJoinTimeout(
         String groupId,
         String memberId,
         long delayMs
     ) {
-        MockCoordinatorTimer.ScheduledTimeout<Void, CoordinatorRecord> timeout 
=
+        MockCoordinatorTimer.ScheduledTimeout<CoordinatorRecord> timeout =
             timer.timeout(consumerGroupJoinKey(groupId, memberId));
         assertNotNull(timeout);
         assertEquals(time.milliseconds() + delayMs, timeout.deadlineMs());
@@ -839,17 +839,17 @@ public class GroupMetadataManagerTestContext {
         String groupId,
         String memberId
     ) {
-        MockCoordinatorTimer.ScheduledTimeout<Void, CoordinatorRecord> timeout 
=
+        MockCoordinatorTimer.ScheduledTimeout<CoordinatorRecord> timeout =
             timer.timeout(consumerGroupJoinKey(groupId, memberId));
         assertNull(timeout);
     }
 
-    public MockCoordinatorTimer.ScheduledTimeout<Void, CoordinatorRecord> 
assertSyncTimeout(
+    public MockCoordinatorTimer.ScheduledTimeout<CoordinatorRecord> 
assertSyncTimeout(
         String groupId,
         String memberId,
         long delayMs
     ) {
-        MockCoordinatorTimer.ScheduledTimeout<Void, CoordinatorRecord> timeout 
=
+        MockCoordinatorTimer.ScheduledTimeout<CoordinatorRecord> timeout =
             timer.timeout(consumerGroupSyncKey(groupId, memberId));
         assertNotNull(timeout);
         assertEquals(time.milliseconds() + delayMs, timeout.deadlineMs());
@@ -860,7 +860,7 @@ public class GroupMetadataManagerTestContext {
         String groupId,
         String memberId
     ) {
-        MockCoordinatorTimer.ScheduledTimeout<Void, CoordinatorRecord> timeout 
=
+        MockCoordinatorTimer.ScheduledTimeout<CoordinatorRecord> timeout =
             timer.timeout(consumerGroupSyncKey(groupId, memberId));
         assertNull(timeout);
     }
@@ -999,7 +999,7 @@ public class GroupMetadataManagerTestContext {
         );
 
         assertTrue(secondJoinResult.records.isEmpty());
-        List<MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord>> 
timeouts = sleep(classicGroupInitialRebalanceDelayMs);
+        List<MockCoordinatorTimer.ExpiredTimeout<CoordinatorRecord>> timeouts 
= sleep(classicGroupInitialRebalanceDelayMs);
         assertEquals(1, timeouts.size());
         assertTrue(secondJoinResult.joinFuture.isDone());
         assertEquals(Errors.NONE.code(), 
secondJoinResult.joinFuture.get().errorCode());
@@ -1312,7 +1312,7 @@ public class GroupMetadataManagerTestContext {
                                                  .map(member -> 
classicGroupHeartbeatKey(group.groupId(), 
member.memberId())).collect(Collectors.toSet());
 
         // Member should be removed as session expires.
-        List<MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord>> 
timeouts = sleep(timeoutMs);
+        List<MockCoordinatorTimer.ExpiredTimeout<CoordinatorRecord>> timeouts 
= sleep(timeoutMs);
         List<CoordinatorRecord> expectedRecords = 
List.of(newGroupMetadataRecord(
             group.groupId(),
             new GroupMetadataValue()
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
index 18e119961c9..0ef8efa158e 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
@@ -108,7 +108,7 @@ public class OffsetMetadataManagerTest {
     static class OffsetMetadataManagerTestContext {
         public static class Builder {
             private final MockTime time = new MockTime();
-            private final MockCoordinatorTimer<Void, CoordinatorRecord> timer 
= new MockCoordinatorTimer<>(time);
+            private final MockCoordinatorTimer<CoordinatorRecord> timer = new 
MockCoordinatorTimer<>(time);
             private final MockCoordinatorExecutor<CoordinatorRecord> executor 
= new MockCoordinatorExecutor<>();
             private final LogContext logContext = new LogContext();
             private final SnapshotRegistry snapshotRegistry = new 
SnapshotRegistry(logContext);
@@ -175,7 +175,7 @@ public class OffsetMetadataManagerTest {
         }
 
         final MockTime time;
-        final MockCoordinatorTimer<Void, CoordinatorRecord> timer;
+        final MockCoordinatorTimer<CoordinatorRecord> timer;
         final SnapshotRegistry snapshotRegistry;
         final GroupCoordinatorMetricsShard metrics;
         final GroupMetadataManager groupMetadataManager;
@@ -186,7 +186,7 @@ public class OffsetMetadataManagerTest {
 
         OffsetMetadataManagerTestContext(
             MockTime time,
-            MockCoordinatorTimer<Void, CoordinatorRecord> timer,
+            MockCoordinatorTimer<CoordinatorRecord> timer,
             SnapshotRegistry snapshotRegistry,
             GroupCoordinatorMetricsShard metrics,
             GroupMetadataManager groupMetadataManager,
@@ -384,9 +384,9 @@ public class OffsetMetadataManagerTest {
             return response.topics();
         }
 
-        public List<MockCoordinatorTimer.ExpiredTimeout<Void, 
CoordinatorRecord>> sleep(long ms) {
+        public List<MockCoordinatorTimer.ExpiredTimeout<CoordinatorRecord>> 
sleep(long ms) {
             time.sleep(ms);
-            List<MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord>> 
timeouts = timer.poll();
+            List<MockCoordinatorTimer.ExpiredTimeout<CoordinatorRecord>> 
timeouts = timer.poll();
             timeouts.forEach(timeout -> {
                 if (timeout.result().replayRecords()) {
                     timeout.result().records().forEach(this::replay);
@@ -996,7 +996,7 @@ public class OffsetMetadataManagerTest {
 
         // Advance time by half of the session timeout again. The timeout 
should
         // expire and the member is removed from the group.
-        List<MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord>> 
timeouts =
+        List<MockCoordinatorTimer.ExpiredTimeout<CoordinatorRecord>> timeouts =
             context.sleep(5000 / 2);
         assertEquals(1, timeouts.size());
         assertFalse(group.hasMember(member.memberId()));
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
index f015441f86f..83eaa1f13b0 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
@@ -1240,7 +1240,7 @@ public class StreamsGroupTest {
     @Test
     public void testCancelTimers() {
         StreamsGroup streamsGroup = createStreamsGroup("test-group");
-        CoordinatorTimer<Void, CoordinatorRecord> timer = 
mock(CoordinatorTimer.class);
+        CoordinatorTimer<CoordinatorRecord> timer = 
mock(CoordinatorTimer.class);
 
         streamsGroup.cancelTimers(timer);
 
diff --git 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
index 6c54f5cff9b..970cf561498 100644
--- 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
+++ 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
@@ -123,7 +123,7 @@ public class ShareCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
         }
 
         @Override
-        public CoordinatorShardBuilder<ShareCoordinatorShard, 
CoordinatorRecord> withTimer(CoordinatorTimer<Void, CoordinatorRecord> timer) {
+        public CoordinatorShardBuilder<ShareCoordinatorShard, 
CoordinatorRecord> withTimer(CoordinatorTimer<CoordinatorRecord> timer) {
             // method is required due to interface
             return this;
         }

Reply via email to