This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6441f5845f75dad860c65d71aaca252973dbaba2
Author: Matthias Pohl <[email protected]>
AuthorDate: Tue Jan 23 16:21:45 2024 +0100

    [hotfix][test] Makes ManuallyTriggeredScheduledExecutorService#execute rely on a BlockingQueue
    
    This allows us to wait for tasks to "arrive".
---
 .../ManuallyTriggeredScheduledExecutorService.java | 44 ++++++++++++++--------
 1 file changed, 29 insertions(+), 15 deletions(-)

diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java
index 0ae1d282c1b..7bd4c9dc74b 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java
@@ -20,16 +20,18 @@ package org.apache.flink.core.testutils;
 
 import javax.annotation.Nonnull;
 
-import java.util.ArrayDeque;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -44,7 +46,7 @@ import java.util.stream.Collectors;
  */
 public class ManuallyTriggeredScheduledExecutorService implements 
ScheduledExecutorService {
 
-    private final ArrayDeque<Runnable> queuedRunnables = new ArrayDeque<>();
+    private final BlockingQueue<Runnable> queuedRunnables = new 
LinkedBlockingQueue<>();
 
     private final ConcurrentLinkedQueue<ScheduledTask<?>> 
nonPeriodicScheduledTasks =
             new ConcurrentLinkedQueue<>();
@@ -60,9 +62,7 @@ public class ManuallyTriggeredScheduledExecutorService implements ScheduledExecu
 
     @Override
     public void execute(@Nonnull Runnable command) {
-        synchronized (queuedRunnables) {
-            queuedRunnables.addLast(command);
-        }
+        queuedRunnables.add(command);
     }
 
     @Override
@@ -182,24 +182,38 @@ public class ManuallyTriggeredScheduledExecutorService implements ScheduledExecu
     }
 
     /**
-     * Triggers the next queued runnable and executes it synchronously. This method throws an
-     * exception if no Runnable is currently queued.
+     * Triggers the next task that was submitted for execution. The method blocks the given amount
+     * of time if no task is scheduled, yet.
+     *
+     * @param timeout The time to wait for a new task to be scheduled.
+     * @throws IllegalStateException if no task was scheduled in the given amount of time.
      */
-    public void trigger() {
-        final Runnable next;
+    public void trigger(Duration timeout) {
+        try {
+            final Runnable task = queuedRunnables.poll(timeout.toMillis(), 
TimeUnit.MILLISECONDS);
 
-        synchronized (queuedRunnables) {
-            next = queuedRunnables.removeFirst();
+            if (task == null) {
+                throw new IllegalStateException("No task was scheduled.");
+            }
+
+            task.run();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
         }
+    }
 
-        next.run();
+    /**
+     * Triggers the next task that was submitted for execution.
+     *
+     * @throws IllegalStateException if no task was scheduled before calling this method.
+     */
+    public void trigger() {
+        trigger(Duration.ZERO);
     }
 
     /** Gets the number of Runnables currently queued. */
     public int numQueuedRunnables() {
-        synchronized (queuedRunnables) {
-            return queuedRunnables.size();
-        }
+        return queuedRunnables.size();
     }
 
     public Collection<ScheduledFuture<?>> getActiveScheduledTasks() {

Reply via email to