This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6441f5845f75dad860c65d71aaca252973dbaba2
Author: Matthias Pohl <[email protected]>
AuthorDate: Tue Jan 23 16:21:45 2024 +0100

    [hotfix][test] Makes ManuallyTriggeredScheduledExecutorService#execute rely on a BlockingQueue

    This allows us to wait for tasks to "arrive".
---
 .../ManuallyTriggeredScheduledExecutorService.java | 44 ++++++++++++++--------
 1 file changed, 29 insertions(+), 15 deletions(-)

diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java
index 0ae1d282c1b..7bd4c9dc74b 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java
@@ -20,16 +20,18 @@ package org.apache.flink.core.testutils;
 
 import javax.annotation.Nonnull;
 
-import java.util.ArrayDeque;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -44,7 +46,7 @@ import java.util.stream.Collectors;
  */
 public class ManuallyTriggeredScheduledExecutorService implements ScheduledExecutorService {
 
-    private final ArrayDeque<Runnable> queuedRunnables = new ArrayDeque<>();
+    private final BlockingQueue<Runnable> queuedRunnables = new LinkedBlockingQueue<>();
 
     private final ConcurrentLinkedQueue<ScheduledTask<?>> nonPeriodicScheduledTasks =
             new ConcurrentLinkedQueue<>();
@@ -60,9 +62,7 @@ public class ManuallyTriggeredScheduledExecutorService implements ScheduledExecu
 
     @Override
     public void execute(@Nonnull Runnable command) {
-        synchronized (queuedRunnables) {
-            queuedRunnables.addLast(command);
-        }
+        queuedRunnables.add(command);
     }
 
     @Override
@@ -182,24 +182,38 @@ public class ManuallyTriggeredScheduledExecutorService implements ScheduledExecu
     }
 
     /**
-     * Triggers the next queued runnable and executes it synchronously. This method throws an
-     * exception if no Runnable is currently queued.
+     * Triggers the next task that was submitted for execution. The method blocks the given amount
+     * of time if no task is scheduled, yet.
+     *
+     * @param timeout The time to wait for a new task to be scheduled.
+     * @throws IllegalStateException if no task was scheduled in the given amount of time.
      */
-    public void trigger() {
-        final Runnable next;
+    public void trigger(Duration timeout) {
+        try {
+            final Runnable task = queuedRunnables.poll(timeout.toMillis(), TimeUnit.MILLISECONDS);
 
-        synchronized (queuedRunnables) {
-            next = queuedRunnables.removeFirst();
+            if (task == null) {
+                throw new IllegalStateException("No task was scheduled.");
+            }
+
+            task.run();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
         }
+    }
 
-        next.run();
+    /**
+     * Triggers the next task that was submitted for execution.
+     *
+     * @throws IllegalStateException if no task was scheduled before calling this method.
+     */
+    public void trigger() {
+        trigger(Duration.ZERO);
     }
 
     /** Gets the number of Runnables currently queued. */
     public int numQueuedRunnables() {
-        synchronized (queuedRunnables) {
-            return queuedRunnables.size();
-        }
+        return queuedRunnables.size();
     }
 
     public Collection<ScheduledFuture<?>> getActiveScheduledTasks() {
