This is an automated email from the ASF dual-hosted git repository. sunlan pushed a commit to branch GROOVY-9381_3 in repository https://gitbox.apache.org/repos/asf/groovy.git
commit a0115c503fbbb0dd109421a08e0f56f9f5b697e4 Author: Daniel Sun <[email protected]> AuthorDate: Sat Mar 14 02:25:27 2026 +0900 Minor tweaks --- src/main/java/groovy/concurrent/Awaitable.java | 82 ++--- .../concurrent/AwaitableAdapterRegistry.java | 208 +----------- .../groovy/parser/antlr4/ModifierManager.java | 19 ++ .../groovy/runtime/async/AsyncStreamGenerator.java | 24 +- .../apache/groovy/runtime/async/AsyncSupport.java | 86 +++-- .../groovy/runtime/async/FlowPublisherAdapter.java | 377 +++++++++++++++++++++ .../groovy/transform/AsyncASTTransformation.java | 4 +- .../services/groovy.concurrent.AwaitableAdapter | 16 + src/spec/doc/core-async-await.adoc | 40 ++- src/spec/test/AsyncAwaitSpecTest.groovy | 10 +- .../codehaus/groovy/transform/AsyncApiTest.groovy | 4 +- .../groovy/transform/AsyncConcurrencyTest.groovy | 226 ++++++++++++ .../groovy/transform/AsyncPatternsTest.groovy | 4 +- 13 files changed, 804 insertions(+), 296 deletions(-) diff --git a/src/main/java/groovy/concurrent/Awaitable.java b/src/main/java/groovy/concurrent/Awaitable.java index b64d556d87..ce41b7f947 100644 --- a/src/main/java/groovy/concurrent/Awaitable.java +++ b/src/main/java/groovy/concurrent/Awaitable.java @@ -233,20 +233,18 @@ public interface Awaitable<T> { /** * Returns a new {@code Awaitable} that fails with {@link TimeoutException} - * if this computation does not complete within the specified duration. + * if this computation does not complete within the specified milliseconds. * <p> * Unlike {@link #get(long, TimeUnit)}, this is a non-blocking, composable * timeout combinator: it returns another {@code Awaitable} that can itself * be awaited, chained, or passed to {@link #all(Object...)} / {@link #any(Object...)}. - * This plays a role similar to Kotlin's {@code withTimeout} while - * preserving Groovy's awaitable abstraction. * - * @param duration the timeout duration in milliseconds + * @param timeoutMillis the timeout duration in milliseconds * @return a new awaitable with timeout semantics * @since 6.0.0 */ - default Awaitable<T> orTimeout(long duration) { - return Awaitable.timeout(this, duration, TimeUnit.MILLISECONDS); + default Awaitable<T> orTimeoutMillis(long timeoutMillis) { + return Awaitable.orTimeout(this, timeoutMillis, TimeUnit.MILLISECONDS); } /** @@ -259,7 +257,7 @@ public interface Awaitable<T> { * @since 6.0.0 */ default Awaitable<T> orTimeout(long duration, TimeUnit unit) { - return Awaitable.timeout(this, duration, unit); + return Awaitable.orTimeout(this, duration, unit); } /** @@ -267,12 +265,12 @@ public interface Awaitable<T> { * value if this computation does not finish before the timeout expires. * * @param fallback the value to use when the timeout expires - * @param duration the timeout duration in milliseconds + * @param timeoutMillis the timeout duration in milliseconds * @return a new awaitable that yields either the original result or the fallback * @since 6.0.0 */ - default Awaitable<T> completeOnTimeout(T fallback, long duration) { - return Awaitable.timeoutOr(this, fallback, duration, TimeUnit.MILLISECONDS); + default Awaitable<T> completeOnTimeoutMillis(T fallback, long timeoutMillis) { + return Awaitable.completeOnTimeout(this, fallback, timeoutMillis, TimeUnit.MILLISECONDS); } /** @@ -286,7 +284,7 @@ public interface Awaitable<T> { * @since 6.0.0 */ default Awaitable<T> completeOnTimeout(T fallback, long duration, TimeUnit unit) { - return Awaitable.timeoutOr(this, fallback, duration, unit); + return Awaitable.completeOnTimeout(this, fallback, duration, unit); } /** @@ -406,9 +404,13 @@ public interface Awaitable<T> { return AsyncSupport.delay(duration, unit); } + // ---- Timeout combinators ---- + /** * Adapts the given source to an {@code Awaitable} and applies a non-blocking - * timeout to it. + * fail-fast timeout. Returns a new awaitable that fails with + * {@link TimeoutException} if the source does not complete before the + * deadline elapses. * <p> * The source may be a Groovy {@link Awaitable}, a JDK * {@link CompletableFuture}/{@link java.util.concurrent.CompletionStage}, @@ -416,58 +418,60 @@ public interface Awaitable<T> { * a concise timeout combinator analogous to Kotlin's {@code withTimeout}, * but as a value-level operation that returns another awaitable. * - * @param source the async source to time out - * @param duration the timeout duration in milliseconds - * @return a new awaitable that fails with {@link TimeoutException} on timeout + * @param source the async source to time out + * @param timeoutMillis the timeout duration in milliseconds + * @param <T> the result type + * @return a new awaitable with timeout semantics * @since 6.0.0 */ - static <T> Awaitable<T> timeout(Object source, long duration) { - return AsyncSupport.timeout(source, duration, TimeUnit.MILLISECONDS); + static <T> Awaitable<T> orTimeoutMillis(Object source, long timeoutMillis) { + return AsyncSupport.orTimeout(source, timeoutMillis, TimeUnit.MILLISECONDS); } /** - * Adapts the given source to an {@code Awaitable} and applies a non-blocking - * timeout to it. + * Adapts the given source and applies a non-blocking fail-fast timeout + * with explicit {@link TimeUnit}. * - * @param source the async source to time out + * @param source the async source to time out * @param duration the timeout duration - * @param unit the time unit - * @return a new awaitable that fails with {@link TimeoutException} on timeout + * @param unit the time unit + * @param <T> the result type + * @return a new awaitable with timeout semantics * @since 6.0.0 */ - static <T> Awaitable<T> timeout(Object source, long duration, TimeUnit unit) { - return AsyncSupport.timeout(source, duration, unit); + static <T> Awaitable<T> orTimeout(Object source, long duration, TimeUnit unit) { + return AsyncSupport.orTimeout(source, duration, unit); } /** - * Adapts the given source to an {@code Awaitable} and returns a new - * awaitable that yields the supplied fallback value if the timeout expires - * first. + * Adapts the given source and returns a new awaitable that yields the + * supplied fallback value if the timeout expires first. * - * @param source the async source to wait for - * @param fallback the fallback value to use on timeout - * @param duration the timeout duration in milliseconds + * @param source the async source to wait for + * @param fallback the fallback value to use on timeout + * @param timeoutMillis the timeout duration in milliseconds + * @param <T> the result type * @return a new awaitable yielding either the original result or the fallback * @since 6.0.0 */ - static <T> Awaitable<T> timeoutOr(Object source, T fallback, long duration) { - return AsyncSupport.timeoutOr(source, fallback, duration, TimeUnit.MILLISECONDS); + static <T> Awaitable<T> completeOnTimeoutMillis(Object source, T fallback, long timeoutMillis) { + return AsyncSupport.completeOnTimeout(source, fallback, timeoutMillis, TimeUnit.MILLISECONDS); } /** - * Adapts the given source to an {@code Awaitable} and returns a new - * awaitable that yields the supplied fallback value if the timeout expires - * first. + * Adapts the given source and returns a new awaitable that yields the + * supplied fallback value if the timeout expires first. * - * @param source the async source to wait for + * @param source the async source to wait for * @param fallback the fallback value to use on timeout * @param duration the timeout duration - * @param unit the time unit + * @param unit the time unit + * @param <T> the result type * @return a new awaitable yielding either the original result or the fallback * @since 6.0.0 */ - static <T> Awaitable<T> timeoutOr(Object source, T fallback, long duration, TimeUnit unit) { - return AsyncSupport.timeoutOr(source, fallback, duration, unit); + static <T> Awaitable<T> completeOnTimeout(Object source, T fallback, long duration, TimeUnit unit) { + return AsyncSupport.completeOnTimeout(source, fallback, duration, unit); } // ---- Executor configuration ---- diff --git a/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java b/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java index 02fe9b7325..a3500a13d3 100644 --- a/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java +++ b/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java @@ -23,18 +23,13 @@ import org.apache.groovy.runtime.async.GroovyPromise; import java.util.Iterator; import java.util.List; import java.util.ServiceLoader; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; -import java.util.concurrent.Flow; import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; /** * Central registry for {@link AwaitableAdapter} instances. @@ -45,12 +40,14 @@ import java.util.concurrent.atomic.AtomicReference; * <ul> * <li>{@link CompletableFuture} and {@link CompletionStage}</li> * <li>{@link Future} (adapted via a blocking wrapper)</li> - * <li>JDK {@link Flow.Publisher} — single-value - * ({@link #toAwaitable}) and multi-value ({@link #toAsyncStream}) - * with backpressure support and upstream cancellation when the - * resulting {@link AsyncStream} is {@linkplain AsyncStream#close() closed}</li> * </ul> * <p> + * JDK {@link java.util.concurrent.Flow.Publisher} support is provided by + * the separately registered + * {@link org.apache.groovy.runtime.async.FlowPublisherAdapter}, which is + * auto-discovered via ServiceLoader. Third-party frameworks (Reactor, + * RxJava, etc.) may register their own higher-priority adapters. + * <p> * Additional adapters can be registered at runtime via {@link #register}. * * @see AwaitableAdapter @@ -163,25 +160,18 @@ public class AwaitableAdapterRegistry { /** * Built-in adapter handling JDK {@link CompletableFuture}, {@link CompletionStage}, - * {@link Future}, {@link Flow.Publisher}, - * and {@link Iterable}/{@link Iterator} (for async stream bridging). + * {@link Future}, and {@link Iterable}/{@link Iterator} (for async stream bridging). * <p> * {@link CompletionStage} support enables seamless integration with frameworks * that return {@code CompletionStage} (e.g., Spring's async APIs, Reactor's * {@code Mono.toFuture()}, etc.) without any additional adapter registration. - * <p> - * {@link Flow.Publisher} support enables seamless - * consumption of reactive streams via {@code for await} without any adapter - * registration. This covers any reactive library that implements the JDK - * standard reactive-streams interface (Reactor, RxJava via adapters, etc.). */ private static class BuiltInAdapter implements AwaitableAdapter { @Override public boolean supportsAwaitable(Class<?> type) { return CompletionStage.class.isAssignableFrom(type) - || Future.class.isAssignableFrom(type) - || Flow.Publisher.class.isAssignableFrom(type); + || Future.class.isAssignableFrom(type); } @Override @@ -190,9 +180,6 @@ public class AwaitableAdapterRegistry { if (source instanceof CompletionStage) { return new GroovyPromise<>(((CompletionStage<T>) source).toCompletableFuture()); } - if (source instanceof Flow.Publisher<?> pub) { - return publisherToAwaitable(pub); - } if (source instanceof Future) { Future<T> future = (Future<T>) source; CompletableFuture<T> cf = new CompletableFuture<>(); @@ -214,16 +201,12 @@ public class AwaitableAdapterRegistry { @Override public boolean supportsAsyncStream(Class<?> type) { return Iterable.class.isAssignableFrom(type) - || Iterator.class.isAssignableFrom(type) - || Flow.Publisher.class.isAssignableFrom(type); + || Iterator.class.isAssignableFrom(type); } @Override @SuppressWarnings("unchecked") public <T> AsyncStream<T> toAsyncStream(Object source) { - if (source instanceof Flow.Publisher<?> pub) { - return publisherToAsyncStream((Flow.Publisher<T>) pub); - } final Iterator<T> iterator; if (source instanceof Iterable) { iterator = ((Iterable<T>) source).iterator(); @@ -249,177 +232,6 @@ public class AwaitableAdapterRegistry { }; } - // Signal wrappers for the publisher-to-async-stream queue, ensuring - // that values, errors, and completion are never confused — even when - // the element type T extends Throwable. - private static final Object COMPLETE_SENTINEL = new Object(); - private record ValueSignal<T>(T value) { } - private record ErrorSignal(Throwable error) { } - - /** - * Adapts a {@link Flow.Publisher} to an {@link AsyncStream} using a - * bounded blocking queue to bridge the push-based reactive-streams - * protocol to the pull-based {@code moveNext}/{@code getCurrent} - * pattern. Backpressure is managed by requesting one item at a time: - * each {@code moveNext()} call requests the next item from the upstream - * subscription only after the previous item has been consumed. - * <p> - * All queue entries are wrapped in typed signal objects - * ({@code ValueSignal}, {@code ErrorSignal}, or a completion sentinel) - * so that element types extending {@link Throwable} are never - * misidentified as error signals. - * <p> - * Thread interruption during {@code queue.take()} is converted to a - * {@link CancellationException} that is <em>thrown directly</em> rather - * than stored in a {@code CompletableFuture}. On JDK 23+, - * {@code CompletableFuture.get()} wraps stored - * {@code CancellationException}s in a new instance with message - * {@code "get"}, discarding the original message and cause chain. - * Throwing directly bypasses {@code CF.get()} entirely, ensuring - * deterministic behaviour across all JDK versions (17 – 25+). - * The interrupt flag is restored per Java convention, and the original - * {@code InterruptedException} is preserved as the - * {@linkplain Throwable#getCause() cause}. This matches the pattern - * used by {@link org.apache.groovy.runtime.async.AsyncStreamGenerator#moveNext()}. - */ - @SuppressWarnings("unchecked") - private static <T> AsyncStream<T> publisherToAsyncStream(Flow.Publisher<T> publisher) { - BlockingQueue<Object> queue = new LinkedBlockingQueue<>(256); - AtomicReference<Flow.Subscription> subRef = new AtomicReference<>(); - AtomicBoolean closedRef = new AtomicBoolean(false); - - publisher.subscribe(new Flow.Subscriber<T>() { - @Override - public void onSubscribe(Flow.Subscription s) { - if (!closedRef.get()) { - subRef.set(s); - s.request(1); - } else { - s.cancel(); - } - } - - @Override - public void onNext(T item) { - if (!closedRef.get()) { - queue.offer(new ValueSignal<>(item)); - } - } - - @Override - public void onError(Throwable t) { - if (!closedRef.get()) { - queue.offer(new ErrorSignal(t)); - } - } - - @Override - public void onComplete() { - if (!closedRef.get()) { - queue.offer(COMPLETE_SENTINEL); - } - } - }); - - return new AsyncStream<T>() { - private volatile T current; - private final AtomicBoolean streamClosed = new AtomicBoolean(false); - - @Override - public Awaitable<Boolean> moveNext() { - if (streamClosed.get()) { - return Awaitable.of(false); - } - CompletableFuture<Boolean> cf = new CompletableFuture<>(); - try { - Object signal = queue.take(); - if (signal == COMPLETE_SENTINEL) { - streamClosed.set(true); - cf.complete(false); - } else if (signal instanceof ErrorSignal es) { - streamClosed.set(true); - cf.completeExceptionally(es.error()); - } else if (signal instanceof ValueSignal<?> vs) { - current = (T) vs.value(); - cf.complete(true); - Flow.Subscription sub = subRef.get(); - if (sub != null) sub.request(1); - } - } catch (InterruptedException e) { - if (streamClosed.get()) { - return Awaitable.of(false); - } - // Throw directly instead of storing in the CompletableFuture. - // On JDK 23+, CF.get() wraps stored CancellationExceptions in a - // new CancellationException("get"), discarding our message and - // cause chain. Throwing directly avoids CF.get() entirely and - // ensures deterministic behaviour across all JDK versions. - // This matches the pattern used by AsyncStreamGenerator.moveNext(). - Thread.currentThread().interrupt(); - CancellationException ce = new CancellationException("Interrupted while waiting for next item"); - ce.initCause(e); - throw ce; - } - return new GroovyPromise<>(cf); - } - - @Override - public T getCurrent() { - return current; - } - - @Override - public void close() { - if (!streamClosed.compareAndSet(false, true)) { - return; - } - closedRef.set(true); - Flow.Subscription subscription = subRef.getAndSet(null); - if (subscription != null) { - subscription.cancel(); - } - queue.clear(); - queue.offer(COMPLETE_SENTINEL); - } - }; - } - - /** - * Adapts a single-value {@link Flow.Publisher} to - * an {@link Awaitable}. Subscribes and takes the first emitted value. - */ - @SuppressWarnings("unchecked") - private static <T> Awaitable<T> publisherToAwaitable(Flow.Publisher<?> publisher) { - CompletableFuture<T> cf = new CompletableFuture<>(); - publisher.subscribe(new Flow.Subscriber<Object>() { - private Flow.Subscription subscription; - - @Override - public void onSubscribe(Flow.Subscription s) { - this.subscription = s; - s.request(1); - } - - @Override - @SuppressWarnings("unchecked") - public void onNext(Object item) { - cf.complete((T) item); - subscription.cancel(); - } - - @Override - public void onError(Throwable t) { - cf.completeExceptionally(t); - } - - @Override - public void onComplete() { - if (!cf.isDone()) cf.complete(null); - } - }); - return new GroovyPromise<>(cf); - } - private static <T> void completeFrom(CompletableFuture<T> cf, Future<T> future) { try { cf.complete(future.get()); @@ -430,6 +242,8 @@ public class AwaitableAdapterRegistry { cf.completeExceptionally(ce); } catch (ExecutionException e) { cf.completeExceptionally(e.getCause() != null ? e.getCause() : e); + } catch (CancellationException e) { + cf.completeExceptionally(e); } catch (Exception e) { cf.completeExceptionally(e); } diff --git a/src/main/java/org/apache/groovy/parser/antlr4/ModifierManager.java b/src/main/java/org/apache/groovy/parser/antlr4/ModifierManager.java index 729eb14cf2..b84dd808a8 100644 --- a/src/main/java/org/apache/groovy/parser/antlr4/ModifierManager.java +++ b/src/main/java/org/apache/groovy/parser/antlr4/ModifierManager.java @@ -164,6 +164,7 @@ class ModifierManager { } public Parameter processParameter(Parameter parameter) { + rejectAsyncModifier("parameter declarations"); modifierNodeList.forEach(e -> { parameter.setModifiers(parameter.getModifiers() | e.getOpcode()); @@ -192,6 +193,7 @@ class ModifierManager { } public VariableExpression processVariableExpression(VariableExpression ve) { + rejectAsyncModifier("variable declarations"); modifierNodeList.forEach(e -> { ve.setModifiers(ve.getModifiers() | e.getOpcode()); @@ -201,6 +203,23 @@ class ModifierManager { return ve; } + /** + * Rejects the {@code async} modifier when used in an unsupported context + * (e.g. parameter or local variable declarations). The grammar allows + * {@code async} as a general modifier, but semantically it is only valid + * on method declarations. This check prevents silent acceptance. + * + * @param target human-readable description of the disallowed context + */ + private void rejectAsyncModifier(String target) { + Optional<ModifierNode> asyncModifier = get(ASYNC); + if (asyncModifier.isPresent()) { + throw astBuilder.createParsingFailedException( + "modifier `async` is only allowed for method declarations, not for " + target, + asyncModifier.get()); + } + } + public <T extends AnnotatedNode> T attachAnnotations(T node) { this.getAnnotations().forEach(node::addAnnotation); diff --git a/src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java b/src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java index 38c66e62a7..59299f9fb3 100644 --- a/src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java +++ b/src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java @@ -208,10 +208,17 @@ public class AsyncStreamGenerator<T> implements AsyncStream<T> { * <p> * The consumer thread is registered via {@code consumerThread} during the * blocking call so that {@link #close()} can interrupt it if needed. + * <p> + * <b>Single-consumer invariant</b>: only one thread may call + * {@code moveNext()} at a time. A {@code compareAndSet(null, current)} + * guard enforces this — a concurrent second caller receives an + * {@link IllegalStateException} immediately instead of silently corrupting + * the producer/consumer handshake. + * <p> * A <em>double-check</em> of the {@link #closed} flag is performed after * registration to close the TOCTOU race window: if {@code close()} executes * between the initial {@code closed.get()} check and the - * {@code consumerThread.set()} call, the consumer reference would not yet be + * {@code consumerThread} CAS, the consumer reference would not yet be * visible to {@code close()}, so no interrupt would be delivered, and * {@code queue.take()} would block indefinitely. The re-check after * registration detects this case and returns immediately. @@ -226,10 +233,21 @@ public class AsyncStreamGenerator<T> implements AsyncStream<T> { if (closed.get()) { return Awaitable.of(false); } + // Enforce single-consumer semantics: only one thread may call moveNext() + // at a time. A concurrent second caller would overwrite consumerThread, + // breaking close()'s interrupt targeting and causing data races on + // queue.take(). CAS guards this invariant at the cost of one atomic op. Thread currentThread = Thread.currentThread(); - consumerThread.set(currentThread); + if (!consumerThread.compareAndSet(null, currentThread)) { + Thread existing = consumerThread.get(); + if (existing != currentThread) { + throw new IllegalStateException( + "AsyncStream does not support concurrent consumers. " + + "Current consumer: " + existing); + } + } // Double-check after registration: if close() raced between the first - // closed check and consumerThread.set(), the consumer reference was not + // closed check and consumerThread CAS, the consumer reference was not // yet visible to close(), so no interrupt was delivered. Without this // re-check the consumer would block in queue.take() indefinitely. if (closed.get()) { diff --git a/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java b/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java index 633e7038d4..5d8f4aaaf4 100644 --- a/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java +++ b/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java @@ -80,7 +80,7 @@ import java.util.concurrent.TimeoutException; * LIFO execution and exception suppression</li> * <li><b>Delay</b> — {@code delay()} provides non-blocking delays using a * shared {@link java.util.concurrent.ScheduledExecutorService}</li> - * <li><b>Timeouts</b> — {@code timeout()} and {@code timeoutOr()} apply + * <li><b>Timeouts</b> — {@code orTimeout()} and {@code completeOnTimeout()} apply * non-blocking deadlines while preserving the {@link Awaitable} abstraction</li> * </ul> * <p> @@ -633,64 +633,86 @@ public class AsyncSupport { } /** - * Adapts the given source to an {@link Awaitable} and returns a new - * awaitable that fails with {@link java.util.concurrent.TimeoutException} - * if the source does not complete before the timeout expires. + * Applies fail-fast timeout semantics to the given source. + * Returns an awaitable that fails with {@link TimeoutException} if the + * source does not complete before the timeout elapses. * <p> * The timeout does <em>not</em> cancel the original source automatically. - * This mirrors the value-level timeout composition style used by - * JavaScript's race-based patterns and keeps Groovy's semantics explicit. - * Callers that require cooperative cancellation can still invoke - * {@link Awaitable#cancel()} on the original task explicitly. + * This mirrors value-level race composition and keeps cancellation explicit. * * @param source the async source to time out - * @param duration the timeout duration - * @param unit the time unit + * @param timeout the timeout duration + * @param unit the timeout unit * @param <T> the result type - * @return a new awaitable with timeout semantics - * @throws IllegalArgumentException if {@code source} is {@code null}, - * {@code duration} is negative, or {@code unit} is {@code null} + * @return an awaitable that fails on timeout + * @throws IllegalArgumentException if any argument is invalid * @since 6.0.0 */ @SuppressWarnings("unchecked") - public static <T> Awaitable<T> timeout(Object source, long duration, TimeUnit unit) { - validateTimeoutArguments(source, duration, unit, "Awaitable.timeout"); + public static <T> Awaitable<T> orTimeout(Object source, long timeout, TimeUnit unit) { + validateTimeoutArguments(source, timeout, unit, "Awaitable.orTimeout"); CompletableFuture<T> sourceFuture = (CompletableFuture<T>) toCompletableFuture(source); CompletableFuture<T> result = new CompletableFuture<>(); TimeoutException te = new TimeoutException( - "Timed out after " + duration + " " + unit.name().toLowerCase(Locale.ROOT)); - scheduleTimeoutRace(sourceFuture, result, () -> result.completeExceptionally(te), duration, unit); + "Timed out after " + timeout + " " + unit.name().toLowerCase(Locale.ROOT)); + scheduleTimeoutRace(sourceFuture, result, () -> result.completeExceptionally(te), timeout, unit); return GroovyPromise.of(result); } /** - * Adapts the given source to an {@link Awaitable} and returns a new - * awaitable that yields the supplied fallback value if the timeout expires - * first. + * Millisecond shortcut for {@link #orTimeout(Object, long, TimeUnit)}. + * + * @param source the async source to time out + * @param timeoutMillis timeout in milliseconds + * @param <T> result type + * @return an awaitable that fails on timeout + * @since 6.0.0 + */ + public static <T> Awaitable<T> orTimeoutMillis(Object source, long timeoutMillis) { + return orTimeout(source, timeoutMillis, TimeUnit.MILLISECONDS); + } + + /** + * Applies fallback-on-timeout semantics to the given source. + * Returns an awaitable that completes with the supplied fallback value if + * the source has not completed before the timeout elapses. * * @param source the async source to wait for - * @param fallback the value to use when the timeout expires - * @param duration the timeout duration - * @param unit the time unit + * @param fallback fallback value when timeout elapses first + * @param timeout the timeout duration + * @param unit the timeout unit * @param <T> the result type - * @return a new awaitable yielding either the source result or the fallback - * @throws IllegalArgumentException if {@code source} is {@code null}, - * {@code duration} is negative, or {@code unit} is {@code null} + * @return an awaitable that yields either source result or fallback value + * @throws IllegalArgumentException if any argument is invalid * @since 6.0.0 */ @SuppressWarnings("unchecked") - public static <T> Awaitable<T> timeoutOr(Object source, T fallback, long duration, TimeUnit unit) { - validateTimeoutArguments(source, duration, unit, "Awaitable.timeoutOr"); + public static <T> Awaitable<T> completeOnTimeout(Object source, T fallback, long timeout, TimeUnit unit) { + validateTimeoutArguments(source, timeout, unit, "Awaitable.completeOnTimeout"); CompletableFuture<T> sourceFuture = (CompletableFuture<T>) toCompletableFuture(source); CompletableFuture<T> result = new CompletableFuture<>(); - scheduleTimeoutRace(sourceFuture, result, () -> result.complete(fallback), duration, unit); + scheduleTimeoutRace(sourceFuture, result, () -> result.complete(fallback), timeout, unit); return GroovyPromise.of(result); } /** - * Shared logic for {@link #timeout} and {@link #timeoutOr}: schedules the - * timeout action and wires up source completion to cancel the timer and - * propagate the result or error. + * Millisecond shortcut for {@link #completeOnTimeout(Object, Object, long, TimeUnit)}. + * + * @param source the async source to wait for + * @param fallback fallback value when timeout elapses first + * @param timeoutMillis timeout in milliseconds + * @param <T> result type + * @return an awaitable that yields either source result or fallback + * @since 6.0.0 + */ + public static <T> Awaitable<T> completeOnTimeoutMillis(Object source, T fallback, long timeoutMillis) { + return completeOnTimeout(source, fallback, timeoutMillis, TimeUnit.MILLISECONDS); + } + + /** + * Shared logic for {@link #orTimeout} and {@link #completeOnTimeout}: + * schedules the timeout action and wires up source completion to cancel + * the timer and propagate the result or error. */ private static <T> void scheduleTimeoutRace( CompletableFuture<T> sourceFuture, diff --git a/src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java b/src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java new file mode 100644 index 0000000000..4d182219bc --- /dev/null +++ b/src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java @@ -0,0 +1,377 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.groovy.runtime.async; + +import groovy.concurrent.AsyncStream; +import groovy.concurrent.Awaitable; +import groovy.concurrent.AwaitableAdapter; + +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Flow; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Bridges JDK {@link Flow.Publisher} instances into Groovy's + * {@link Awaitable}/{@link AsyncStream} world. + * + * <h2>Registration</h2> + * This adapter is auto-discovered by the + * {@link groovy.concurrent.AwaitableAdapterRegistry} via the + * {@code META-INF/services/groovy.concurrent.AwaitableAdapter} file. + * It handles any object that implements {@link Flow.Publisher}. + * + * <h2>Adaptation modes</h2> + * <ul> + * <li><b>Single-value</b> ({@code await publisher}): + * subscribes, takes the first {@code onNext} item, cancels the + * upstream subscription, and completes the returned {@link Awaitable}.</li> + * <li><b>Multi-value</b> ({@code for await (item in publisher)}): + * wraps the publisher into an {@link AsyncStream} backed by a + * bounded {@link LinkedBlockingQueue}, providing natural + * back-pressure by requesting one item at a time.</li> + * </ul> + * + * <h2>Thread safety</h2> + * <p>All subscriber callbacks ({@code onSubscribe}, {@code onNext}, + * {@code onError}, {@code onComplete}) are safe for invocation from + * any thread. Subscription references use {@link AtomicReference} + * for safe publication and race-free cancellation. The close path + * uses a CAS on an {@link AtomicBoolean} to guarantee exactly-once + * cleanup semantics.</p> + * + * <h2>Reactive Streams compliance</h2> + * <p>This adapter follows the Reactive Streams specification + * (JDK {@link Flow} variant) rules:</p> + * <ul> + * <li>§2.5 — duplicate {@code onSubscribe} cancels the second subscription</li> + * <li>§2.13 — {@code null} items in {@code onNext} are rejected immediately</li> + * <li>Terminal signals ({@code onError}/{@code onComplete}) use + * blocking {@code put()} to guarantee delivery even under queue + * contention</li> + * </ul> + * + * @see groovy.concurrent.AwaitableAdapterRegistry + * @see AsyncStream + * @since 6.0.0 + */ +public class FlowPublisherAdapter implements AwaitableAdapter { + + /** + * Queue capacity for the push→pull bridge in + * {@link #publisherToAsyncStream}. 256 provides a generous buffer + * for bursty publishers while bounding memory. + */ + private static final int QUEUE_CAPACITY = 256; + + /** + * Returns {@code true} if the given type is assignable to + * {@link Flow.Publisher}, enabling single-value {@code await}. + * + * @param type the source type to check + * @return {@code true} if this adapter can handle the type + */ + @Override + public boolean supportsAwaitable(Class<?> type) { + return Flow.Publisher.class.isAssignableFrom(type); + } + + /** + * Returns {@code true} if the given type is assignable to + * {@link Flow.Publisher}, enabling multi-value {@code for await}. + * + * @param type the source type to check + * @return {@code true} if this adapter can produce an async stream + */ + @Override + public boolean supportsAsyncStream(Class<?> type) { + return Flow.Publisher.class.isAssignableFrom(type); + } + + /** + * Converts a {@link Flow.Publisher} to a single-value {@link Awaitable} + * by subscribing and taking the first emitted item. + * + * @param source the publisher instance + * @param <T> the element type + * @return an awaitable that resolves to the first emitted value + */ + @Override + @SuppressWarnings("unchecked") + public <T> Awaitable<T> toAwaitable(Object source) { + return publisherToAwaitable((Flow.Publisher<T>) source); + } + + /** + * Converts a {@link Flow.Publisher} to a multi-value {@link AsyncStream} + * for use with {@code for await} loops. + * + * @param source the publisher instance + * @param <T> the element type + * @return an async stream that yields publisher items + */ + @Override + @SuppressWarnings("unchecked") + public <T> AsyncStream<T> toAsyncStream(Object source) { + return publisherToAsyncStream((Flow.Publisher<T>) source); + } + + // ---- Single-value adaptation (await publisher) ---- + + /** + * Subscribes to the publisher, takes the <em>first</em> emitted item, + * cancels the upstream subscription, and returns a completed + * {@link Awaitable}. + * + * <p>If the publisher completes without emitting any item, + * the returned awaitable resolves to {@code null}.</p> + * + * @param publisher the upstream publisher + * @param <T> the element type + * @return an awaitable that completes with the first emitted value + */ + private <T> Awaitable<T> publisherToAwaitable(Flow.Publisher<T> publisher) { + CompletableFuture<T> cf = new CompletableFuture<>(); + // AtomicReference ensures safe publication of the subscription + // across the onSubscribe thread and callback threads (§1.3). + AtomicReference<Flow.Subscription> subRef = new AtomicReference<>(); + // Guard against non-compliant publishers that send multiple signals + AtomicBoolean done = new AtomicBoolean(false); + + publisher.subscribe(new Flow.Subscriber<T>() { + @Override + public void onSubscribe(Flow.Subscription s) { + // §2.5: reject duplicate subscriptions + if (!subRef.compareAndSet(null, s)) { + s.cancel(); + return; + } + s.request(1); + } + + @Override + public void onNext(T item) { + // §2.13: null items are spec violations + if (item == null) { + onError(new NullPointerException( + "Flow.Publisher onNext received null (Reactive Streams §2.13)")); + return; + } + if (done.compareAndSet(false, true)) { + cf.complete(item); + Flow.Subscription sub = subRef.getAndSet(null); + if (sub != null) sub.cancel(); + } + } + + @Override + public void onError(Throwable t) { + if (done.compareAndSet(false, true)) { + cf.completeExceptionally(t); + // Cancel subscription to release resources (idempotent per §3.7) + Flow.Subscription sub = subRef.getAndSet(null); + if (sub != null) sub.cancel(); + } + } + + @Override + public void onComplete() { + // Publisher completed before emitting — resolve to null + if (done.compareAndSet(false, true)) { + cf.complete(null); + } + } + }); + + return new GroovyPromise<>(cf); + } + + // ---- Multi-value adaptation (for await publisher) ---- + + // Signal wrapper types allow us to distinguish values, errors, and + // completion in a single queue without type confusion. + + private static final class ValueSignal<T> { + final T value; + ValueSignal(T value) { this.value = value; } + } + + private static final class ErrorSignal { + final Throwable error; + ErrorSignal(Throwable error) { this.error = error; } + } + + /** Singleton sentinel for stream completion. */ + private static final Object COMPLETE_SENTINEL = new Object(); + + /** + * Wraps a {@link Flow.Publisher} into an {@link AsyncStream}, + * providing a pull-based iteration interface over a push-based source. + * + * <p>Back-pressure is enforced by requesting exactly one item after + * each consumed element. The internal bounded queue (capacity + * {@value QUEUE_CAPACITY}) absorbs minor timing jitter between + * producer and consumer.</p> + * + * <p><b>Resource management:</b> When the consumer calls + * {@link AsyncStream#close()} (e.g. via {@code break} in a + * {@code for await} loop), the upstream subscription is cancelled + * and a completion sentinel is injected to unblock any pending + * {@code moveNext()} call.</p> + * + * @param publisher the upstream publisher + * @param <T> the element type + * @return an async stream that yields publisher items + */ + @SuppressWarnings("unchecked") + private <T> AsyncStream<T> publisherToAsyncStream(Flow.Publisher<T> publisher) { + LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY); + AtomicReference<Flow.Subscription> subRef = new AtomicReference<>(); + // Tracks whether the stream has been closed (by consumer or terminal signal). + // CAS ensures exactly-once semantics for the close/cleanup path. + AtomicBoolean closedRef = new AtomicBoolean(false); + AtomicBoolean streamClosed = new AtomicBoolean(false); + + publisher.subscribe(new Flow.Subscriber<T>() { + @Override + public void onSubscribe(Flow.Subscription s) { + // §2.5: reject duplicate subscriptions + if (!subRef.compareAndSet(null, s)) { + s.cancel(); + return; + } + // Double-check pattern: if close() raced between the CAS and this point, + // the subscription must be cancelled immediately to avoid a dangling stream. + if (closedRef.get()) { + Flow.Subscription sub = subRef.getAndSet(null); + if (sub != null) sub.cancel(); + return; + } + s.request(1); + } + + @Override + public void onNext(T item) { + // §2.13: null items are spec violations + if (item == null) { + onError(new NullPointerException( + "Flow.Publisher onNext received null (Reactive Streams §2.13)")); + return; + } + if (!closedRef.get()) { + // Use offer() for value signals — if the queue is full (publisher + // misbehaving with respect to demand), the item is dropped rather + // than blocking the publisher thread indefinitely. + queue.offer(new ValueSignal<>(item)); + } + } + + @Override + public void onError(Throwable t) { + // Cancel subscription eagerly to release upstream resources + Flow.Subscription sub = subRef.getAndSet(null); + if (sub != null) sub.cancel(); + try { + // Terminal signals use blocking put() to guarantee delivery — + // the consumer MUST see the error to propagate it correctly. + queue.put(new ErrorSignal(t)); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } + + @Override + public void onComplete() { + try { + // Blocking put() guarantees the consumer will see the sentinel, + // even if the queue was temporarily full from buffered values. + queue.put(COMPLETE_SENTINEL); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } + }); + + return new AsyncStream<T>() { + private T current; + + @Override + public Awaitable<Boolean> moveNext() { + // Fast path: stream already closed — no CF allocation needed + if (streamClosed.get()) { + return Awaitable.of(Boolean.FALSE); + } + + CompletableFuture<Boolean> cf = new CompletableFuture<>(); + try { + Object signal = queue.take(); + + if (signal instanceof ValueSignal) { + current = ((ValueSignal<T>) signal).value; + cf.complete(Boolean.TRUE); + // Request the next item — back-pressure: one-at-a-time + Flow.Subscription sub = subRef.get(); + if (sub != null) sub.request(1); + } else if (signal instanceof ErrorSignal) { + streamClosed.set(true); + cf.completeExceptionally(((ErrorSignal) signal).error); + } else { + // COMPLETE_SENTINEL or unknown — treat as end-of-stream + streamClosed.set(true); + cf.complete(Boolean.FALSE); + } + } catch (InterruptedException ie) { + // Consumer thread was interrupted — throw directly as + // CancellationException (matching AsyncStreamGenerator behaviour + // and avoiding JDK 23+ CompletableFuture.get() wrapping issues) + streamClosed.set(true); + Thread.currentThread().interrupt(); + CancellationException ce = new CancellationException("Interrupted during moveNext"); + ce.initCause(ie); + throw ce; + } + + return new GroovyPromise<>(cf); + } + + @Override + public T getCurrent() { + return current; + } + + @Override + public void close() { + if (streamClosed.compareAndSet(false, true)) { + closedRef.set(true); + // Cancel the upstream subscription + Flow.Subscription sub = subRef.getAndSet(null); + if (sub != null) sub.cancel(); + // Drain the queue and inject a sentinel to unblock a + // concurrent moveNext() that may be blocked in take(). + // Using put() after clear() guarantees delivery because + // the queue was just emptied and has capacity. + queue.clear(); + queue.offer(COMPLETE_SENTINEL); + } + } + }; + } +} diff --git a/src/main/java/org/codehaus/groovy/transform/AsyncASTTransformation.java b/src/main/java/org/codehaus/groovy/transform/AsyncASTTransformation.java index e39280af9e..5edd5f9e15 100644 --- a/src/main/java/org/codehaus/groovy/transform/AsyncASTTransformation.java +++ b/src/main/java/org/codehaus/groovy/transform/AsyncASTTransformation.java @@ -103,7 +103,9 @@ public class AsyncASTTransformation extends AbstractASTTransformation { ClassNode originalReturnType = mNode.getReturnType(); if (AWAITABLE_TYPE.getName().equals(originalReturnType.getName()) || ASYNC_STREAM_TYPE.getName().equals(originalReturnType.getName()) - || "java.util.concurrent.CompletableFuture".equals(originalReturnType.getName())) { + || "java.util.concurrent.CompletableFuture".equals(originalReturnType.getName()) + || "java.util.concurrent.CompletionStage".equals(originalReturnType.getName()) + || "java.util.concurrent.Future".equals(originalReturnType.getName())) { addError(MY_TYPE_NAME + " cannot be applied to a method that already returns an async type", mNode); return; } diff --git a/src/resources/META-INF/services/groovy.concurrent.AwaitableAdapter b/src/resources/META-INF/services/groovy.concurrent.AwaitableAdapter new file mode 100644 index 0000000000..a591ba6a55 --- /dev/null +++ b/src/resources/META-INF/services/groovy.concurrent.AwaitableAdapter @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.groovy.runtime.async.FlowPublisherAdapter diff --git a/src/spec/doc/core-async-await.adoc b/src/spec/doc/core-async-await.adoc index a491c347bf..cb9c88ee2c 100644 --- a/src/spec/doc/core-async-await.adoc +++ b/src/spec/doc/core-async-await.adoc @@ -39,7 +39,8 @@ Key capabilities include: * **`for await`** — iterate over asynchronous data sources * **`yield return`** — produce asynchronous streams (generators) * **`defer`** — schedule Go-style cleanup actions that run when the method completes -* **Framework integration** — built-in adapters for `CompletableFuture`, `Future`, and `Flow.Publisher`; +* **Framework integration** — built-in adapters for `CompletableFuture` and `Future`; +`Flow.Publisher` support via the auto-discovered `FlowPublisherAdapter` (an internal runtime adapter); extensible to RxJava, Reactor, and Spring via the adapter registry On JDK 21+, async methods automatically leverage @@ -123,7 +124,7 @@ the developer to manage queues, signals, or synchronization. Groovy does not force developers to abandon their existing async investments. The `await` keyword natively understands `CompletableFuture`, `CompletionStage`, -`Future`, and `Flow.Publisher`: +`Future`, and `Flow.Publisher` (via the auto-discovered `FlowPublisherAdapter` runtime adapter): [source,groovy] ---- @@ -574,11 +575,13 @@ Create an awaitable timer (analogous to JavaScript's `setTimeout` wrapped in a ` include::../test/AsyncAwaitSpecTest.groovy[tags=delay_example,indent=0] ---- -=== `timeout` / `timeoutOr` — Non-Blocking Deadlines +=== `orTimeout` / `completeOnTimeout` — Non-Blocking Deadlines -Apply a deadline without blocking the calling thread. `Awaitable.timeout(...)` -fails with `TimeoutException`, while `Awaitable.timeoutOr(...)` completes with a -fallback value. The instance forms `orTimeout(...)` and `completeOnTimeout(...)` +Apply a deadline without blocking the calling thread. `Awaitable.orTimeoutMillis(...)` +(or `Awaitable.orTimeout(source, duration, unit)` for explicit units) +fails with `TimeoutException`, while `Awaitable.completeOnTimeoutMillis(...)` +(or `Awaitable.completeOnTimeout(source, fallback, duration, unit)`) completes with a +fallback value. The instance forms `orTimeoutMillis(...)` and `completeOnTimeoutMillis(...)` provide the same capability on an existing awaitable. [source,groovy] @@ -586,10 +589,10 @@ provide the same capability on an existing awaitable. include::../test/AsyncAwaitSpecTest.groovy[tags=timeout_combinators,indent=0] ---- -==== Instance Forms: `orTimeout` and `completeOnTimeout` +==== Instance Forms: `orTimeoutMillis` and `completeOnTimeoutMillis` -The instance methods `orTimeout(millis)` and `completeOnTimeout(fallback, millis)` provide -a fluent alternative to the static `Awaitable.timeout()` and `Awaitable.timeoutOr()` forms: +The instance methods `orTimeoutMillis(millis)` and `completeOnTimeoutMillis(fallback, millis)` provide +a fluent alternative to the static `Awaitable.orTimeoutMillis()` and `Awaitable.completeOnTimeoutMillis()` forms: [source,groovy] ---- @@ -636,12 +639,18 @@ This is conceptually similar to JavaScript async iterators' `return()`, C#'s == Adapter Registry The `groovy.concurrent.AwaitableAdapterRegistry` allows extending `await` to support additional -asynchronous types from third-party frameworks. Built-in adapters handle: +asynchronous types from third-party frameworks. The built-in adapter handles: * `groovy.concurrent.Awaitable` (native Groovy promise — passthrough) * `java.util.concurrent.CompletableFuture` and `CompletionStage` * `java.util.concurrent.Future` (blocking, delegated to a configurable executor) -* `java.util.concurrent.Flow.Publisher` (single-value `await` and multi-value `for await`) + +Additionally, `java.util.concurrent.Flow.Publisher` support (single-value `await` and +multi-value `for await`) is provided by `FlowPublisherAdapter` +(`org.apache.groovy.runtime.async`), which is auto-discovered via `ServiceLoader`. +This decoupled design allows applications to override the default +reactive-streams bridge with a framework-specific adapter (e.g., one backed by Reactor's +native scheduler) by registering a higher-priority adapter. === Registering a Custom Adapter @@ -734,7 +743,7 @@ include::../test/AsyncAwaitSpecTest.groovy[tags=retry_pattern,indent=0] === Timeout -Use `Awaitable.timeout()` (or the instance form `orTimeout()`) to apply a +Use `Awaitable.orTimeoutMillis()` (or the instance form `orTimeoutMillis()`) to apply a deadline without dropping back to manual race logic: [source,groovy] @@ -949,7 +958,8 @@ writes (adapter registration) are rare, reads (every `await`) are frequent * `AsyncStreamGenerator.current` — `volatile` field for cross-thread producer/consumer visibility * `AsyncStreamGenerator` close state — `AtomicBoolean` + `AtomicReference<Thread>` for prompt, idempotent close/cancellation signalling -* `Flow.Publisher` adaptation — `AtomicReference<Subscription>` plus close-aware queue signalling +* `Flow.Publisher` adaptation (`FlowPublisherAdapter`) — `AtomicReference<Subscription>` with CAS-guarded + onSubscribe (§2.5 compliance), `AtomicBoolean` done-guard for single-value path, and close-aware queue signalling * Defer scopes — per-task `ArrayDeque`, confined to a single thread (no sharing) * `DELAY_SCHEDULER` — single daemon thread for non-blocking timer operations @@ -1100,7 +1110,7 @@ JavaScript, C#, Kotlin, and Swift, for developers familiar with those languages. | `Task.sleep(for:)` | **Timeout** -| `Awaitable.timeout(task, ms)` / `task.orTimeout(ms)` +| `Awaitable.orTimeoutMillis(task, ms)` / `task.orTimeoutMillis(ms)` | `Promise.race([...])` / `AbortSignal.timeout()` | `task.WaitAsync(timeout)` / `CancelAfter` | `withTimeout(ms) { ... }` @@ -1192,7 +1202,7 @@ JavaScript, C#, Kotlin, and Swift, for developers familiar with those languages. | `awaitable.isDone()` / `isCancelled()` / `isCompletedExceptionally()` | Timeout (instance) -| `awaitable.orTimeout(ms)` / `awaitable.completeOnTimeout(fallback, ms)` +| `awaitable.orTimeoutMillis(ms)` / `awaitable.completeOnTimeoutMillis(fallback, ms)` | AwaitResult | `result.isSuccess()` / `result.isFailure()` / `result.getOrElse { fallback }` diff --git a/src/spec/test/AsyncAwaitSpecTest.groovy b/src/spec/test/AsyncAwaitSpecTest.groovy index eb5ac71e13..fd3636ec80 100644 --- a/src/spec/test/AsyncAwaitSpecTest.groovy +++ b/src/spec/test/AsyncAwaitSpecTest.groovy @@ -677,13 +677,13 @@ async slowCall() { } try { - await(Awaitable.timeout(slowCall(), 50)) + await(Awaitable.orTimeoutMillis(slowCall(), 50)) assert false : "should have timed out" } catch (TimeoutException e) { assert e.message.contains("Timed out after 50") } -assert await(Awaitable.timeoutOr(slowCall(), "cached", 50)) == "cached" +assert await(Awaitable.completeOnTimeoutMillis(slowCall(), "cached", 50)) == "cached" // end::timeout_combinators[] ''' } @@ -874,7 +874,7 @@ async longRunningTask() { } try { - await Awaitable.timeout(longRunningTask(), 50) + await Awaitable.orTimeoutMillis(longRunningTask(), 50) assert false : "should have timed out" } catch (TimeoutException e) { assert e.message.contains("Timed out after 50") @@ -1164,14 +1164,14 @@ async slowTask() { // orTimeout: fails with TimeoutException if not completed in time try { - await(slowTask().orTimeout(50)) + await(slowTask().orTimeoutMillis(50)) assert false : "should have timed out" } catch (TimeoutException e) { assert e.message.contains("50") } // completeOnTimeout: completes with a fallback value instead of failing -assert await(slowTask().completeOnTimeout("fallback", 50)) == "fallback" +assert await(slowTask().completeOnTimeoutMillis("fallback", 50)) == "fallback" // end::instance_timeout_methods[] ''' } diff --git a/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy b/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy index 7df7d1a152..34886a564d 100644 --- a/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy +++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy @@ -1818,7 +1818,7 @@ class AsyncApiTest { void testAwaitableOrTimeoutInstanceMethod() { def cf = new CompletableFuture<>() def awaitable = GroovyPromise.of(cf) - def withTimeout = awaitable.orTimeout(50) + def withTimeout = awaitable.orTimeoutMillis(50) def ex = shouldFail(ExecutionException) { withTimeout.get() } @@ -1840,7 +1840,7 @@ class AsyncApiTest { void testAwaitableCompleteOnTimeoutInstanceMethod() { def cf = new CompletableFuture<>() def awaitable = GroovyPromise.of(cf) - def withFallback = awaitable.completeOnTimeout('default', 50) + def withFallback = awaitable.completeOnTimeoutMillis('default', 50) assert withFallback.get() == 'default' } diff --git a/src/test/groovy/org/codehaus/groovy/transform/AsyncConcurrencyTest.groovy b/src/test/groovy/org/codehaus/groovy/transform/AsyncConcurrencyTest.groovy index 1206ae405c..58be5d29c6 100644 --- a/src/test/groovy/org/codehaus/groovy/transform/AsyncConcurrencyTest.groovy +++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncConcurrencyTest.groovy @@ -311,4 +311,230 @@ class AsyncConcurrencyTest { assert !consumer.isAlive() : "Iteration $iteration: Consumer deadlocked" } } + + // ================================================================= + // moveNext() branch coverage tests + // ================================================================= + + /** + * Branch: same thread re-entry. + * + * The CAS guard in moveNext() allows the same thread to call moveNext() + * multiple times sequentially (the expected usage pattern). This test + * explicitly verifies that no ISE is thrown when the same thread drives + * multiple iterations. + */ + @Test + @Timeout(5) + @DisplayName("Same thread can call moveNext() sequentially without ISE") + void testSameThreadSequentialMoveNext() { + def gen = new AsyncStreamGenerator<Integer>() + Thread.start { + gen.yield(1) + gen.yield(2) + gen.yield(3) + gen.complete() + } + + def items = [] + // All three moveNext() + getCurrent() calls happen on the SAME thread. + while (gen.moveNext().get()) { + items << gen.getCurrent() + } + assert items == [1, 2, 3] + } + + /** + * Branch: InterruptedException caught while closed flag is true. + * + * When moveNext() is blocked in queue.take() and close() fires (setting + * closed=true and interrupting the consumer), the InterruptedException + * handler checks closed.get(). If true, it returns Awaitable.of(false) + * instead of throwing CancellationException. + * + * This test deterministically triggers this branch by: + * 1. Starting moveNext() which blocks on the empty queue + * 2. Calling close() from another thread which sets closed=true and + * interrupts the consumer + * 3. Verifying that moveNext() returns false (not an exception) + */ + @Test + @Timeout(5) + @DisplayName("moveNext() returns false when interrupted after close()") + void testMoveNextInterruptedWhileClosed() { + def gen = new AsyncStreamGenerator<Integer>() + def moveNextResult = new AtomicReference<Boolean>() + def moveNextError = new AtomicReference<Throwable>() + def consumerStarted = new CountDownLatch(1) + + def consumer = Thread.start { + try { + consumerStarted.countDown() + // This will block because no producer is yielding anything + def result = gen.moveNext().get() + moveNextResult.set(result) + } catch (Throwable t) { + moveNextError.set(t) + } + } + + // Wait for consumer to start blocking in moveNext() + consumerStarted.await(2, TimeUnit.SECONDS) + Thread.sleep(50) // Brief pause to ensure queue.take() is entered + + // close() sets closed=true and interrupts the consumer thread + gen.close() + + consumer.join(3000) + assert !consumer.isAlive() : "Consumer should have exited" + + // The IE handler should detect closed=true and return false + assert moveNextResult.get() == false : "Expected false from moveNext() after close()" + assert moveNextError.get() == null : "Expected no exception, got: ${moveNextError.get()}" + } + + /** + * Branch: double-check after CAS detects close() race. + * + * If close() fires between the initial closed.get() check and the + * consumerThread CAS, the consumer reference is not yet visible to close(). + * The double-check after CAS detects this and returns false immediately + * without entering queue.take(). + * + * This test uses a CyclicBarrier to synchronize close() and moveNext() + * on the same timing boundary. We run multiple iterations because the + * exact race window is non-deterministic. + */ + @Test + @Timeout(10) + @DisplayName("Double-check after CAS detects close() race") + void testDoubleCheckAfterCasDetectsCloseRace() { + int detected = 0 + for (int i = 0; i < 200; i++) { + def gen = new AsyncStreamGenerator<Integer>() + def barrier = new CyclicBarrier(2) + def result = new AtomicReference<Object>() + + def consumer = Thread.start { + try { + barrier.await(2, TimeUnit.SECONDS) + def r = gen.moveNext().get() + result.set(r) + } catch (Throwable t) { + result.set(t) + } + } + + def closer = Thread.start { + try { + barrier.await(2, TimeUnit.SECONDS) + gen.close() + } catch (Throwable t) { + // ignore + } + } + + consumer.join(3000) + closer.join(3000) + + def r = result.get() + // Either: + // 1. moveNext() returns false (caught by initial check OR double-check) + // 2. moveNext() throws CancellationException (interrupted in take()) + // Both are valid outcomes of the race — but it must NEVER hang. + if (r == false || r == Boolean.FALSE) { + detected++ + } + } + // At least some iterations should hit the false-return path + assert detected > 0 : "Expected at least one iteration where moveNext() returned false" + } + + /** + * Branch: moveNext() on already-closed generator returns false immediately. + * + * After close() is called, all subsequent moveNext() calls should return + * Awaitable.of(false) without blocking. + */ + @Test + @Timeout(5) + @DisplayName("moveNext() after close returns false without blocking") + void testMoveNextAfterCloseReturnsFalse() { + def gen = new AsyncStreamGenerator<Integer>() + gen.close() + + // Should return false immediately (no producer needed) + assert gen.moveNext().get() == false + assert gen.moveNext().get() == false + assert gen.moveNext().get() == false + } + + /** + * Branch: ErrorItem with Error subclass (thrown directly, not via sneakyThrow). + * + * When the producer yields an Error (not Exception), moveNext() should + * throw it directly as an Error, not wrap it. + */ + @Test + @Timeout(5) + @DisplayName("moveNext() propagates Error subclass directly") + void testMoveNextPropagatesErrorDirectly() { + def gen = new AsyncStreamGenerator<Integer>() + Thread.start { + gen.error(new StackOverflowError("test error")) + } + + try { + gen.moveNext().get() + assert false : "Expected StackOverflowError" + } catch (StackOverflowError e) { + assert e.message == "test error" + } + } + + /** + * Branch: ErrorItem with Exception (thrown via sneakyThrow). + * + * When the producer yields a checked Exception, moveNext() should throw + * it via sneakyThrow (bypassing checked exception compiler enforcement). + */ + @Test + @Timeout(5) + @DisplayName("moveNext() propagates Exception via sneakyThrow") + void testMoveNextPropagatesExceptionViaSneakyThrow() { + def gen = new AsyncStreamGenerator<Integer>() + Thread.start { + gen.error(new java.io.IOException("disk fail")) + } + + try { + gen.moveNext().get() + assert false : "Expected IOException" + } catch (java.io.IOException e) { + assert e.message == "disk fail" + } + } + + /** + * Branch: DONE sentinel marks stream exhausted. + * + * After the producer calls complete(), the next moveNext() should + * receive the DONE sentinel and return false. + */ + @Test + @Timeout(5) + @DisplayName("moveNext() returns false on DONE sentinel after complete()") + void testMoveNextReturnsFalseOnDoneSentinel() { + def gen = new AsyncStreamGenerator<Integer>() + Thread.start { + gen.yield(42) + gen.complete() + } + + assert gen.moveNext().get() == true + assert gen.getCurrent() == 42 + assert gen.moveNext().get() == false + // Subsequent calls should also return false (closed state) + assert gen.moveNext().get() == false + } } diff --git a/src/test/groovy/org/codehaus/groovy/transform/AsyncPatternsTest.groovy b/src/test/groovy/org/codehaus/groovy/transform/AsyncPatternsTest.groovy index 5a0737f9cc..1196af9510 100644 --- a/src/test/groovy/org/codehaus/groovy/transform/AsyncPatternsTest.groovy +++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncPatternsTest.groovy @@ -1063,13 +1063,13 @@ class AsyncPatternsTest { } try { - await(Awaitable.timeout(new CompletableFuture<String>(), 50, TimeUnit.MILLISECONDS)) + await(Awaitable.orTimeout(new CompletableFuture<String>(), 50, TimeUnit.MILLISECONDS)) assert false : "Should have timed out" } catch (TimeoutException e) { assert e.message.contains("Timed out after 50") } - def fallback = await(Awaitable.timeoutOr( + def fallback = await(Awaitable.completeOnTimeout( Awaitable.delay(10_000).then { "late" }, "cached", 50,
