This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 52289c92be4 MINOR: Optimize EventAccumulator (#15430)
52289c92be4 is described below
commit 52289c92be45ba3758d07376d9c64ddadbecb544
Author: David Jacot <[email protected]>
AuthorDate: Wed Feb 28 05:38:02 2024 -0800
MINOR: Optimize EventAccumulator (#15430)
`poll(long timeout, TimeUnit unit)` is either used with `Long.MAX_VALUE` or
`0`. This patch replaces it with `poll` and `take`. It removes the `awaitNanos`
usage.
Reviewers: Jeff Kim <[email protected]>, Justine Olshan
<[email protected]>
---
.../group/runtime/EventAccumulator.java | 37 ++++++++-----
.../group/runtime/MultiThreadedEventProcessor.java | 9 ++--
.../group/runtime/EventAccumulatorTest.java | 30 +++++------
.../runtime/MultiThreadedEventProcessorTest.java | 61 +++++-----------------
4 files changed, 56 insertions(+), 81 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/EventAccumulator.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/EventAccumulator.java
index f46e8b8a8bf..16b61f8e991 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/EventAccumulator.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/EventAccumulator.java
@@ -27,7 +27,6 @@ import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@@ -137,31 +136,43 @@ public class EventAccumulator<K, T extends
EventAccumulator.Event<K>> implements
}
/**
- * Returns the next {{@link Event}} available. This method block
indefinitely until
- * one event is ready or the accumulator is closed.
+ * Returns the next {{@link Event}} available or null if no event is
+ * available.
*
- * @return The next event.
+ * @return The next event available or null.
*/
public T poll() {
- return poll(Long.MAX_VALUE, TimeUnit.SECONDS);
+ lock.lock();
+ try {
+ K key = randomKey();
+ if (key == null) return null;
+
+ Queue<T> queue = queues.get(key);
+ T event = queue.poll();
+
+ if (queue.isEmpty()) queues.remove(key);
+ inflightKeys.add(key);
+ size--;
+
+ return event;
+ } finally {
+ lock.unlock();
+ }
}
/**
- * Returns the next {{@link Event}} available. This method blocks for the
provided
- * time and returns null of not event is available.
+ * Returns the next {{@link Event}} available. This method blocks until an
+ * event is available or accumulator is closed.
*
- * @param timeout The timeout.
- * @param unit The timeout unit.
* @return The next event available or null.
*/
- public T poll(long timeout, TimeUnit unit) {
+ public T take() {
lock.lock();
try {
K key = randomKey();
- long nanos = unit.toNanos(timeout);
- while (key == null && !closed && nanos > 0) {
+ while (key == null && !closed) {
try {
- nanos = condition.awaitNanos(nanos);
+ condition.await();
} catch (InterruptedException e) {
// Ignore.
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java
index e4adc18e957..0e3d563861c 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java
@@ -25,7 +25,6 @@ import org.slf4j.Logger;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -128,7 +127,7 @@ public class MultiThreadedEventProcessor implements
CoordinatorEventProcessor {
private void handleEvents() {
while (!shuttingDown) {
recordPollStartTime(time.milliseconds());
- CoordinatorEvent event = accumulator.poll();
+ CoordinatorEvent event = accumulator.take();
recordPollEndTime(time.milliseconds());
if (event != null) {
try {
@@ -148,8 +147,8 @@ public class MultiThreadedEventProcessor implements
CoordinatorEventProcessor {
}
private void drainEvents() {
- CoordinatorEvent event = accumulator.poll(0,
TimeUnit.MILLISECONDS);
- while (event != null) {
+ CoordinatorEvent event;
+ while ((event = accumulator.poll()) != null) {
try {
log.debug("Draining event: {}.", event);
metrics.recordEventQueueTime(time.milliseconds() -
event.createdTimeMs());
@@ -159,8 +158,6 @@ public class MultiThreadedEventProcessor implements
CoordinatorEventProcessor {
} finally {
accumulator.done(event);
}
-
- event = accumulator.poll(0, TimeUnit.MILLISECONDS);
}
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/EventAccumulatorTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/EventAccumulatorTest.java
index 147cf08121c..e077fb5e022 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/EventAccumulatorTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/EventAccumulatorTest.java
@@ -78,7 +78,7 @@ public class EventAccumulatorTest {
EventAccumulator<Integer, MockEvent> accumulator = new
EventAccumulator<>();
assertEquals(0, accumulator.size());
- assertNull(accumulator.poll(0, TimeUnit.MICROSECONDS));
+ assertNull(accumulator.poll());
List<MockEvent> events = Arrays.asList(
new MockEvent(1, 0),
@@ -97,14 +97,14 @@ public class EventAccumulatorTest {
Set<MockEvent> polledEvents = new HashSet<>();
for (int i = 0; i < events.size(); i++) {
- MockEvent event = accumulator.poll(0, TimeUnit.MICROSECONDS);
+ MockEvent event = accumulator.poll();
assertNotNull(event);
polledEvents.add(event);
assertEquals(events.size() - 1 - i, accumulator.size());
accumulator.done(event);
}
- assertNull(accumulator.poll(0, TimeUnit.MICROSECONDS));
+ assertNull(accumulator.poll());
assertEquals(new HashSet<>(events), polledEvents);
assertEquals(0, accumulator.size());
@@ -126,27 +126,27 @@ public class EventAccumulatorTest {
MockEvent event = null;
// Poll event0.
- event = accumulator.poll(0, TimeUnit.MICROSECONDS);
+ event = accumulator.poll();
assertEquals(event0, event);
// Poll returns null because key is inflight.
- assertNull(accumulator.poll(0, TimeUnit.MICROSECONDS));
+ assertNull(accumulator.poll());
accumulator.done(event);
// Poll event1.
- event = accumulator.poll(0, TimeUnit.MICROSECONDS);
+ event = accumulator.poll();
assertEquals(event1, event);
// Poll returns null because key is inflight.
- assertNull(accumulator.poll(0, TimeUnit.MICROSECONDS));
+ assertNull(accumulator.poll());
accumulator.done(event);
// Poll event2.
- event = accumulator.poll(0, TimeUnit.MICROSECONDS);
+ event = accumulator.poll();
assertEquals(event2, event);
// Poll returns null because key is inflight.
- assertNull(accumulator.poll(0, TimeUnit.MICROSECONDS));
+ assertNull(accumulator.poll());
accumulator.done(event);
accumulator.close();
@@ -160,9 +160,9 @@ public class EventAccumulatorTest {
MockEvent event1 = new MockEvent(1, 1);
MockEvent event2 = new MockEvent(1, 2);
- CompletableFuture<MockEvent> future0 =
CompletableFuture.supplyAsync(accumulator::poll);
- CompletableFuture<MockEvent> future1 =
CompletableFuture.supplyAsync(accumulator::poll);
- CompletableFuture<MockEvent> future2 =
CompletableFuture.supplyAsync(accumulator::poll);
+ CompletableFuture<MockEvent> future0 =
CompletableFuture.supplyAsync(accumulator::take);
+ CompletableFuture<MockEvent> future1 =
CompletableFuture.supplyAsync(accumulator::take);
+ CompletableFuture<MockEvent> future2 =
CompletableFuture.supplyAsync(accumulator::take);
List<CompletableFuture<MockEvent>> futures = Arrays.asList(future0,
future1, future2);
assertFalse(future0.isDone());
@@ -215,9 +215,9 @@ public class EventAccumulatorTest {
public void testCloseUnblockWaitingThreads() throws ExecutionException,
InterruptedException, TimeoutException {
EventAccumulator<Integer, MockEvent> accumulator = new
EventAccumulator<>();
- CompletableFuture<MockEvent> future0 =
CompletableFuture.supplyAsync(accumulator::poll);
- CompletableFuture<MockEvent> future1 =
CompletableFuture.supplyAsync(accumulator::poll);
- CompletableFuture<MockEvent> future2 =
CompletableFuture.supplyAsync(accumulator::poll);
+ CompletableFuture<MockEvent> future0 =
CompletableFuture.supplyAsync(accumulator::take);
+ CompletableFuture<MockEvent> future1 =
CompletableFuture.supplyAsync(accumulator::take);
+ CompletableFuture<MockEvent> future2 =
CompletableFuture.supplyAsync(accumulator::take);
assertFalse(future0.isDone());
assertFalse(future1.isDone());
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java
index 2714188f65e..3708141827c 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java
@@ -27,15 +27,12 @@ import org.mockito.ArgumentCaptor;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.LinkedList;
import java.util.List;
-import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
@@ -53,53 +50,19 @@ import static org.mockito.Mockito.verify;
@Timeout(value = 60)
public class MultiThreadedEventProcessorTest {
- private static class MockEventAccumulator<T> extends
EventAccumulator<TopicPartition, CoordinatorEvent> {
+ private static class DelayEventAccumulator extends
EventAccumulator<TopicPartition, CoordinatorEvent> {
private final Time time;
- private final Queue<CoordinatorEvent> events;
- private final long timeToPollMs;
- private final AtomicBoolean isClosed;
+ private final long takeDelayMs;
- public MockEventAccumulator(Time time, long timeToPollMs) {
+ public DelayEventAccumulator(Time time, long takeDelayMs) {
this.time = time;
- this.events = new LinkedList<>();
- this.timeToPollMs = timeToPollMs;
- this.isClosed = new AtomicBoolean(false);
+ this.takeDelayMs = takeDelayMs;
}
@Override
- public CoordinatorEvent poll() {
- synchronized (events) {
- while (events.isEmpty() && !isClosed.get()) {
- try {
- events.wait();
- } catch (Exception ignored) {
-
- }
- }
- time.sleep(timeToPollMs);
- return events.poll();
- }
- }
-
- @Override
- public CoordinatorEvent poll(long timeout, TimeUnit unit) {
- return null;
- }
-
- @Override
- public void add(CoordinatorEvent event) throws
RejectedExecutionException {
- synchronized (events) {
- events.add(event);
- events.notifyAll();
- }
- }
-
- @Override
- public void close() {
- isClosed.set(true);
- synchronized (events) {
- events.notifyAll();
- }
+ public CoordinatorEvent take() {
+ time.sleep(takeDelayMs);
+ return super.take();
}
}
@@ -353,7 +316,11 @@ public class MultiThreadedEventProcessorTest {
AtomicInteger numEventsExecuted = new AtomicInteger(0);
// Special event which blocks until the latch is released.
- FutureEvent<Integer> blockingEvent = new FutureEvent<>(new
TopicPartition("foo", 0), numEventsExecuted::incrementAndGet, true);
+ FutureEvent<Integer> blockingEvent = new FutureEvent<>(
+ new TopicPartition("foo", 0),
+ numEventsExecuted::incrementAndGet,
+ true
+ );
List<FutureEvent<Integer>> events = Arrays.asList(
new FutureEvent<>(new TopicPartition("foo", 0),
numEventsExecuted::incrementAndGet),
@@ -428,7 +395,7 @@ public class MultiThreadedEventProcessorTest {
1, // Use a single thread to block event in the processor.
mockTime,
mockRuntimeMetrics,
- new MockEventAccumulator<>(mockTime, 500L)
+ new DelayEventAccumulator(mockTime, 500L)
)) {
// Enqueue the blocking event.
eventProcessor.enqueue(blockingEvent);
@@ -501,7 +468,7 @@ public class MultiThreadedEventProcessorTest {
2,
Time.SYSTEM,
mockRuntimeMetrics,
- new MockEventAccumulator<>(Time.SYSTEM, 100L)
+ new DelayEventAccumulator(Time.SYSTEM, 100L)
)) {
List<Double> recordedRatios = new ArrayList<>();
AtomicInteger numEventsExecuted = new AtomicInteger(0);