This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 02ab80582c9 MINOR: Cleanups in EventAccumulator (#21329)
02ab80582c9 is described below
commit 02ab80582c9934e6259e50d5e127f89142b9bcd6
Author: David Jacot <[email protected]>
AuthorDate: Mon Jan 19 15:20:09 2026 +0100
MINOR: Cleanups in EventAccumulator (#21329)
This patch makes a few cleanups in EventAccumulator:
- Extract the logic shared by addLast/addFirst into a private add() helper
method that takes the add operation as a BiConsumer
- Replace LinkedList with ArrayDeque for better performance
- Fix Javadoc formatting (double braces to single braces)
- Use var for local variable declarations
Reviewers: Sean Quah <[email protected]>, Lianet Magrans
<[email protected]>
---
.../common/runtime/EventAccumulator.java | 98 ++++++++++------------
1 file changed, 46 insertions(+), 52 deletions(-)
diff --git
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/EventAccumulator.java
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/EventAccumulator.java
index 985fb488344..20d2466f0bd 100644
---
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/EventAccumulator.java
+++
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/EventAccumulator.java
@@ -16,12 +16,12 @@
*/
package org.apache.kafka.coordinator.common.runtime;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -30,6 +30,7 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.BiConsumer;
/**
* A concurrent event accumulator which groups events per key and ensures that
only one
@@ -38,7 +39,7 @@ import java.util.concurrent.locks.ReentrantLock;
* This class is threadsafe.
*
* @param <K> The type of the key of the event.
- * @param <T> The type of the event itself. It implements the {{@link Event}}
interface.
+ * @param <T> The type of the event itself. It implements the {@link Event}
interface.
*
* There are a few examples about how to use it in the unit tests.
*/
@@ -111,59 +112,25 @@ public class EventAccumulator<K, T extends
EventAccumulator.Event<K>> implements
}
/**
- * Adds an {{@link Event}} at the end of the queue.
+ * Adds an {@link Event} at the end of the queue.
*
- * @param event An {{@link Event}}.
+ * @param event An {@link Event}.
*/
public void addLast(T event) throws RejectedExecutionException {
- lock.lock();
- try {
- if (closed) throw new RejectedExecutionException("Can't accept an
event because the accumulator is closed.");
-
- K key = event.key();
- Deque<T> queue = queues.get(key);
- if (queue == null) {
- queue = new LinkedList<>();
- queues.put(key, queue);
- if (!inflightKeys.contains(key)) {
- addAvailableKey(key);
- }
- }
- queue.addLast(event);
- size++;
- } finally {
- lock.unlock();
- }
+ add(event, Deque::addLast);
}
/**
- * Adds an {{@link Event}} at the front of the queue.
+ * Adds an {@link Event} at the front of the queue.
*
- * @param event An {{@link Event}}.
+ * @param event An {@link Event}.
*/
public void addFirst(T event) throws RejectedExecutionException {
- lock.lock();
- try {
- if (closed) throw new RejectedExecutionException("Can't accept an
event because the accumulator is closed.");
-
- K key = event.key();
- Deque<T> queue = queues.get(key);
- if (queue == null) {
- queue = new LinkedList<>();
- queues.put(key, queue);
- if (!inflightKeys.contains(key)) {
- addAvailableKey(key);
- }
- }
- queue.addFirst(event);
- size++;
- } finally {
- lock.unlock();
- }
+ add(event, Deque::addFirst);
}
/**
- * Immediately returns the next {{@link Event}} available or null
+ * Immediately returns the next {@link Event} available or null
* if the accumulator is empty.
*
* @return The next event available or null.
@@ -173,7 +140,7 @@ public class EventAccumulator<K, T extends
EventAccumulator.Event<K>> implements
}
/**
- * Returns the next {{@link Event}} available. This method blocks for the
provided
+ * Returns the next {@link Event} available. This method blocks for the
provided
* time and returns null if no event is available.
*
* @param timeout The timeout.
@@ -183,8 +150,8 @@ public class EventAccumulator<K, T extends
EventAccumulator.Event<K>> implements
public T poll(long timeout, TimeUnit unit) {
lock.lock();
try {
- K key = randomKey();
- long nanos = unit.toNanos(timeout);
+ var key = randomKey();
+ var nanos = unit.toNanos(timeout);
while (key == null && !closed && nanos > 0) {
try {
nanos = condition.awaitNanos(nanos);
@@ -196,8 +163,8 @@ public class EventAccumulator<K, T extends
EventAccumulator.Event<K>> implements
if (key == null) return null;
- Deque<T> queue = queues.get(key);
- T event = queue.poll();
+ var queue = queues.get(key);
+ var event = queue.poll();
if (queue.isEmpty()) queues.remove(key);
inflightKeys.add(key);
@@ -218,7 +185,7 @@ public class EventAccumulator<K, T extends
EventAccumulator.Event<K>> implements
public void done(T event) {
lock.lock();
try {
- K key = event.key();
+ var key = event.key();
inflightKeys.remove(key);
if (queues.containsKey(key)) {
addAvailableKey(key);
@@ -254,6 +221,33 @@ public class EventAccumulator<K, T extends
EventAccumulator.Event<K>> implements
}
}
+ /**
+ * Adds an {@link Event} to the queue using the provided add operation.
+ *
+ * @param event The {@link Event} to add.
+ * @param addOperation The operation to use for adding (e.g., addFirst or
addLast).
+ */
+ private void add(T event, BiConsumer<Deque<T>, T> addOperation) throws
RejectedExecutionException {
+ lock.lock();
+ try {
+ if (closed) throw new RejectedExecutionException("Can't accept an
event because the accumulator is closed.");
+
+ var key = event.key();
+ var queue = queues.get(key);
+ if (queue == null) {
+ queue = new ArrayDeque<>();
+ queues.put(key, queue);
+ if (!inflightKeys.contains(key)) {
+ addAvailableKey(key);
+ }
+ }
+ addOperation.accept(queue, event);
+ size++;
+ } finally {
+ lock.unlock();
+ }
+ }
+
/**
* Adds the key to the available keys set.
*
@@ -273,9 +267,9 @@ public class EventAccumulator<K, T extends
EventAccumulator.Event<K>> implements
private K randomKey() {
if (availableKeys.isEmpty()) return null;
- int lastIndex = availableKeys.size() - 1;
- int randomIndex = random.nextInt(availableKeys.size());
- K randomKey = availableKeys.get(randomIndex);
+ var lastIndex = availableKeys.size() - 1;
+ var randomIndex = random.nextInt(availableKeys.size());
+ var randomKey = availableKeys.get(randomIndex);
Collections.swap(availableKeys, randomIndex, lastIndex);
availableKeys.remove(lastIndex);
return randomKey;