This is an automated email from the ASF dual-hosted git repository. dajac pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 52289c92be4 MINOR: Optimize EventAccumulator (#15430) 52289c92be4 is described below commit 52289c92be45ba3758d07376d9c64ddadbecb544 Author: David Jacot <dja...@confluent.io> AuthorDate: Wed Feb 28 05:38:02 2024 -0800 MINOR: Optimize EventAccumulator (#15430) `poll(long timeout, TimeUnit unit)` is either used with `Long.MAX_VALUE` or `0`. This patch replaces it with `poll` and `take`. It removes the `awaitNanos` usage. Reviewers: Jeff Kim <jeff....@confluent.io>, Justine Olshan <jols...@confluent.io> --- .../group/runtime/EventAccumulator.java | 37 ++++++++----- .../group/runtime/MultiThreadedEventProcessor.java | 9 ++-- .../group/runtime/EventAccumulatorTest.java | 30 +++++------ .../runtime/MultiThreadedEventProcessorTest.java | 61 +++++----------------- 4 files changed, 56 insertions(+), 81 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/EventAccumulator.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/EventAccumulator.java index f46e8b8a8bf..16b61f8e991 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/EventAccumulator.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/EventAccumulator.java @@ -27,7 +27,6 @@ import java.util.Queue; import java.util.Random; import java.util.Set; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; @@ -137,31 +136,43 @@ public class EventAccumulator<K, T extends EventAccumulator.Event<K>> implements } /** - * Returns the next {{@link Event}} available. This method block indefinitely until - * one event is ready or the accumulator is closed. + * Returns the next {{@link Event}} available or null if no event is + * available. * - * @return The next event. + * @return The next event available or null. */ public T poll() { - return poll(Long.MAX_VALUE, TimeUnit.SECONDS); + lock.lock(); + try { + K key = randomKey(); + if (key == null) return null; + + Queue<T> queue = queues.get(key); + T event = queue.poll(); + + if (queue.isEmpty()) queues.remove(key); + inflightKeys.add(key); + size--; + + return event; + } finally { + lock.unlock(); + } } /** - * Returns the next {{@link Event}} available. This method blocks for the provided - * time and returns null of not event is available. + * Returns the next {{@link Event}} available. This method blocks until an + * event is available or accumulator is closed. * - * @param timeout The timeout. - * @param unit The timeout unit. * @return The next event available or null. */ - public T poll(long timeout, TimeUnit unit) { + public T take() { lock.lock(); try { K key = randomKey(); - long nanos = unit.toNanos(timeout); - while (key == null && !closed && nanos > 0) { + while (key == null && !closed) { try { - nanos = condition.awaitNanos(nanos); + condition.await(); } catch (InterruptedException e) { // Ignore. } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java index e4adc18e957..0e3d563861c 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java @@ -25,7 +25,6 @@ import org.slf4j.Logger; import java.util.List; import java.util.Objects; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -128,7 +127,7 @@ public class MultiThreadedEventProcessor implements CoordinatorEventProcessor { private void handleEvents() { while (!shuttingDown) { recordPollStartTime(time.milliseconds()); - CoordinatorEvent event = accumulator.poll(); + CoordinatorEvent event = accumulator.take(); recordPollEndTime(time.milliseconds()); if (event != null) { try { @@ -148,8 +147,8 @@ public class MultiThreadedEventProcessor implements CoordinatorEventProcessor { } private void drainEvents() { - CoordinatorEvent event = accumulator.poll(0, TimeUnit.MILLISECONDS); - while (event != null) { + CoordinatorEvent event; + while ((event = accumulator.poll()) != null) { try { log.debug("Draining event: {}.", event); metrics.recordEventQueueTime(time.milliseconds() - event.createdTimeMs()); @@ -159,8 +158,6 @@ public class MultiThreadedEventProcessor implements CoordinatorEventProcessor { } finally { accumulator.done(event); } - - event = accumulator.poll(0, TimeUnit.MILLISECONDS); } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/EventAccumulatorTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/EventAccumulatorTest.java index 147cf08121c..e077fb5e022 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/EventAccumulatorTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/EventAccumulatorTest.java @@ -78,7 +78,7 @@ public class EventAccumulatorTest { EventAccumulator<Integer, MockEvent> accumulator = new EventAccumulator<>(); assertEquals(0, accumulator.size()); - assertNull(accumulator.poll(0, TimeUnit.MICROSECONDS)); + assertNull(accumulator.poll()); List<MockEvent> events = Arrays.asList( new MockEvent(1, 0), @@ -97,14 +97,14 @@ public class EventAccumulatorTest { Set<MockEvent> polledEvents = new HashSet<>(); for (int i = 0; i < events.size(); i++) { - MockEvent event = accumulator.poll(0, TimeUnit.MICROSECONDS); + MockEvent event = accumulator.poll(); assertNotNull(event); polledEvents.add(event); assertEquals(events.size() - 1 - i, accumulator.size()); accumulator.done(event); } - assertNull(accumulator.poll(0, TimeUnit.MICROSECONDS)); + assertNull(accumulator.poll()); assertEquals(new HashSet<>(events), polledEvents); assertEquals(0, accumulator.size()); @@ -126,27 +126,27 @@ public class EventAccumulatorTest { MockEvent event = null; // Poll event0. - event = accumulator.poll(0, TimeUnit.MICROSECONDS); + event = accumulator.poll(); assertEquals(event0, event); // Poll returns null because key is inflight. - assertNull(accumulator.poll(0, TimeUnit.MICROSECONDS)); + assertNull(accumulator.poll()); accumulator.done(event); // Poll event1. - event = accumulator.poll(0, TimeUnit.MICROSECONDS); + event = accumulator.poll(); assertEquals(event1, event); // Poll returns null because key is inflight. - assertNull(accumulator.poll(0, TimeUnit.MICROSECONDS)); + assertNull(accumulator.poll()); accumulator.done(event); // Poll event2. - event = accumulator.poll(0, TimeUnit.MICROSECONDS); + event = accumulator.poll(); assertEquals(event2, event); // Poll returns null because key is inflight. - assertNull(accumulator.poll(0, TimeUnit.MICROSECONDS)); + assertNull(accumulator.poll()); accumulator.done(event); accumulator.close(); @@ -160,9 +160,9 @@ public class EventAccumulatorTest { MockEvent event1 = new MockEvent(1, 1); MockEvent event2 = new MockEvent(1, 2); - CompletableFuture<MockEvent> future0 = CompletableFuture.supplyAsync(accumulator::poll); - CompletableFuture<MockEvent> future1 = CompletableFuture.supplyAsync(accumulator::poll); - CompletableFuture<MockEvent> future2 = CompletableFuture.supplyAsync(accumulator::poll); + CompletableFuture<MockEvent> future0 = CompletableFuture.supplyAsync(accumulator::take); + CompletableFuture<MockEvent> future1 = CompletableFuture.supplyAsync(accumulator::take); + CompletableFuture<MockEvent> future2 = CompletableFuture.supplyAsync(accumulator::take); List<CompletableFuture<MockEvent>> futures = Arrays.asList(future0, future1, future2); assertFalse(future0.isDone()); @@ -215,9 +215,9 @@ public class EventAccumulatorTest { public void testCloseUnblockWaitingThreads() throws ExecutionException, InterruptedException, TimeoutException { EventAccumulator<Integer, MockEvent> accumulator = new EventAccumulator<>(); - CompletableFuture<MockEvent> future0 = CompletableFuture.supplyAsync(accumulator::poll); - CompletableFuture<MockEvent> future1 = CompletableFuture.supplyAsync(accumulator::poll); - CompletableFuture<MockEvent> future2 = CompletableFuture.supplyAsync(accumulator::poll); + CompletableFuture<MockEvent> future0 = CompletableFuture.supplyAsync(accumulator::take); + CompletableFuture<MockEvent> future1 = CompletableFuture.supplyAsync(accumulator::take); + CompletableFuture<MockEvent> future2 = CompletableFuture.supplyAsync(accumulator::take); assertFalse(future0.isDone()); assertFalse(future1.isDone()); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java index 2714188f65e..3708141827c 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java @@ -27,15 +27,12 @@ import org.mockito.ArgumentCaptor; import java.util.ArrayList; import java.util.Arrays; -import java.util.LinkedList; import java.util.List; -import java.util.Queue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; @@ -53,53 +50,19 @@ import static org.mockito.Mockito.verify; @Timeout(value = 60) public class MultiThreadedEventProcessorTest { - private static class MockEventAccumulator<T> extends EventAccumulator<TopicPartition, CoordinatorEvent> { + private static class DelayEventAccumulator extends EventAccumulator<TopicPartition, CoordinatorEvent> { private final Time time; - private final Queue<CoordinatorEvent> events; - private final long timeToPollMs; - private final AtomicBoolean isClosed; + private final long takeDelayMs; - public MockEventAccumulator(Time time, long timeToPollMs) { + public DelayEventAccumulator(Time time, long takeDelayMs) { this.time = time; - this.events = new LinkedList<>(); - this.timeToPollMs = timeToPollMs; - this.isClosed = new AtomicBoolean(false); + this.takeDelayMs = takeDelayMs; } @Override - public CoordinatorEvent poll() { - synchronized (events) { - while (events.isEmpty() && !isClosed.get()) { - try { - events.wait(); - } catch (Exception ignored) { - - } - } - time.sleep(timeToPollMs); - return events.poll(); - } - } - - @Override - public CoordinatorEvent poll(long timeout, TimeUnit unit) { - return null; - } - - @Override - public void add(CoordinatorEvent event) throws RejectedExecutionException { - synchronized (events) { - events.add(event); - events.notifyAll(); - } - } - - @Override - public void close() { - isClosed.set(true); - synchronized (events) { - events.notifyAll(); - } + public CoordinatorEvent take() { + time.sleep(takeDelayMs); + return super.take(); } } @@ -353,7 +316,11 @@ public class MultiThreadedEventProcessorTest { AtomicInteger numEventsExecuted = new AtomicInteger(0); // Special event which blocks until the latch is released. - FutureEvent<Integer> blockingEvent = new FutureEvent<>(new TopicPartition("foo", 0), numEventsExecuted::incrementAndGet, true); + FutureEvent<Integer> blockingEvent = new FutureEvent<>( + new TopicPartition("foo", 0), + numEventsExecuted::incrementAndGet, + true + ); List<FutureEvent<Integer>> events = Arrays.asList( new FutureEvent<>(new TopicPartition("foo", 0), numEventsExecuted::incrementAndGet), @@ -428,7 +395,7 @@ public class MultiThreadedEventProcessorTest { 1, // Use a single thread to block event in the processor. mockTime, mockRuntimeMetrics, - new MockEventAccumulator<>(mockTime, 500L) + new DelayEventAccumulator(mockTime, 500L) )) { // Enqueue the blocking event. eventProcessor.enqueue(blockingEvent); @@ -501,7 +468,7 @@ public class MultiThreadedEventProcessorTest { 2, Time.SYSTEM, mockRuntimeMetrics, - new MockEventAccumulator<>(Time.SYSTEM, 100L) + new DelayEventAccumulator(Time.SYSTEM, 100L) )) { List<Double> recordedRatios = new ArrayList<>(); AtomicInteger numEventsExecuted = new AtomicInteger(0);