This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 02ab80582c9 MINOR: Cleanups in EventAccumulator (#21329)
02ab80582c9 is described below

commit 02ab80582c9934e6259e50d5e127f89142b9bcd6
Author: David Jacot <[email protected]>
AuthorDate: Mon Jan 19 15:20:09 2026 +0100

    MINOR: Cleanups in EventAccumulator (#21329)
    
    This patch does a few cleanups in EventAccumulator:
    - Extract common logic from addLast/addFirst into private add() helper
    method using BiConsumer for the add operation
    - Replace LinkedList with ArrayDeque for better performance
    - Fix Javadoc formatting (double braces to single braces)
    - Use var for local variable declarations
    
    Reviewers: Sean Quah <[email protected]>, Lianet Magrans
     <[email protected]>
---
 .../common/runtime/EventAccumulator.java           | 98 ++++++++++------------
 1 file changed, 46 insertions(+), 52 deletions(-)

diff --git 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/EventAccumulator.java
 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/EventAccumulator.java
index 985fb488344..20d2466f0bd 100644
--- 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/EventAccumulator.java
+++ 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/EventAccumulator.java
@@ -16,12 +16,12 @@
  */
 package org.apache.kafka.coordinator.common.runtime;
 
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -30,6 +30,7 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.BiConsumer;
 
 /**
  * A concurrent event accumulator which groups events per key and ensures that 
only one
@@ -38,7 +39,7 @@ import java.util.concurrent.locks.ReentrantLock;
  * This class is threadsafe.
  *
  * @param <K> The type of the key of the event.
- * @param <T> The type of the event itself. It implements the {{@link Event}} 
interface.
+ * @param <T> The type of the event itself. It implements the {@link Event} 
interface.
  *
  * There are a few examples about how to use it in the unit tests.
  */
@@ -111,59 +112,25 @@ public class EventAccumulator<K, T extends 
EventAccumulator.Event<K>> implements
     }
 
     /**
-     * Adds an {{@link Event}} at the end of the queue.
+     * Adds an {@link Event} at the end of the queue.
      *
-     * @param event An {{@link Event}}.
+     * @param event An {@link Event}.
      */
     public void addLast(T event) throws RejectedExecutionException {
-        lock.lock();
-        try {
-            if (closed) throw new RejectedExecutionException("Can't accept an 
event because the accumulator is closed.");
-
-            K key = event.key();
-            Deque<T> queue = queues.get(key);
-            if (queue == null) {
-                queue = new LinkedList<>();
-                queues.put(key, queue);
-                if (!inflightKeys.contains(key)) {
-                    addAvailableKey(key);
-                }
-            }
-            queue.addLast(event);
-            size++;
-        } finally {
-            lock.unlock();
-        }
+        add(event, Deque::addLast);
     }
 
     /**
-     * Adds an {{@link Event}} at the front of the queue.
+     * Adds an {@link Event} at the front of the queue.
      *
-     * @param event An {{@link Event}}.
+     * @param event An {@link Event}.
      */
     public void addFirst(T event) throws RejectedExecutionException {
-        lock.lock();
-        try {
-            if (closed) throw new RejectedExecutionException("Can't accept an 
event because the accumulator is closed.");
-
-            K key = event.key();
-            Deque<T> queue = queues.get(key);
-            if (queue == null) {
-                queue = new LinkedList<>();
-                queues.put(key, queue);
-                if (!inflightKeys.contains(key)) {
-                    addAvailableKey(key);
-                }
-            }
-            queue.addFirst(event);
-            size++;
-        } finally {
-            lock.unlock();
-        }
+        add(event, Deque::addFirst);
     }
 
     /**
-     * Immediately returns the next {{@link Event}} available or null
+     * Immediately returns the next {@link Event} available or null
      * if the accumulator is empty.
      *
      * @return The next event available or null.
@@ -173,7 +140,7 @@ public class EventAccumulator<K, T extends 
EventAccumulator.Event<K>> implements
     }
 
     /**
-     * Returns the next {{@link Event}} available. This method blocks for the 
provided
+     * Returns the next {@link Event} available. This method blocks for the 
provided
      * time and returns null if no event is available.
      *
      * @param timeout   The timeout.
@@ -183,8 +150,8 @@ public class EventAccumulator<K, T extends 
EventAccumulator.Event<K>> implements
     public T poll(long timeout, TimeUnit unit) {
         lock.lock();
         try {
-            K key = randomKey();
-            long nanos = unit.toNanos(timeout);
+            var key = randomKey();
+            var nanos = unit.toNanos(timeout);
             while (key == null && !closed && nanos > 0) {
                 try {
                     nanos = condition.awaitNanos(nanos);
@@ -196,8 +163,8 @@ public class EventAccumulator<K, T extends 
EventAccumulator.Event<K>> implements
 
             if (key == null) return null;
 
-            Deque<T> queue = queues.get(key);
-            T event = queue.poll();
+            var queue = queues.get(key);
+            var event = queue.poll();
 
             if (queue.isEmpty()) queues.remove(key);
             inflightKeys.add(key);
@@ -218,7 +185,7 @@ public class EventAccumulator<K, T extends 
EventAccumulator.Event<K>> implements
     public void done(T event) {
         lock.lock();
         try {
-            K key = event.key();
+            var key = event.key();
             inflightKeys.remove(key);
             if (queues.containsKey(key)) {
                 addAvailableKey(key);
@@ -254,6 +221,33 @@ public class EventAccumulator<K, T extends 
EventAccumulator.Event<K>> implements
         }
     }
 
+    /**
+     * Adds an {@link Event} to the queue using the provided add operation.
+     *
+     * @param event         The {@link Event} to add.
+     * @param addOperation  The operation to use for adding (e.g., addFirst or 
addLast).
+     */
+    private void add(T event, BiConsumer<Deque<T>, T> addOperation) throws 
RejectedExecutionException {
+        lock.lock();
+        try {
+            if (closed) throw new RejectedExecutionException("Can't accept an 
event because the accumulator is closed.");
+
+            var key = event.key();
+            var queue = queues.get(key);
+            if (queue == null) {
+                queue = new ArrayDeque<>();
+                queues.put(key, queue);
+                if (!inflightKeys.contains(key)) {
+                    addAvailableKey(key);
+                }
+            }
+            addOperation.accept(queue, event);
+            size++;
+        } finally {
+            lock.unlock();
+        }
+    }
+
     /**
      * Adds the key to the available keys set.
      *
@@ -273,9 +267,9 @@ public class EventAccumulator<K, T extends 
EventAccumulator.Event<K>> implements
     private K randomKey() {
         if (availableKeys.isEmpty()) return null;
 
-        int lastIndex = availableKeys.size() - 1;
-        int randomIndex = random.nextInt(availableKeys.size());
-        K randomKey = availableKeys.get(randomIndex);
+        var lastIndex = availableKeys.size() - 1;
+        var randomIndex = random.nextInt(availableKeys.size());
+        var randomKey = availableKeys.get(randomIndex);
         Collections.swap(availableKeys, randomIndex, lastIndex);
         availableKeys.remove(lastIndex);
         return randomKey;

Reply via email to