This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 52289c92be4 MINOR: Optimize EventAccumulator (#15430)
52289c92be4 is described below

commit 52289c92be45ba3758d07376d9c64ddadbecb544
Author: David Jacot <dja...@confluent.io>
AuthorDate: Wed Feb 28 05:38:02 2024 -0800

    MINOR: Optimize EventAccumulator (#15430)
    
    `poll(long timeout, TimeUnit unit)` is only ever called with either `Long.MAX_VALUE`
    or `0`. This patch replaces it with a non-blocking `poll()` and a blocking `take()`,
    and removes the `awaitNanos` usage.
    
    Reviewers: Jeff Kim <jeff....@confluent.io>, Justine Olshan <jols...@confluent.io>
---
 .../group/runtime/EventAccumulator.java            | 37 ++++++++-----
 .../group/runtime/MultiThreadedEventProcessor.java |  9 ++--
 .../group/runtime/EventAccumulatorTest.java        | 30 +++++------
 .../runtime/MultiThreadedEventProcessorTest.java   | 61 +++++-----------------
 4 files changed, 56 insertions(+), 81 deletions(-)

diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/EventAccumulator.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/EventAccumulator.java
index f46e8b8a8bf..16b61f8e991 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/EventAccumulator.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/EventAccumulator.java
@@ -27,7 +27,6 @@ import java.util.Queue;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -137,31 +136,43 @@ public class EventAccumulator<K, T extends EventAccumulator.Event<K>> implements
     }
 
     /**
-     * Returns the next {{@link Event}} available. This method block indefinitely until
-     * one event is ready or the accumulator is closed.
+     * Returns the next {{@link Event}} available or null if no event is
+     * available.
      *
-     * @return The next event.
+     * @return The next event available or null.
      */
     public T poll() {
-        return poll(Long.MAX_VALUE, TimeUnit.SECONDS);
+        lock.lock();
+        try {
+            K key = randomKey();
+            if (key == null) return null;
+
+            Queue<T> queue = queues.get(key);
+            T event = queue.poll();
+
+            if (queue.isEmpty()) queues.remove(key);
+            inflightKeys.add(key);
+            size--;
+
+            return event;
+        } finally {
+            lock.unlock();
+        }
     }
 
     /**
-     * Returns the next {{@link Event}} available. This method blocks for the provided
-     * time and returns null of not event is available.
+     * Returns the next {{@link Event}} available. This method blocks until an
+     * event is available or accumulator is closed.
      *
-     * @param timeout   The timeout.
-     * @param unit      The timeout unit.
      * @return The next event available or null.
      */
-    public T poll(long timeout, TimeUnit unit) {
+    public T take() {
         lock.lock();
         try {
             K key = randomKey();
-            long nanos = unit.toNanos(timeout);
-            while (key == null && !closed && nanos > 0) {
+            while (key == null && !closed) {
                 try {
-                    nanos = condition.awaitNanos(nanos);
+                    condition.await();
                 } catch (InterruptedException e) {
                     // Ignore.
                 }
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java
index e4adc18e957..0e3d563861c 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java
@@ -25,7 +25,6 @@ import org.slf4j.Logger;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -128,7 +127,7 @@ public class MultiThreadedEventProcessor implements CoordinatorEventProcessor {
         private void handleEvents() {
             while (!shuttingDown) {
                 recordPollStartTime(time.milliseconds());
-                CoordinatorEvent event = accumulator.poll();
+                CoordinatorEvent event = accumulator.take();
                 recordPollEndTime(time.milliseconds());
                 if (event != null) {
                     try {
@@ -148,8 +147,8 @@ public class MultiThreadedEventProcessor implements CoordinatorEventProcessor {
         }
 
         private void drainEvents() {
-            CoordinatorEvent event = accumulator.poll(0, TimeUnit.MILLISECONDS);
-            while (event != null) {
+            CoordinatorEvent event;
+            while ((event = accumulator.poll()) != null) {
                 try {
                     log.debug("Draining event: {}.", event);
                     metrics.recordEventQueueTime(time.milliseconds() - event.createdTimeMs());
@@ -159,8 +158,6 @@ public class MultiThreadedEventProcessor implements CoordinatorEventProcessor {
                 } finally {
                     accumulator.done(event);
                 }
-
-                event = accumulator.poll(0, TimeUnit.MILLISECONDS);
             }
         }
 
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/EventAccumulatorTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/EventAccumulatorTest.java
index 147cf08121c..e077fb5e022 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/EventAccumulatorTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/EventAccumulatorTest.java
@@ -78,7 +78,7 @@ public class EventAccumulatorTest {
         EventAccumulator<Integer, MockEvent> accumulator = new 
EventAccumulator<>();
 
         assertEquals(0, accumulator.size());
-        assertNull(accumulator.poll(0, TimeUnit.MICROSECONDS));
+        assertNull(accumulator.poll());
 
         List<MockEvent> events = Arrays.asList(
             new MockEvent(1, 0),
@@ -97,14 +97,14 @@ public class EventAccumulatorTest {
 
         Set<MockEvent> polledEvents = new HashSet<>();
         for (int i = 0; i < events.size(); i++) {
-            MockEvent event = accumulator.poll(0, TimeUnit.MICROSECONDS);
+            MockEvent event = accumulator.poll();
             assertNotNull(event);
             polledEvents.add(event);
             assertEquals(events.size() - 1 - i, accumulator.size());
             accumulator.done(event);
         }
 
-        assertNull(accumulator.poll(0, TimeUnit.MICROSECONDS));
+        assertNull(accumulator.poll());
         assertEquals(new HashSet<>(events), polledEvents);
         assertEquals(0, accumulator.size());
 
@@ -126,27 +126,27 @@ public class EventAccumulatorTest {
         MockEvent event = null;
 
         // Poll event0.
-        event = accumulator.poll(0, TimeUnit.MICROSECONDS);
+        event = accumulator.poll();
         assertEquals(event0, event);
 
         // Poll returns null because key is inflight.
-        assertNull(accumulator.poll(0, TimeUnit.MICROSECONDS));
+        assertNull(accumulator.poll());
         accumulator.done(event);
 
         // Poll event1.
-        event = accumulator.poll(0, TimeUnit.MICROSECONDS);
+        event = accumulator.poll();
         assertEquals(event1, event);
 
         // Poll returns null because key is inflight.
-        assertNull(accumulator.poll(0, TimeUnit.MICROSECONDS));
+        assertNull(accumulator.poll());
         accumulator.done(event);
 
         // Poll event2.
-        event = accumulator.poll(0, TimeUnit.MICROSECONDS);
+        event = accumulator.poll();
         assertEquals(event2, event);
 
         // Poll returns null because key is inflight.
-        assertNull(accumulator.poll(0, TimeUnit.MICROSECONDS));
+        assertNull(accumulator.poll());
         accumulator.done(event);
 
         accumulator.close();
@@ -160,9 +160,9 @@ public class EventAccumulatorTest {
         MockEvent event1 = new MockEvent(1, 1);
         MockEvent event2 = new MockEvent(1, 2);
 
-        CompletableFuture<MockEvent> future0 = 
CompletableFuture.supplyAsync(accumulator::poll);
-        CompletableFuture<MockEvent> future1 = 
CompletableFuture.supplyAsync(accumulator::poll);
-        CompletableFuture<MockEvent> future2 = 
CompletableFuture.supplyAsync(accumulator::poll);
+        CompletableFuture<MockEvent> future0 = 
CompletableFuture.supplyAsync(accumulator::take);
+        CompletableFuture<MockEvent> future1 = 
CompletableFuture.supplyAsync(accumulator::take);
+        CompletableFuture<MockEvent> future2 = 
CompletableFuture.supplyAsync(accumulator::take);
         List<CompletableFuture<MockEvent>> futures = Arrays.asList(future0, 
future1, future2);
 
         assertFalse(future0.isDone());
@@ -215,9 +215,9 @@ public class EventAccumulatorTest {
     public void testCloseUnblockWaitingThreads() throws ExecutionException, 
InterruptedException, TimeoutException {
         EventAccumulator<Integer, MockEvent> accumulator = new 
EventAccumulator<>();
 
-        CompletableFuture<MockEvent> future0 = 
CompletableFuture.supplyAsync(accumulator::poll);
-        CompletableFuture<MockEvent> future1 = 
CompletableFuture.supplyAsync(accumulator::poll);
-        CompletableFuture<MockEvent> future2 = 
CompletableFuture.supplyAsync(accumulator::poll);
+        CompletableFuture<MockEvent> future0 = 
CompletableFuture.supplyAsync(accumulator::take);
+        CompletableFuture<MockEvent> future1 = 
CompletableFuture.supplyAsync(accumulator::take);
+        CompletableFuture<MockEvent> future2 = 
CompletableFuture.supplyAsync(accumulator::take);
 
         assertFalse(future0.isDone());
         assertFalse(future1.isDone());
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java
index 2714188f65e..3708141827c 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java
@@ -27,15 +27,12 @@ import org.mockito.ArgumentCaptor;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 
@@ -53,53 +50,19 @@ import static org.mockito.Mockito.verify;
 
 @Timeout(value = 60)
 public class MultiThreadedEventProcessorTest {
-    private static class MockEventAccumulator<T> extends 
EventAccumulator<TopicPartition, CoordinatorEvent> {
+    private static class DelayEventAccumulator extends 
EventAccumulator<TopicPartition, CoordinatorEvent> {
         private final Time time;
-        private final Queue<CoordinatorEvent> events;
-        private final long timeToPollMs;
-        private final AtomicBoolean isClosed;
+        private final long takeDelayMs;
 
-        public MockEventAccumulator(Time time, long timeToPollMs) {
+        public DelayEventAccumulator(Time time, long takeDelayMs) {
             this.time = time;
-            this.events = new LinkedList<>();
-            this.timeToPollMs = timeToPollMs;
-            this.isClosed = new AtomicBoolean(false);
+            this.takeDelayMs = takeDelayMs;
         }
 
         @Override
-        public CoordinatorEvent poll() {
-            synchronized (events) {
-                while (events.isEmpty() && !isClosed.get()) {
-                    try {
-                        events.wait();
-                    } catch (Exception ignored) {
-                        
-                    }
-                }
-                time.sleep(timeToPollMs);
-                return events.poll();
-            }
-        }
-
-        @Override
-        public CoordinatorEvent poll(long timeout, TimeUnit unit) {
-            return null;
-        }
-
-        @Override
-        public void add(CoordinatorEvent event) throws 
RejectedExecutionException {
-            synchronized (events) {
-                events.add(event);
-                events.notifyAll();
-            }
-        }
-
-        @Override
-        public void close() {
-            isClosed.set(true);
-            synchronized (events) {
-                events.notifyAll();
-            }
+        public CoordinatorEvent take() {
+            time.sleep(takeDelayMs);
+            return super.take();
         }
     }
 
@@ -353,7 +316,11 @@ public class MultiThreadedEventProcessorTest {
             AtomicInteger numEventsExecuted = new AtomicInteger(0);
 
             // Special event which blocks until the latch is released.
-            FutureEvent<Integer> blockingEvent = new FutureEvent<>(new 
TopicPartition("foo", 0), numEventsExecuted::incrementAndGet, true);
+            FutureEvent<Integer> blockingEvent = new FutureEvent<>(
+                new TopicPartition("foo", 0),
+                numEventsExecuted::incrementAndGet,
+                true
+            );
 
             List<FutureEvent<Integer>> events = Arrays.asList(
                 new FutureEvent<>(new TopicPartition("foo", 0), 
numEventsExecuted::incrementAndGet),
@@ -428,7 +395,7 @@ public class MultiThreadedEventProcessorTest {
             1, // Use a single thread to block event in the processor.
             mockTime,
             mockRuntimeMetrics,
-            new MockEventAccumulator<>(mockTime, 500L)
+            new DelayEventAccumulator(mockTime, 500L)
         )) {
             // Enqueue the blocking event.
             eventProcessor.enqueue(blockingEvent);
@@ -501,7 +468,7 @@ public class MultiThreadedEventProcessorTest {
             2,
             Time.SYSTEM,
             mockRuntimeMetrics,
-            new MockEventAccumulator<>(Time.SYSTEM, 100L)
+            new DelayEventAccumulator(Time.SYSTEM, 100L)
         )) {
             List<Double> recordedRatios = new ArrayList<>();
             AtomicInteger numEventsExecuted = new AtomicInteger(0);

Reply via email to