This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new be17df6fdac KAFKA-16374; High watermark updates should have a higher 
priority (#15534)
be17df6fdac is described below

commit be17df6fdac5cf269d8f80b49f209fb20de70eed
Author: David Jacot <[email protected]>
AuthorDate: Mon Mar 25 09:20:10 2024 +0100

    KAFKA-16374; High watermark updates should have a higher priority (#15534)
    
    When the group coordinator is under heavy load, the current mechanism to 
release pending events based on the updated high watermark, which consists of 
pushing an event at the end of the queue, is bad because pending events pay the 
cost of the queue twice: a first time for the handling of the first event and a 
second time for the handling of the hwm update. This patch changes this logic 
to push the hwm update event to the front of the queue in order to release 
pending events as soon as p [...]
    
    Reviewers: Jeff Kim <[email protected]>, Justine Olshan 
<[email protected]>
---
 .../group/runtime/CoordinatorEventProcessor.java   |  12 ++-
 .../group/runtime/CoordinatorRuntime.java          | 120 ++++++++++++++-------
 .../group/runtime/EventAccumulator.java            |  42 ++++++--
 .../group/runtime/MultiThreadedEventProcessor.java |  17 ++-
 .../group/runtime/CoordinatorRuntimeTest.java      |  99 +++++++++++++++--
 .../group/runtime/EventAccumulatorTest.java        |  44 ++++++--
 .../runtime/MultiThreadedEventProcessorTest.java   |  18 ++--
 7 files changed, 273 insertions(+), 79 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorEventProcessor.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorEventProcessor.java
index 0195880c4a5..f2463a7cd75 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorEventProcessor.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorEventProcessor.java
@@ -24,10 +24,18 @@ import java.util.concurrent.RejectedExecutionException;
 public interface CoordinatorEventProcessor extends AutoCloseable {
 
     /**
-     * Enqueues a new {{@link CoordinatorEvent}}.
+     * Enqueues a new {{@link CoordinatorEvent}} at the end of the processor.
      *
      * @param event The event.
      * @throws RejectedExecutionException If the event processor is closed.
      */
-    void enqueue(CoordinatorEvent event) throws RejectedExecutionException;
+    void enqueueLast(CoordinatorEvent event) throws RejectedExecutionException;
+
+    /**
+     * Enqueues a new {{@link CoordinatorEvent}} at the front of the processor.
+     *
+     * @param event The event.
+     * @throws RejectedExecutionException If the event processor is closed.
+     */
+    void enqueueFirst(CoordinatorEvent event) throws 
RejectedExecutionException;
 }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
index fd348e01ece..2f52f1d11f9 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
@@ -51,6 +51,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
@@ -356,7 +357,7 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
 
                     log.debug("Scheduling write event {} for timer {}.", 
event.name, key);
                     try {
-                        enqueue(event);
+                        enqueueLast(event);
                     } catch (NotCoordinatorException ex) {
                         log.info("Failed to enqueue write event {} for timer 
{} because the runtime is closed. Ignoring it.",
                             event.name, key);
@@ -438,6 +439,12 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
          */
         SnapshottableCoordinator<S, U> coordinator;
 
+        /**
+         * The high watermark listener registered to all the partitions
+         * backing the coordinators.
+         */
+        HighWatermarkListener highWatermarklistener;
+
         /**
          * Constructor.
          *
@@ -495,6 +502,7 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
 
                 case ACTIVE:
                     state = CoordinatorState.ACTIVE;
+                    highWatermarklistener = new HighWatermarkListener();
                     partitionWriter.registerListener(tp, 
highWatermarklistener);
                     coordinator.onLoaded(metadataImage);
                     break;
@@ -520,7 +528,10 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
          * Unloads the coordinator.
          */
         private void unload() {
-            partitionWriter.deregisterListener(tp, highWatermarklistener);
+            if (highWatermarklistener != null) {
+                partitionWriter.deregisterListener(tp, highWatermarklistener);
+                highWatermarklistener = null;
+            }
             timer.cancelAll();
             deferredEventQueue.failAll(Errors.NOT_COORDINATOR.exception());
             if (coordinator != null) {
@@ -1179,6 +1190,23 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
      * backing the coordinator are updated.
      */
     class HighWatermarkListener implements PartitionWriter.Listener {
+
+        private static final long NO_OFFSET = -1L;
+
+        /**
+         * The atomic long is used to store the last and unprocessed high 
watermark
+         * received from the partition. The atomic value is replaced by -1L 
when
+         * the high watermark is taken to update the context.
+         */
+        private final AtomicLong lastHighWatermark = new AtomicLong(NO_OFFSET);
+
+        /**
+         * @return The last high watermark received or NO_OFFSET if none is 
pending.
+         */
+        public long lastHighWatermark() {
+            return lastHighWatermark.get();
+        }
+
         /**
          * Updates the high watermark of the corresponding coordinator.
          *
@@ -1191,30 +1219,37 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
             long offset
         ) {
             log.debug("High watermark of {} incremented to {}.", tp, offset);
-            scheduleInternalOperation("HighWatermarkUpdated(tp=" + tp + ", 
offset=" + offset + ")", tp, () -> {
-                CoordinatorContext context = coordinators.get(tp);
-                if (context != null) {
-                    context.lock.lock();
-                    try {
-                        if (context.state == CoordinatorState.ACTIVE) {
-                            // The updated high watermark can be applied to 
the coordinator only if the coordinator
-                            // exists and is in the active state.
-                            log.debug("Updating high watermark of {} to {}.", 
tp, offset);
-                            
context.coordinator.updateLastCommittedOffset(offset);
-                            context.deferredEventQueue.completeUpTo(offset);
-                            coordinatorMetrics.onUpdateLastCommittedOffset(tp, 
offset);
-                        } else {
-                            log.debug("Ignored high watermark updated for {} 
to {} because the coordinator is not active.",
-                                tp, offset);
+            if (lastHighWatermark.getAndSet(offset) == NO_OFFSET) {
+                // An event to apply the new high watermark is pushed to the 
front of the
+                // queue only if the previous value was -1L. If it was not, it 
means that
+                // there is already an event waiting to process the last value.
+                enqueueFirst(new 
CoordinatorInternalEvent("HighWatermarkUpdate", tp, () -> {
+                    long newHighWatermark = 
lastHighWatermark.getAndSet(NO_OFFSET);
+
+                    CoordinatorContext context = coordinators.get(tp);
+                    if (context != null) {
+                        context.lock.lock();
+                        try {
+                            if (context.state == CoordinatorState.ACTIVE) {
+                                // The updated high watermark can be applied 
to the coordinator only if the coordinator
+                                // exists and is in the active state.
+                                log.debug("Updating high watermark of {} to 
{}.", tp, newHighWatermark);
+                                
context.coordinator.updateLastCommittedOffset(newHighWatermark);
+                                
context.deferredEventQueue.completeUpTo(newHighWatermark);
+                                
coordinatorMetrics.onUpdateLastCommittedOffset(tp, newHighWatermark);
+                            } else {
+                                log.debug("Ignored high watermark updated for 
{} to {} because the coordinator is not active.",
+                                    tp, newHighWatermark);
+                            }
+                        } finally {
+                            context.lock.unlock();
                         }
-                    } finally {
-                        context.lock.unlock();
+                    } else {
+                        log.debug("Ignored high watermark updated for {} to {} 
because the coordinator does not exist.",
+                            tp, newHighWatermark);
                     }
-                } else {
-                    log.debug("Ignored high watermark updated for {} to {} 
because the coordinator does not exist.",
-                        tp, offset);
-                }
-            });
+                }));
+            }
         }
     }
 
@@ -1263,12 +1298,6 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
      */
     private final PartitionWriter<U> partitionWriter;
 
-    /**
-     * The high watermark listener registered to all the partitions
-     * backing the coordinators.
-     */
-    private final HighWatermarkListener highWatermarklistener;
-
     /**
      * The coordinator loaded used by the runtime.
      */
@@ -1335,7 +1364,6 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
         this.coordinators = new ConcurrentHashMap<>();
         this.processor = processor;
         this.partitionWriter = partitionWriter;
-        this.highWatermarklistener = new HighWatermarkListener();
         this.loader = loader;
         this.coordinatorShardBuilderSupplier = coordinatorShardBuilderSupplier;
         this.runtimeMetrics = runtimeMetrics;
@@ -1353,14 +1381,28 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
     }
 
     /**
-     * Enqueues a new event.
+     * Enqueues a new event at the end of the processing queue.
+     *
+     * @param event The event.
+     * @throws NotCoordinatorException If the event processor is closed.
+     */
+    private void enqueueLast(CoordinatorEvent event) {
+        try {
+            processor.enqueueLast(event);
+        } catch (RejectedExecutionException ex) {
+            throw new NotCoordinatorException("Can't accept an event because 
the processor is closed", ex);
+        }
+    }
+
+    /**
+     * Enqueues a new event at the front of the processing queue.
      *
      * @param event The event.
      * @throws NotCoordinatorException If the event processor is closed.
      */
-    private void enqueue(CoordinatorEvent event) {
+    private void enqueueFirst(CoordinatorEvent event) {
         try {
-            processor.enqueue(event);
+            processor.enqueueFirst(event);
         } catch (RejectedExecutionException ex) {
             throw new NotCoordinatorException("Can't accept an event because 
the processor is closed", ex);
         }
@@ -1442,7 +1484,7 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
         throwIfNotRunning();
         log.debug("Scheduled execution of write operation {}.", name);
         CoordinatorWriteEvent<T> event = new CoordinatorWriteEvent<>(name, tp, 
timeout, op);
-        enqueue(event);
+        enqueueLast(event);
         return event.future;
     }
 
@@ -1518,7 +1560,7 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
                 timeout,
                 op
             );
-            enqueue(event);
+            enqueueLast(event);
             return event.future;
         });
     }
@@ -1557,7 +1599,7 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
             result,
             timeout
         );
-        enqueue(event);
+        enqueueLast(event);
         return event.future;
     }
 
@@ -1581,7 +1623,7 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
         throwIfNotRunning();
         log.debug("Scheduled execution of read operation {}.", name);
         CoordinatorReadEvent<T> event = new CoordinatorReadEvent<>(name, tp, 
op);
-        enqueue(event);
+        enqueueLast(event);
         return event.future;
     }
 
@@ -1622,7 +1664,7 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
         Runnable op
     ) {
         log.debug("Scheduled execution of internal operation {}.", name);
-        enqueue(new CoordinatorInternalEvent(name, tp, op));
+        enqueueLast(new CoordinatorInternalEvent(name, tp, op));
     }
 
     /**
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/EventAccumulator.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/EventAccumulator.java
index 16b61f8e991..2c22232c47a 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/EventAccumulator.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/EventAccumulator.java
@@ -18,12 +18,12 @@ package org.apache.kafka.coordinator.group.runtime;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Deque;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.RejectedExecutionException;
@@ -60,7 +60,7 @@ public class EventAccumulator<K, T extends 
EventAccumulator.Event<K>> implements
     /**
      * The map of queues keyed by K.
      */
-    private final Map<K, Queue<T>> queues;
+    private final Map<K, Deque<T>> queues;
 
     /**
      * The list of available keys. Keys in this list can
@@ -110,17 +110,17 @@ public class EventAccumulator<K, T extends 
EventAccumulator.Event<K>> implements
     }
 
     /**
-     * Adds an {{@link Event}} to the queue.
+     * Adds an {{@link Event}} at the end of the queue.
      *
      * @param event An {{@link Event}}.
      */
-    public void add(T event) throws RejectedExecutionException {
+    public void addLast(T event) throws RejectedExecutionException {
         lock.lock();
         try {
             if (closed) throw new RejectedExecutionException("Can't accept an 
event because the accumulator is closed.");
 
             K key = event.key();
-            Queue<T> queue = queues.get(key);
+            Deque<T> queue = queues.get(key);
             if (queue == null) {
                 queue = new LinkedList<>();
                 queues.put(key, queue);
@@ -128,7 +128,33 @@ public class EventAccumulator<K, T extends 
EventAccumulator.Event<K>> implements
                     addAvailableKey(key);
                 }
             }
-            queue.add(event);
+            queue.addLast(event);
+            size++;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Adds an {{@link Event}} at the front of the queue.
+     *
+     * @param event An {{@link Event}}.
+     */
+    public void addFirst(T event) throws RejectedExecutionException {
+        lock.lock();
+        try {
+            if (closed) throw new RejectedExecutionException("Can't accept an 
event because the accumulator is closed.");
+
+            K key = event.key();
+            Deque<T> queue = queues.get(key);
+            if (queue == null) {
+                queue = new LinkedList<>();
+                queues.put(key, queue);
+                if (!inflightKeys.contains(key)) {
+                    addAvailableKey(key);
+                }
+            }
+            queue.addFirst(event);
             size++;
         } finally {
             lock.unlock();
@@ -147,7 +173,7 @@ public class EventAccumulator<K, T extends 
EventAccumulator.Event<K>> implements
             K key = randomKey();
             if (key == null) return null;
 
-            Queue<T> queue = queues.get(key);
+            Deque<T> queue = queues.get(key);
             T event = queue.poll();
 
             if (queue.isEmpty()) queues.remove(key);
@@ -181,7 +207,7 @@ public class EventAccumulator<K, T extends 
EventAccumulator.Event<K>> implements
 
             if (key == null) return null;
 
-            Queue<T> queue = queues.get(key);
+            Deque<T> queue = queues.get(key);
             T event = queue.poll();
 
             if (queue.isEmpty()) queues.remove(key);
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java
index 0e3d563861c..0aca6a3b79a 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java
@@ -198,14 +198,25 @@ public class MultiThreadedEventProcessor implements 
CoordinatorEventProcessor {
     }
 
     /**
-     * Enqueues a new {{@link CoordinatorEvent}}.
+     * Enqueues a new {{@link CoordinatorEvent}} at the end of the processor.
      *
      * @param event The event.
      * @throws RejectedExecutionException If the event processor is closed.
      */
     @Override
-    public void enqueue(CoordinatorEvent event) throws 
RejectedExecutionException {
-        accumulator.add(event);
+    public void enqueueLast(CoordinatorEvent event) throws 
RejectedExecutionException {
+        accumulator.addLast(event);
+    }
+
+    /**
+     * Enqueues a new {{@link CoordinatorEvent}} at the front of the processor.
+     *
+     * @param event The event.
+     * @throws RejectedExecutionException If the event processor is closed.
+     */
+    @Override
+    public void enqueueFirst(CoordinatorEvent event) throws 
RejectedExecutionException {
+        accumulator.addFirst(event);
     }
 
     /**
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
index 622d4335b9a..c8a1dc337cb 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
@@ -47,12 +47,12 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Deque;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Objects;
 import java.util.OptionalInt;
-import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -96,7 +96,16 @@ public class CoordinatorRuntimeTest {
      */
     private static class DirectEventProcessor implements 
CoordinatorEventProcessor {
         @Override
-        public void enqueue(CoordinatorEvent event) throws 
RejectedExecutionException {
+        public void enqueueLast(CoordinatorEvent event) throws 
RejectedExecutionException {
+            try {
+                event.run();
+            } catch (Throwable ex) {
+                event.complete(ex);
+            }
+        }
+
+        @Override
+        public void enqueueFirst(CoordinatorEvent event) throws 
RejectedExecutionException {
             try {
                 event.run();
             } catch (Throwable ex) {
@@ -113,11 +122,16 @@ public class CoordinatorRuntimeTest {
      * when poll() is called.
      */
     private static class ManualEventProcessor implements 
CoordinatorEventProcessor {
-        private Queue<CoordinatorEvent> queue = new LinkedList<>();
+        private Deque<CoordinatorEvent> queue = new LinkedList<>();
+
+        @Override
+        public void enqueueLast(CoordinatorEvent event) throws 
RejectedExecutionException {
+            queue.addLast(event);
+        }
 
         @Override
-        public void enqueue(CoordinatorEvent event) throws 
RejectedExecutionException {
-            queue.add(event);
+        public void enqueueFirst(CoordinatorEvent event) throws 
RejectedExecutionException {
+            queue.addFirst(event);
         }
 
         public boolean poll() {
@@ -507,12 +521,6 @@ public class CoordinatorRuntimeTest {
 
         // Verify that onUnloaded is called.
         verify(coordinator, times(1)).onUnloaded();
-
-        // Verify that the listener is deregistered.
-        verify(writer, times(1)).deregisterListener(
-            eq(TP),
-            any(PartitionWriter.Listener.class)
-        );
     }
 
     @Test
@@ -2603,6 +2611,75 @@ public class CoordinatorRuntimeTest {
         assertTrue(ctx.coordinator.snapshotRegistry().hasSnapshot(0L));
     }
 
+    @Test
+    public void testHighWatermarkUpdate() {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+        ManualEventProcessor processor = new ManualEventProcessor();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(processor)
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+                
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .build();
+
+        // Loads the coordinator. Poll once to execute the load operation and 
once
+        // to complete the load.
+        runtime.scheduleLoadOperation(TP, 10);
+        processor.poll();
+        processor.poll();
+
+        // Write #1.
+        CompletableFuture<String> write1 = 
runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
+            state -> new 
CoordinatorResult<>(Collections.singletonList("record1"), "response1")
+        );
+        processor.poll();
+
+        // Write #2.
+        CompletableFuture<String> write2 = 
runtime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT,
+            state -> new 
CoordinatorResult<>(Collections.singletonList("record2"), "response2")
+        );
+        processor.poll();
+
+        // Records have been written to the log.
+        assertEquals(Arrays.asList(
+            InMemoryPartitionWriter.LogEntry.value("record1"),
+            InMemoryPartitionWriter.LogEntry.value("record2")
+        ), writer.entries(TP));
+
+        // There is no pending high watermark.
+        assertEquals(-1, 
runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());
+
+        // Commit the first record.
+        writer.commit(TP, 1);
+
+        // We should have one pending event and the pending high watermark 
should be set.
+        assertEquals(1, processor.size());
+        assertEquals(1, 
runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());
+
+        // Commit the second record.
+        writer.commit(TP, 2);
+
+        // We should still have one pending event and the pending high 
watermark should be updated.
+        assertEquals(1, processor.size());
+        assertEquals(2, 
runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());
+
+        // Poll once to process the high watermark update and complete the 
writes.
+        processor.poll();
+
+        assertEquals(-1, 
runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());
+        assertEquals(2, 
runtime.contextOrThrow(TP).coordinator.lastCommittedOffset());
+        assertTrue(write1.isDone());
+        assertTrue(write2.isDone());
+    }
+
     private static <S extends CoordinatorShard<U>, U> 
ArgumentMatcher<CoordinatorPlayback<U>> coordinatorMatcher(
         CoordinatorRuntime<S, U> runtime,
         TopicPartition tp
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/EventAccumulatorTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/EventAccumulatorTest.java
index e077fb5e022..faac0f46f6a 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/EventAccumulatorTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/EventAccumulatorTest.java
@@ -16,7 +16,9 @@
  */
 package org.apache.kafka.coordinator.group.runtime;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -92,7 +94,7 @@ public class EventAccumulatorTest {
             new MockEvent(3, 2)
         );
 
-        events.forEach(accumulator::add);
+        events.forEach(accumulator::addLast);
         assertEquals(9, accumulator.size());
 
         Set<MockEvent> polledEvents = new HashSet<>();
@@ -111,6 +113,34 @@ public class EventAccumulatorTest {
         accumulator.close();
     }
 
+    @Test
+    public void testAddFirst() {
+        EventAccumulator<Integer, MockEvent> accumulator = new 
EventAccumulator<>();
+
+        List<MockEvent> events = Arrays.asList(
+            new MockEvent(1, 0),
+            new MockEvent(1, 1),
+            new MockEvent(1, 2)
+        );
+
+        events.forEach(accumulator::addFirst);
+        assertEquals(3, accumulator.size());
+
+        List<MockEvent> polledEvents = new ArrayList<>(3);
+        for (int i = 0; i < events.size(); i++) {
+            MockEvent event = accumulator.poll();
+            assertNotNull(event);
+            polledEvents.add(event);
+            assertEquals(events.size() - 1 - i, accumulator.size());
+            accumulator.done(event);
+        }
+
+        Collections.reverse(events);
+        assertEquals(events, polledEvents);
+
+        accumulator.close();
+    }
+
     @Test
     public void testKeyConcurrentAndOrderingGuarantees() {
         EventAccumulator<Integer, MockEvent> accumulator = new 
EventAccumulator<>();
@@ -118,9 +148,9 @@ public class EventAccumulatorTest {
         MockEvent event0 = new MockEvent(1, 0);
         MockEvent event1 = new MockEvent(1, 1);
         MockEvent event2 = new MockEvent(1, 2);
-        accumulator.add(event0);
-        accumulator.add(event1);
-        accumulator.add(event2);
+        accumulator.addLast(event0);
+        accumulator.addLast(event1);
+        accumulator.addLast(event2);
         assertEquals(3, accumulator.size());
 
         MockEvent event = null;
@@ -169,9 +199,9 @@ public class EventAccumulatorTest {
         assertFalse(future1.isDone());
         assertFalse(future2.isDone());
 
-        accumulator.add(event0);
-        accumulator.add(event1);
-        accumulator.add(event2);
+        accumulator.addLast(event0);
+        accumulator.addLast(event1);
+        accumulator.addLast(event2);
 
         // One future should be completed with event0.
         assertEquals(event0, CompletableFuture
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java
index 3708141827c..f01fc883105 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java
@@ -186,7 +186,7 @@ public class MultiThreadedEventProcessorTest {
                 new FutureEvent<>(new TopicPartition("foo", 2), 
numEventsExecuted::incrementAndGet)
             );
 
-            events.forEach(eventProcessor::enqueue);
+            events.forEach(eventProcessor::enqueueLast);
 
             CompletableFuture.allOf(events
                 .stream()
@@ -223,7 +223,7 @@ public class MultiThreadedEventProcessorTest {
                 new FutureEvent<>(new TopicPartition("foo", 1), 
numEventsExecuted::incrementAndGet, true)  // Event 5
             );
 
-            events.forEach(eventProcessor::enqueue);
+            events.forEach(eventProcessor::enqueueLast);
 
             // Events 0 and 1 are executed.
             assertTrue(events.get(0).awaitExecution(5, TimeUnit.SECONDS));
@@ -301,7 +301,7 @@ public class MultiThreadedEventProcessorTest {
         eventProcessor.close();
 
         assertThrows(RejectedExecutionException.class,
-            () -> eventProcessor.enqueue(new FutureEvent<>(new 
TopicPartition("foo", 0), () -> 0)));
+            () -> eventProcessor.enqueueLast(new FutureEvent<>(new 
TopicPartition("foo", 0), () -> 0)));
     }
 
     @Test
@@ -332,14 +332,14 @@ public class MultiThreadedEventProcessorTest {
             );
 
             // Enqueue the blocking event.
-            eventProcessor.enqueue(blockingEvent);
+            eventProcessor.enqueueLast(blockingEvent);
 
             // Ensure that the blocking event is executed.
             waitForCondition(() -> numEventsExecuted.get() > 0,
                 "Blocking event not executed.");
 
             // Enqueue the other events.
-            events.forEach(eventProcessor::enqueue);
+            events.forEach(eventProcessor::enqueueLast);
 
             // Events should not be completed.
             events.forEach(event -> assertFalse(event.future.isDone()));
@@ -349,7 +349,7 @@ public class MultiThreadedEventProcessorTest {
 
             // Enqueuing a new event is rejected.
             assertThrows(RejectedExecutionException.class,
-                () -> eventProcessor.enqueue(blockingEvent));
+                () -> eventProcessor.enqueueLast(blockingEvent));
 
             // Release the blocking event to unblock the thread.
             blockingEvent.release();
@@ -398,7 +398,7 @@ public class MultiThreadedEventProcessorTest {
             new DelayEventAccumulator(mockTime, 500L)
         )) {
             // Enqueue the blocking event.
-            eventProcessor.enqueue(blockingEvent);
+            eventProcessor.enqueueLast(blockingEvent);
 
             // Ensure that the blocking event is executed.
             waitForCondition(() -> numEventsExecuted.get() > 0,
@@ -414,7 +414,7 @@ public class MultiThreadedEventProcessorTest {
                 mockTime.milliseconds()
             );
 
-            eventProcessor.enqueue(otherEvent);
+            eventProcessor.enqueueLast(otherEvent);
 
             // Pass the time.
             mockTime.sleep(3000L);
@@ -492,7 +492,7 @@ public class MultiThreadedEventProcessorTest {
                 new FutureEvent<>(new TopicPartition("foo", 2), 
numEventsExecuted::incrementAndGet)
             );
 
-            events.forEach(eventProcessor::enqueue);
+            events.forEach(eventProcessor::enqueueLast);
 
             CompletableFuture.allOf(events
                 .stream()

Reply via email to