This is an automated email from the ASF dual-hosted git repository.

sunlan pushed a commit to branch GROOVY-9381_3
in repository https://gitbox.apache.org/repos/asf/groovy.git

commit a0115c503fbbb0dd109421a08e0f56f9f5b697e4
Author: Daniel Sun <[email protected]>
AuthorDate: Sat Mar 14 02:25:27 2026 +0900

    Minor tweaks
---
 src/main/java/groovy/concurrent/Awaitable.java     |  82 ++---
 .../concurrent/AwaitableAdapterRegistry.java       | 208 +-----------
 .../groovy/parser/antlr4/ModifierManager.java      |  19 ++
 .../groovy/runtime/async/AsyncStreamGenerator.java |  24 +-
 .../apache/groovy/runtime/async/AsyncSupport.java  |  86 +++--
 .../groovy/runtime/async/FlowPublisherAdapter.java | 377 +++++++++++++++++++++
 .../groovy/transform/AsyncASTTransformation.java   |   4 +-
 .../services/groovy.concurrent.AwaitableAdapter    |  16 +
 src/spec/doc/core-async-await.adoc                 |  40 ++-
 src/spec/test/AsyncAwaitSpecTest.groovy            |  10 +-
 .../codehaus/groovy/transform/AsyncApiTest.groovy  |   4 +-
 .../groovy/transform/AsyncConcurrencyTest.groovy   | 226 ++++++++++++
 .../groovy/transform/AsyncPatternsTest.groovy      |   4 +-
 13 files changed, 804 insertions(+), 296 deletions(-)

diff --git a/src/main/java/groovy/concurrent/Awaitable.java 
b/src/main/java/groovy/concurrent/Awaitable.java
index b64d556d87..ce41b7f947 100644
--- a/src/main/java/groovy/concurrent/Awaitable.java
+++ b/src/main/java/groovy/concurrent/Awaitable.java
@@ -233,20 +233,18 @@ public interface Awaitable<T> {
 
     /**
      * Returns a new {@code Awaitable} that fails with {@link TimeoutException}
-     * if this computation does not complete within the specified duration.
+     * if this computation does not complete within the specified milliseconds.
      * <p>
      * Unlike {@link #get(long, TimeUnit)}, this is a non-blocking, composable
      * timeout combinator: it returns another {@code Awaitable} that can itself
      * be awaited, chained, or passed to {@link #all(Object...)} / {@link 
#any(Object...)}.
-     * This plays a role similar to Kotlin's {@code withTimeout} while
-     * preserving Groovy's awaitable abstraction.
      *
-     * @param duration the timeout duration in milliseconds
+     * @param timeoutMillis the timeout duration in milliseconds
      * @return a new awaitable with timeout semantics
      * @since 6.0.0
      */
-    default Awaitable<T> orTimeout(long duration) {
-        return Awaitable.timeout(this, duration, TimeUnit.MILLISECONDS);
+    default Awaitable<T> orTimeoutMillis(long timeoutMillis) {
+        return Awaitable.orTimeout(this, timeoutMillis, TimeUnit.MILLISECONDS);
     }
 
     /**
@@ -259,7 +257,7 @@ public interface Awaitable<T> {
      * @since 6.0.0
      */
     default Awaitable<T> orTimeout(long duration, TimeUnit unit) {
-        return Awaitable.timeout(this, duration, unit);
+        return Awaitable.orTimeout(this, duration, unit);
     }
 
     /**
@@ -267,12 +265,12 @@ public interface Awaitable<T> {
      * value if this computation does not finish before the timeout expires.
      *
      * @param fallback the value to use when the timeout expires
-     * @param duration the timeout duration in milliseconds
+     * @param timeoutMillis the timeout duration in milliseconds
      * @return a new awaitable that yields either the original result or the 
fallback
      * @since 6.0.0
      */
-    default Awaitable<T> completeOnTimeout(T fallback, long duration) {
-        return Awaitable.timeoutOr(this, fallback, duration, 
TimeUnit.MILLISECONDS);
+    default Awaitable<T> completeOnTimeoutMillis(T fallback, long 
timeoutMillis) {
+        return Awaitable.completeOnTimeout(this, fallback, timeoutMillis, 
TimeUnit.MILLISECONDS);
     }
 
     /**
@@ -286,7 +284,7 @@ public interface Awaitable<T> {
      * @since 6.0.0
      */
     default Awaitable<T> completeOnTimeout(T fallback, long duration, TimeUnit 
unit) {
-        return Awaitable.timeoutOr(this, fallback, duration, unit);
+        return Awaitable.completeOnTimeout(this, fallback, duration, unit);
     }
 
     /**
@@ -406,9 +404,13 @@ public interface Awaitable<T> {
         return AsyncSupport.delay(duration, unit);
     }
 
+    // ---- Timeout combinators ----
+
     /**
      * Adapts the given source to an {@code Awaitable} and applies a 
non-blocking
-     * timeout to it.
+     * fail-fast timeout.  Returns a new awaitable that fails with
+     * {@link TimeoutException} if the source does not complete before the
+     * deadline elapses.
      * <p>
      * The source may be a Groovy {@link Awaitable}, a JDK
      * {@link CompletableFuture}/{@link java.util.concurrent.CompletionStage},
@@ -416,58 +418,60 @@ public interface Awaitable<T> {
      * a concise timeout combinator analogous to Kotlin's {@code withTimeout},
      * but as a value-level operation that returns another awaitable.
      *
-     * @param source the async source to time out
-     * @param duration the timeout duration in milliseconds
-     * @return a new awaitable that fails with {@link TimeoutException} on 
timeout
+     * @param source        the async source to time out
+     * @param timeoutMillis the timeout duration in milliseconds
+     * @param <T>           the result type
+     * @return a new awaitable with timeout semantics
      * @since 6.0.0
      */
-    static <T> Awaitable<T> timeout(Object source, long duration) {
-        return AsyncSupport.timeout(source, duration, TimeUnit.MILLISECONDS);
+    static <T> Awaitable<T> orTimeoutMillis(Object source, long timeoutMillis) 
{
+        return AsyncSupport.orTimeout(source, timeoutMillis, 
TimeUnit.MILLISECONDS);
     }
 
     /**
-     * Adapts the given source to an {@code Awaitable} and applies a 
non-blocking
-     * timeout to it.
+     * Adapts the given source and applies a non-blocking fail-fast timeout
+     * with explicit {@link TimeUnit}.
      *
-     * @param source the async source to time out
+     * @param source   the async source to time out
      * @param duration the timeout duration
-     * @param unit the time unit
-     * @return a new awaitable that fails with {@link TimeoutException} on 
timeout
+     * @param unit     the time unit
+     * @param <T>      the result type
+     * @return a new awaitable with timeout semantics
      * @since 6.0.0
      */
-    static <T> Awaitable<T> timeout(Object source, long duration, TimeUnit 
unit) {
-        return AsyncSupport.timeout(source, duration, unit);
+    static <T> Awaitable<T> orTimeout(Object source, long duration, TimeUnit 
unit) {
+        return AsyncSupport.orTimeout(source, duration, unit);
     }
 
     /**
-     * Adapts the given source to an {@code Awaitable} and returns a new
-     * awaitable that yields the supplied fallback value if the timeout expires
-     * first.
+     * Adapts the given source and returns a new awaitable that yields the
+     * supplied fallback value if the timeout expires first.
      *
-     * @param source the async source to wait for
-     * @param fallback the fallback value to use on timeout
-     * @param duration the timeout duration in milliseconds
+     * @param source        the async source to wait for
+     * @param fallback      the fallback value to use on timeout
+     * @param timeoutMillis the timeout duration in milliseconds
+     * @param <T>           the result type
      * @return a new awaitable yielding either the original result or the 
fallback
      * @since 6.0.0
      */
-    static <T> Awaitable<T> timeoutOr(Object source, T fallback, long 
duration) {
-        return AsyncSupport.timeoutOr(source, fallback, duration, 
TimeUnit.MILLISECONDS);
+    static <T> Awaitable<T> completeOnTimeoutMillis(Object source, T fallback, 
long timeoutMillis) {
+        return AsyncSupport.completeOnTimeout(source, fallback, timeoutMillis, 
TimeUnit.MILLISECONDS);
     }
 
     /**
-     * Adapts the given source to an {@code Awaitable} and returns a new
-     * awaitable that yields the supplied fallback value if the timeout expires
-     * first.
+     * Adapts the given source and returns a new awaitable that yields the
+     * supplied fallback value if the timeout expires first.
      *
-     * @param source the async source to wait for
+     * @param source   the async source to wait for
      * @param fallback the fallback value to use on timeout
      * @param duration the timeout duration
-     * @param unit the time unit
+     * @param unit     the time unit
+     * @param <T>      the result type
      * @return a new awaitable yielding either the original result or the 
fallback
      * @since 6.0.0
      */
-    static <T> Awaitable<T> timeoutOr(Object source, T fallback, long 
duration, TimeUnit unit) {
-        return AsyncSupport.timeoutOr(source, fallback, duration, unit);
+    static <T> Awaitable<T> completeOnTimeout(Object source, T fallback, long 
duration, TimeUnit unit) {
+        return AsyncSupport.completeOnTimeout(source, fallback, duration, 
unit);
     }
 
     // ---- Executor configuration ----
diff --git a/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java 
b/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java
index 02fe9b7325..a3500a13d3 100644
--- a/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java
+++ b/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java
@@ -23,18 +23,13 @@ import org.apache.groovy.runtime.async.GroovyPromise;
 import java.util.Iterator;
 import java.util.List;
 import java.util.ServiceLoader;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
-import java.util.concurrent.Flow;
 import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Central registry for {@link AwaitableAdapter} instances.
@@ -45,12 +40,14 @@ import java.util.concurrent.atomic.AtomicReference;
  * <ul>
  *   <li>{@link CompletableFuture} and {@link CompletionStage}</li>
  *   <li>{@link Future} (adapted via a blocking wrapper)</li>
- *   <li>JDK {@link Flow.Publisher} — single-value
- *       ({@link #toAwaitable}) and multi-value ({@link #toAsyncStream})
- *       with backpressure support and upstream cancellation when the
- *       resulting {@link AsyncStream} is {@linkplain AsyncStream#close() 
closed}</li>
  * </ul>
  * <p>
+ * JDK {@link java.util.concurrent.Flow.Publisher} support is provided by
+ * the separately registered
+ * {@link org.apache.groovy.runtime.async.FlowPublisherAdapter}, which is
+ * auto-discovered via ServiceLoader.  Third-party frameworks (Reactor,
+ * RxJava, etc.) may register their own higher-priority adapters.
+ * <p>
  * Additional adapters can be registered at runtime via {@link #register}.
  *
  * @see AwaitableAdapter
@@ -163,25 +160,18 @@ public class AwaitableAdapterRegistry {
 
     /**
      * Built-in adapter handling JDK {@link CompletableFuture}, {@link 
CompletionStage},
-     * {@link Future}, {@link Flow.Publisher},
-     * and {@link Iterable}/{@link Iterator} (for async stream bridging).
+     * {@link Future}, and {@link Iterable}/{@link Iterator} (for async stream 
bridging).
      * <p>
      * {@link CompletionStage} support enables seamless integration with 
frameworks
      * that return {@code CompletionStage} (e.g., Spring's async APIs, 
Reactor's
      * {@code Mono.toFuture()}, etc.) without any additional adapter 
registration.
-     * <p>
-     * {@link Flow.Publisher} support enables seamless
-     * consumption of reactive streams via {@code for await} without any 
adapter
-     * registration.  This covers any reactive library that implements the JDK
-     * standard reactive-streams interface (Reactor, RxJava via adapters, 
etc.).
      */
     private static class BuiltInAdapter implements AwaitableAdapter {
 
         @Override
         public boolean supportsAwaitable(Class<?> type) {
             return CompletionStage.class.isAssignableFrom(type)
-                    || Future.class.isAssignableFrom(type)
-                    || Flow.Publisher.class.isAssignableFrom(type);
+                    || Future.class.isAssignableFrom(type);
         }
 
         @Override
@@ -190,9 +180,6 @@ public class AwaitableAdapterRegistry {
             if (source instanceof CompletionStage) {
                 return new GroovyPromise<>(((CompletionStage<T>) 
source).toCompletableFuture());
             }
-            if (source instanceof Flow.Publisher<?> pub) {
-                return publisherToAwaitable(pub);
-            }
             if (source instanceof Future) {
                 Future<T> future = (Future<T>) source;
                 CompletableFuture<T> cf = new CompletableFuture<>();
@@ -214,16 +201,12 @@ public class AwaitableAdapterRegistry {
         @Override
         public boolean supportsAsyncStream(Class<?> type) {
             return Iterable.class.isAssignableFrom(type)
-                    || Iterator.class.isAssignableFrom(type)
-                    || Flow.Publisher.class.isAssignableFrom(type);
+                    || Iterator.class.isAssignableFrom(type);
         }
 
         @Override
         @SuppressWarnings("unchecked")
         public <T> AsyncStream<T> toAsyncStream(Object source) {
-            if (source instanceof Flow.Publisher<?> pub) {
-                return publisherToAsyncStream((Flow.Publisher<T>) pub);
-            }
             final Iterator<T> iterator;
             if (source instanceof Iterable) {
                 iterator = ((Iterable<T>) source).iterator();
@@ -249,177 +232,6 @@ public class AwaitableAdapterRegistry {
             };
         }
 
-        // Signal wrappers for the publisher-to-async-stream queue, ensuring
-        // that values, errors, and completion are never confused — even when
-        // the element type T extends Throwable.
-        private static final Object COMPLETE_SENTINEL = new Object();
-        private record ValueSignal<T>(T value) { }
-        private record ErrorSignal(Throwable error) { }
-
-        /**
-         * Adapts a {@link Flow.Publisher} to an {@link AsyncStream} using a
-         * bounded blocking queue to bridge the push-based reactive-streams
-         * protocol to the pull-based {@code moveNext}/{@code getCurrent}
-         * pattern.  Backpressure is managed by requesting one item at a time:
-         * each {@code moveNext()} call requests the next item from the 
upstream
-         * subscription only after the previous item has been consumed.
-         * <p>
-         * All queue entries are wrapped in typed signal objects
-         * ({@code ValueSignal}, {@code ErrorSignal}, or a completion sentinel)
-         * so that element types extending {@link Throwable} are never
-         * misidentified as error signals.
-         * <p>
-         * Thread interruption during {@code queue.take()} is converted to a
-         * {@link CancellationException} that is <em>thrown directly</em> 
rather
-         * than stored in a {@code CompletableFuture}.  On JDK 23+,
-         * {@code CompletableFuture.get()} wraps stored
-         * {@code CancellationException}s in a new instance with message
-         * {@code "get"}, discarding the original message and cause chain.
-         * Throwing directly bypasses {@code CF.get()} entirely, ensuring
-         * deterministic behaviour across all JDK versions (17 – 25+).
-         * The interrupt flag is restored per Java convention, and the original
-         * {@code InterruptedException} is preserved as the
-         * {@linkplain Throwable#getCause() cause}.  This matches the pattern
-         * used by {@link 
org.apache.groovy.runtime.async.AsyncStreamGenerator#moveNext()}.
-         */
-        @SuppressWarnings("unchecked")
-        private static <T> AsyncStream<T> 
publisherToAsyncStream(Flow.Publisher<T> publisher) {
-            BlockingQueue<Object> queue = new LinkedBlockingQueue<>(256);
-            AtomicReference<Flow.Subscription> subRef = new 
AtomicReference<>();
-            AtomicBoolean closedRef = new AtomicBoolean(false);
-
-            publisher.subscribe(new Flow.Subscriber<T>() {
-                @Override
-                public void onSubscribe(Flow.Subscription s) {
-                    if (!closedRef.get()) {
-                        subRef.set(s);
-                        s.request(1);
-                    } else {
-                        s.cancel();
-                    }
-                }
-
-                @Override
-                public void onNext(T item) {
-                    if (!closedRef.get()) {
-                        queue.offer(new ValueSignal<>(item));
-                    }
-                }
-
-                @Override
-                public void onError(Throwable t) {
-                    if (!closedRef.get()) {
-                        queue.offer(new ErrorSignal(t));
-                    }
-                }
-
-                @Override
-                public void onComplete() {
-                    if (!closedRef.get()) {
-                        queue.offer(COMPLETE_SENTINEL);
-                    }
-                }
-            });
-
-            return new AsyncStream<T>() {
-                private volatile T current;
-                private final AtomicBoolean streamClosed = new 
AtomicBoolean(false);
-
-                @Override
-                public Awaitable<Boolean> moveNext() {
-                    if (streamClosed.get()) {
-                        return Awaitable.of(false);
-                    }
-                    CompletableFuture<Boolean> cf = new CompletableFuture<>();
-                    try {
-                        Object signal = queue.take();
-                        if (signal == COMPLETE_SENTINEL) {
-                            streamClosed.set(true);
-                            cf.complete(false);
-                        } else if (signal instanceof ErrorSignal es) {
-                            streamClosed.set(true);
-                            cf.completeExceptionally(es.error());
-                        } else if (signal instanceof ValueSignal<?> vs) {
-                            current = (T) vs.value();
-                            cf.complete(true);
-                            Flow.Subscription sub = subRef.get();
-                            if (sub != null) sub.request(1);
-                        }
-                    } catch (InterruptedException e) {
-                        if (streamClosed.get()) {
-                            return Awaitable.of(false);
-                        }
-                        // Throw directly instead of storing in the 
CompletableFuture.
-                        // On JDK 23+, CF.get() wraps stored 
CancellationExceptions in a
-                        // new CancellationException("get"), discarding our 
message and
-                        // cause chain.  Throwing directly avoids CF.get() 
entirely and
-                        // ensures deterministic behaviour across all JDK 
versions.
-                        // This matches the pattern used by 
AsyncStreamGenerator.moveNext().
-                        Thread.currentThread().interrupt();
-                        CancellationException ce = new 
CancellationException("Interrupted while waiting for next item");
-                        ce.initCause(e);
-                        throw ce;
-                    }
-                    return new GroovyPromise<>(cf);
-                }
-
-                @Override
-                public T getCurrent() {
-                    return current;
-                }
-
-                @Override
-                public void close() {
-                    if (!streamClosed.compareAndSet(false, true)) {
-                        return;
-                    }
-                    closedRef.set(true);
-                    Flow.Subscription subscription = subRef.getAndSet(null);
-                    if (subscription != null) {
-                        subscription.cancel();
-                    }
-                    queue.clear();
-                    queue.offer(COMPLETE_SENTINEL);
-                }
-            };
-        }
-
-        /**
-         * Adapts a single-value {@link Flow.Publisher} to
-         * an {@link Awaitable}.  Subscribes and takes the first emitted value.
-         */
-        @SuppressWarnings("unchecked")
-        private static <T> Awaitable<T> publisherToAwaitable(Flow.Publisher<?> 
publisher) {
-            CompletableFuture<T> cf = new CompletableFuture<>();
-            publisher.subscribe(new Flow.Subscriber<Object>() {
-                private Flow.Subscription subscription;
-
-                @Override
-                public void onSubscribe(Flow.Subscription s) {
-                    this.subscription = s;
-                    s.request(1);
-                }
-
-                @Override
-                @SuppressWarnings("unchecked")
-                public void onNext(Object item) {
-                    cf.complete((T) item);
-                    subscription.cancel();
-                }
-
-                @Override
-                public void onError(Throwable t) {
-                    cf.completeExceptionally(t);
-                }
-
-                @Override
-                public void onComplete() {
-                    if (!cf.isDone()) cf.complete(null);
-                }
-            });
-            return new GroovyPromise<>(cf);
-        }
-
         private static <T> void completeFrom(CompletableFuture<T> cf, 
Future<T> future) {
             try {
                 cf.complete(future.get());
@@ -430,6 +242,8 @@ public class AwaitableAdapterRegistry {
                 cf.completeExceptionally(ce);
             } catch (ExecutionException e) {
                 cf.completeExceptionally(e.getCause() != null ? e.getCause() : 
e);
+            } catch (CancellationException e) {
+                cf.completeExceptionally(e);
             } catch (Exception e) {
                 cf.completeExceptionally(e);
             }
diff --git a/src/main/java/org/apache/groovy/parser/antlr4/ModifierManager.java 
b/src/main/java/org/apache/groovy/parser/antlr4/ModifierManager.java
index 729eb14cf2..b84dd808a8 100644
--- a/src/main/java/org/apache/groovy/parser/antlr4/ModifierManager.java
+++ b/src/main/java/org/apache/groovy/parser/antlr4/ModifierManager.java
@@ -164,6 +164,7 @@ class ModifierManager {
     }
 
     public Parameter processParameter(Parameter parameter) {
+        rejectAsyncModifier("parameter declarations");
         modifierNodeList.forEach(e -> {
             parameter.setModifiers(parameter.getModifiers() | e.getOpcode());
 
@@ -192,6 +193,7 @@ class ModifierManager {
     }
 
     public VariableExpression processVariableExpression(VariableExpression ve) 
{
+        rejectAsyncModifier("variable declarations");
         modifierNodeList.forEach(e -> {
             ve.setModifiers(ve.getModifiers() | e.getOpcode());
 
@@ -201,6 +203,23 @@ class ModifierManager {
         return ve;
     }
 
+    /**
+     * Rejects the {@code async} modifier when used in an unsupported context
+     * (e.g. parameter or local variable declarations).  The grammar allows
+     * {@code async} as a general modifier, but semantically it is only valid
+     * on method declarations.  This check prevents silent acceptance.
+     *
+     * @param target human-readable description of the disallowed context
+     */
+    private void rejectAsyncModifier(String target) {
+        Optional<ModifierNode> asyncModifier = get(ASYNC);
+        if (asyncModifier.isPresent()) {
+            throw astBuilder.createParsingFailedException(
+                    "modifier `async` is only allowed for method declarations, 
not for " + target,
+                    asyncModifier.get());
+        }
+    }
+
     public <T extends AnnotatedNode> T attachAnnotations(T node) {
         this.getAnnotations().forEach(node::addAnnotation);
 
diff --git 
a/src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java 
b/src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java
index 38c66e62a7..59299f9fb3 100644
--- a/src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java
+++ b/src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java
@@ -208,10 +208,17 @@ public class AsyncStreamGenerator<T> implements 
AsyncStream<T> {
      * <p>
      * The consumer thread is registered via {@code consumerThread} during the
      * blocking call so that {@link #close()} can interrupt it if needed.
+     * <p>
+     * <b>Single-consumer invariant</b>: only one thread may call
+     * {@code moveNext()} at a time.  A {@code compareAndSet(null, current)}
+     * guard enforces this — a concurrent second caller receives an
+     * {@link IllegalStateException} immediately instead of silently corrupting
+     * the producer/consumer handshake.
+     * <p>
      * A <em>double-check</em> of the {@link #closed} flag is performed after
      * registration to close the TOCTOU race window: if {@code close()} 
executes
      * between the initial {@code closed.get()} check and the
-     * {@code consumerThread.set()} call, the consumer reference would not yet 
be
+     * {@code consumerThread} CAS, the consumer reference would not yet be
      * visible to {@code close()}, so no interrupt would be delivered, and
      * {@code queue.take()} would block indefinitely.  The re-check after
      * registration detects this case and returns immediately.
@@ -226,10 +233,21 @@ public class AsyncStreamGenerator<T> implements 
AsyncStream<T> {
         if (closed.get()) {
             return Awaitable.of(false);
         }
+        // Enforce single-consumer semantics: only one thread may call 
moveNext()
+        // at a time.  A concurrent second caller would overwrite 
consumerThread,
+        // breaking close()'s interrupt targeting and causing data races on
+        // queue.take().  CAS guards this invariant at the cost of one atomic 
op.
         Thread currentThread = Thread.currentThread();
-        consumerThread.set(currentThread);
+        if (!consumerThread.compareAndSet(null, currentThread)) {
+            Thread existing = consumerThread.get();
+            if (existing != currentThread) {
+                throw new IllegalStateException(
+                        "AsyncStream does not support concurrent consumers. "
+                        + "Current consumer: " + existing);
+            }
+        }
         // Double-check after registration: if close() raced between the first
-        // closed check and consumerThread.set(), the consumer reference was 
not
+        // closed check and consumerThread CAS, the consumer reference was not
         // yet visible to close(), so no interrupt was delivered.  Without this
         // re-check the consumer would block in queue.take() indefinitely.
         if (closed.get()) {
diff --git a/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java 
b/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
index 633e7038d4..5d8f4aaaf4 100644
--- a/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
+++ b/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
@@ -80,7 +80,7 @@ import java.util.concurrent.TimeoutException;
  *       LIFO execution and exception suppression</li>
  *   <li><b>Delay</b> — {@code delay()} provides non-blocking delays using a
  *       shared {@link java.util.concurrent.ScheduledExecutorService}</li>
- *   <li><b>Timeouts</b> — {@code timeout()} and {@code timeoutOr()} apply
+ *   <li><b>Timeouts</b> — {@code orTimeout()} and {@code completeOnTimeout()} 
apply
  *       non-blocking deadlines while preserving the {@link Awaitable} 
abstraction</li>
  * </ul>
  * <p>
@@ -633,64 +633,86 @@ public class AsyncSupport {
     }
 
     /**
-     * Adapts the given source to an {@link Awaitable} and returns a new
-     * awaitable that fails with {@link java.util.concurrent.TimeoutException}
-     * if the source does not complete before the timeout expires.
+     * Applies fail-fast timeout semantics to the given source.
+     * Returns an awaitable that fails with {@link TimeoutException} if the
+     * source does not complete before the timeout elapses.
      * <p>
      * The timeout does <em>not</em> cancel the original source automatically.
-     * This mirrors the value-level timeout composition style used by
-     * JavaScript's race-based patterns and keeps Groovy's semantics explicit.
-     * Callers that require cooperative cancellation can still invoke
-     * {@link Awaitable#cancel()} on the original task explicitly.
+     * This mirrors value-level race composition and keeps cancellation 
explicit.
      *
      * @param source the async source to time out
-     * @param duration the timeout duration
-     * @param unit the time unit
+     * @param timeout the timeout duration
+     * @param unit the timeout unit
      * @param <T> the result type
-     * @return a new awaitable with timeout semantics
-     * @throws IllegalArgumentException if {@code source} is {@code null},
-     *         {@code duration} is negative, or {@code unit} is {@code null}
+     * @return an awaitable that fails on timeout
+     * @throws IllegalArgumentException if any argument is invalid
      * @since 6.0.0
      */
     @SuppressWarnings("unchecked")
-    public static <T> Awaitable<T> timeout(Object source, long duration, 
TimeUnit unit) {
-        validateTimeoutArguments(source, duration, unit, "Awaitable.timeout");
+    public static <T> Awaitable<T> orTimeout(Object source, long timeout, 
TimeUnit unit) {
+        validateTimeoutArguments(source, timeout, unit, "Awaitable.orTimeout");
         CompletableFuture<T> sourceFuture = (CompletableFuture<T>) 
toCompletableFuture(source);
         CompletableFuture<T> result = new CompletableFuture<>();
         TimeoutException te = new TimeoutException(
-                "Timed out after " + duration + " " + 
unit.name().toLowerCase(Locale.ROOT));
-        scheduleTimeoutRace(sourceFuture, result, () -> 
result.completeExceptionally(te), duration, unit);
+                "Timed out after " + timeout + " " + 
unit.name().toLowerCase(Locale.ROOT));
+        scheduleTimeoutRace(sourceFuture, result, () -> 
result.completeExceptionally(te), timeout, unit);
         return GroovyPromise.of(result);
     }
 
     /**
-     * Adapts the given source to an {@link Awaitable} and returns a new
-     * awaitable that yields the supplied fallback value if the timeout expires
-     * first.
+     * Millisecond shortcut for {@link #orTimeout(Object, long, TimeUnit)}.
+     *
+     * @param source the async source to time out
+     * @param timeoutMillis timeout in milliseconds
+     * @param <T> result type
+     * @return an awaitable that fails on timeout
+     * @since 6.0.0
+     */
+    public static <T> Awaitable<T> orTimeoutMillis(Object source, long 
timeoutMillis) {
+        return orTimeout(source, timeoutMillis, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Applies fallback-on-timeout semantics to the given source.
+     * Returns an awaitable that completes with the supplied fallback value if
+     * the source has not completed before the timeout elapses.
      *
      * @param source the async source to wait for
-     * @param fallback the value to use when the timeout expires
-     * @param duration the timeout duration
-     * @param unit the time unit
+     * @param fallback fallback value when timeout elapses first
+     * @param timeout the timeout duration
+     * @param unit the timeout unit
      * @param <T> the result type
-     * @return a new awaitable yielding either the source result or the 
fallback
-     * @throws IllegalArgumentException if {@code source} is {@code null},
-     *         {@code duration} is negative, or {@code unit} is {@code null}
+     * @return an awaitable that yields either source result or fallback value
+     * @throws IllegalArgumentException if any argument is invalid
      * @since 6.0.0
      */
     @SuppressWarnings("unchecked")
-    public static <T> Awaitable<T> timeoutOr(Object source, T fallback, long 
duration, TimeUnit unit) {
-        validateTimeoutArguments(source, duration, unit, 
"Awaitable.timeoutOr");
+    public static <T> Awaitable<T> completeOnTimeout(Object source, T 
fallback, long timeout, TimeUnit unit) {
+        validateTimeoutArguments(source, timeout, unit, 
"Awaitable.completeOnTimeout");
         CompletableFuture<T> sourceFuture = (CompletableFuture<T>) 
toCompletableFuture(source);
         CompletableFuture<T> result = new CompletableFuture<>();
-        scheduleTimeoutRace(sourceFuture, result, () -> 
result.complete(fallback), duration, unit);
+        scheduleTimeoutRace(sourceFuture, result, () -> 
result.complete(fallback), timeout, unit);
         return GroovyPromise.of(result);
     }
 
     /**
-     * Shared logic for {@link #timeout} and {@link #timeoutOr}: schedules the
-     * timeout action and wires up source completion to cancel the timer and
-     * propagate the result or error.
+     * Millisecond shortcut for {@link #completeOnTimeout(Object, Object, 
long, TimeUnit)}.
+     *
+     * @param source the async source to wait for
+     * @param fallback fallback value when timeout elapses first
+     * @param timeoutMillis timeout in milliseconds
+     * @param <T> result type
+     * @return an awaitable that yields either source result or fallback
+     * @since 6.0.0
+     */
+    public static <T> Awaitable<T> completeOnTimeoutMillis(Object source, T 
fallback, long timeoutMillis) {
+        return completeOnTimeout(source, fallback, timeoutMillis, 
TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Shared logic for {@link #orTimeout} and {@link #completeOnTimeout}:
+     * schedules the timeout action and wires up source completion to cancel
+     * the timer and propagate the result or error.
      */
     private static <T> void scheduleTimeoutRace(
             CompletableFuture<T> sourceFuture,
diff --git 
a/src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java 
b/src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java
new file mode 100644
index 0000000000..4d182219bc
--- /dev/null
+++ b/src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java
@@ -0,0 +1,377 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.groovy.runtime.async;
+
+import groovy.concurrent.AsyncStream;
+import groovy.concurrent.Awaitable;
+import groovy.concurrent.AwaitableAdapter;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Flow;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Bridges JDK {@link Flow.Publisher} instances into Groovy's
+ * {@link Awaitable}/{@link AsyncStream} world.
+ *
+ * <h2>Registration</h2>
+ * This adapter is auto-discovered by the
+ * {@link groovy.concurrent.AwaitableAdapterRegistry} via the
+ * {@code META-INF/services/groovy.concurrent.AwaitableAdapter} file.
+ * It handles any object that implements {@link Flow.Publisher}.
+ *
+ * <h2>Adaptation modes</h2>
+ * <ul>
+ *   <li><b>Single-value</b> ({@code await publisher}):
+ *       subscribes, takes the first {@code onNext} item, cancels the
+ *       upstream subscription, and completes the returned {@link 
Awaitable}.</li>
+ *   <li><b>Multi-value</b> ({@code for await (item in publisher)}):
+ *       wraps the publisher into an {@link AsyncStream} backed by a
+ *       bounded {@link LinkedBlockingQueue}, providing natural
+ *       back-pressure by requesting one item at a time.</li>
+ * </ul>
+ *
+ * <h2>Thread safety</h2>
+ * <p>All subscriber callbacks ({@code onSubscribe}, {@code onNext},
+ * {@code onError}, {@code onComplete}) are safe for invocation from
+ * any thread.  Subscription references use {@link AtomicReference}
+ * for safe publication and race-free cancellation.  The close path
+ * uses a CAS on an {@link AtomicBoolean} to guarantee exactly-once
+ * cleanup semantics.</p>
+ *
+ * <h2>Reactive Streams compliance</h2>
+ * <p>This adapter follows the Reactive Streams specification
+ * (JDK {@link Flow} variant) rules:</p>
+ * <ul>
+ *   <li>§2.5 — duplicate {@code onSubscribe} cancels the second 
subscription</li>
+ *   <li>§2.13 — {@code null} items in {@code onNext} are rejected 
immediately</li>
+ *   <li>Terminal signals ({@code onError}/{@code onComplete}) use
+ *       blocking {@code put()} to guarantee delivery even under queue
+ *       contention</li>
+ * </ul>
+ *
+ * @see groovy.concurrent.AwaitableAdapterRegistry
+ * @see AsyncStream
+ * @since 6.0.0
+ */
+public class FlowPublisherAdapter implements AwaitableAdapter {
+
+    /**
+     * Queue capacity for the push→pull bridge in
+     * {@link #publisherToAsyncStream}.  256 provides a generous buffer
+     * for bursty publishers while bounding memory.
+     */
+    private static final int QUEUE_CAPACITY = 256;
+
+    /**
+     * Returns {@code true} if the given type is assignable to
+     * {@link Flow.Publisher}, enabling single-value {@code await}.
+     *
+     * @param type the source type to check
+     * @return {@code true} if this adapter can handle the type
+     */
+    @Override
+    public boolean supportsAwaitable(Class<?> type) {
+        return Flow.Publisher.class.isAssignableFrom(type);
+    }
+
+    /**
+     * Returns {@code true} if the given type is assignable to
+     * {@link Flow.Publisher}, enabling multi-value {@code for await}.
+     *
+     * @param type the source type to check
+     * @return {@code true} if this adapter can produce an async stream
+     */
+    @Override
+    public boolean supportsAsyncStream(Class<?> type) {
+        return Flow.Publisher.class.isAssignableFrom(type);
+    }
+
+    /**
+     * Converts a {@link Flow.Publisher} to a single-value {@link Awaitable}
+     * by subscribing and taking the first emitted item.
+     *
+     * @param source the publisher instance
+     * @param <T>    the element type
+     * @return an awaitable that resolves to the first emitted value
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T> Awaitable<T> toAwaitable(Object source) {
+        return publisherToAwaitable((Flow.Publisher<T>) source);
+    }
+
+    /**
+     * Converts a {@link Flow.Publisher} to a multi-value {@link AsyncStream}
+     * for use with {@code for await} loops.
+     *
+     * @param source the publisher instance
+     * @param <T>    the element type
+     * @return an async stream that yields publisher items
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T> AsyncStream<T> toAsyncStream(Object source) {
+        return publisherToAsyncStream((Flow.Publisher<T>) source);
+    }
+
+    // ---- Single-value adaptation (await publisher) ----
+
+    /**
+     * Subscribes to the publisher, takes the <em>first</em> emitted item,
+     * cancels the upstream subscription, and returns a completed
+     * {@link Awaitable}.
+     *
+     * <p>If the publisher completes without emitting any item,
+     * the returned awaitable resolves to {@code null}.</p>
+     *
+     * @param publisher the upstream publisher
+     * @param <T>       the element type
+     * @return an awaitable that completes with the first emitted value
+     */
+    private <T> Awaitable<T> publisherToAwaitable(Flow.Publisher<T> publisher) 
{
+        CompletableFuture<T> cf = new CompletableFuture<>();
+        // AtomicReference ensures safe publication of the subscription
+        // across the onSubscribe thread and callback threads (§1.3).
+        AtomicReference<Flow.Subscription> subRef = new AtomicReference<>();
+        // Guard against non-compliant publishers that send multiple signals
+        AtomicBoolean done = new AtomicBoolean(false);
+
+        publisher.subscribe(new Flow.Subscriber<T>() {
+            @Override
+            public void onSubscribe(Flow.Subscription s) {
+                // §2.5: reject duplicate subscriptions
+                if (!subRef.compareAndSet(null, s)) {
+                    s.cancel();
+                    return;
+                }
+                s.request(1);
+            }
+
+            @Override
+            public void onNext(T item) {
+                // §2.13: null items are spec violations
+                if (item == null) {
+                    onError(new NullPointerException(
+                            "Flow.Publisher onNext received null (Reactive 
Streams §2.13)"));
+                    return;
+                }
+                if (done.compareAndSet(false, true)) {
+                    cf.complete(item);
+                    Flow.Subscription sub = subRef.getAndSet(null);
+                    if (sub != null) sub.cancel();
+                }
+            }
+
+            @Override
+            public void onError(Throwable t) {
+                if (done.compareAndSet(false, true)) {
+                    cf.completeExceptionally(t);
+                    // Cancel subscription to release resources (idempotent 
per §3.7)
+                    Flow.Subscription sub = subRef.getAndSet(null);
+                    if (sub != null) sub.cancel();
+                }
+            }
+
+            @Override
+            public void onComplete() {
+                // Publisher completed before emitting — resolve to null
+                if (done.compareAndSet(false, true)) {
+                    cf.complete(null);
+                }
+            }
+        });
+
+        return new GroovyPromise<>(cf);
+    }
+
+    // ---- Multi-value adaptation (for await publisher) ----
+
+    // Signal wrapper types allow us to distinguish values, errors, and
+    // completion in a single queue without type confusion.
+
+    private static final class ValueSignal<T> {
+        final T value;
+        ValueSignal(T value) { this.value = value; }
+    }
+
+    private static final class ErrorSignal {
+        final Throwable error;
+        ErrorSignal(Throwable error) { this.error = error; }
+    }
+
+    /** Singleton sentinel for stream completion. */
+    private static final Object COMPLETE_SENTINEL = new Object();
+
+    /**
+     * Wraps a {@link Flow.Publisher} into an {@link AsyncStream},
+     * providing a pull-based iteration interface over a push-based source.
+     *
+     * <p>Back-pressure is enforced by requesting exactly one item after
+     * each consumed element.  The internal bounded queue (capacity
+     * {@value QUEUE_CAPACITY}) absorbs minor timing jitter between
+     * producer and consumer.</p>
+     *
+     * <p><b>Resource management:</b> When the consumer calls
+     * {@link AsyncStream#close()} (e.g. via {@code break} in a
+     * {@code for await} loop), the upstream subscription is cancelled
+     * and a completion sentinel is injected to unblock any pending
+     * {@code moveNext()} call.</p>
+     *
+     * @param publisher the upstream publisher
+     * @param <T>       the element type
+     * @return an async stream that yields publisher items
+     */
+    @SuppressWarnings("unchecked")
+    private <T> AsyncStream<T> publisherToAsyncStream(Flow.Publisher<T> 
publisher) {
+        LinkedBlockingQueue<Object> queue = new 
LinkedBlockingQueue<>(QUEUE_CAPACITY);
+        AtomicReference<Flow.Subscription> subRef = new AtomicReference<>();
+        // Tracks whether the stream has been closed (by consumer or terminal 
signal).
+        // CAS ensures exactly-once semantics for the close/cleanup path.
+        AtomicBoolean closedRef = new AtomicBoolean(false);
+        AtomicBoolean streamClosed = new AtomicBoolean(false);
+
+        publisher.subscribe(new Flow.Subscriber<T>() {
+            @Override
+            public void onSubscribe(Flow.Subscription s) {
+                // §2.5: reject duplicate subscriptions
+                if (!subRef.compareAndSet(null, s)) {
+                    s.cancel();
+                    return;
+                }
+                // Double-check pattern: if close() raced between the CAS and 
this point,
+                // the subscription must be cancelled immediately to avoid a 
dangling stream.
+                if (closedRef.get()) {
+                    Flow.Subscription sub = subRef.getAndSet(null);
+                    if (sub != null) sub.cancel();
+                    return;
+                }
+                s.request(1);
+            }
+
+            @Override
+            public void onNext(T item) {
+                // §2.13: null items are spec violations
+                if (item == null) {
+                    onError(new NullPointerException(
+                            "Flow.Publisher onNext received null (Reactive 
Streams §2.13)"));
+                    return;
+                }
+                if (!closedRef.get()) {
+                    // Use offer() for value signals — if the queue is full 
(publisher
+                    // misbehaving with respect to demand), the item is 
dropped rather
+                    // than blocking the publisher thread indefinitely.
+                    queue.offer(new ValueSignal<>(item));
+                }
+            }
+
+            @Override
+            public void onError(Throwable t) {
+                // Cancel subscription eagerly to release upstream resources
+                Flow.Subscription sub = subRef.getAndSet(null);
+                if (sub != null) sub.cancel();
+                try {
+                    // Terminal signals use blocking put() to guarantee 
delivery —
+                    // the consumer MUST see the error to propagate it 
correctly.
+                    queue.put(new ErrorSignal(t));
+                } catch (InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+
+            @Override
+            public void onComplete() {
+                try {
+                    // Blocking put() guarantees the consumer will see the 
sentinel,
+                    // even if the queue was temporarily full from buffered 
values.
+                    queue.put(COMPLETE_SENTINEL);
+                } catch (InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+        });
+
+        return new AsyncStream<T>() {
+            private T current;
+
+            @Override
+            public Awaitable<Boolean> moveNext() {
+                // Fast path: stream already closed — no CF allocation needed
+                if (streamClosed.get()) {
+                    return Awaitable.of(Boolean.FALSE);
+                }
+
+                CompletableFuture<Boolean> cf = new CompletableFuture<>();
+                try {
+                    Object signal = queue.take();
+
+                    if (signal instanceof ValueSignal) {
+                        current = ((ValueSignal<T>) signal).value;
+                        cf.complete(Boolean.TRUE);
+                        // Request the next item — back-pressure: one-at-a-time
+                        Flow.Subscription sub = subRef.get();
+                        if (sub != null) sub.request(1);
+                    } else if (signal instanceof ErrorSignal) {
+                        streamClosed.set(true);
+                        cf.completeExceptionally(((ErrorSignal) signal).error);
+                    } else {
+                        // COMPLETE_SENTINEL or unknown — treat as 
end-of-stream
+                        streamClosed.set(true);
+                        cf.complete(Boolean.FALSE);
+                    }
+                } catch (InterruptedException ie) {
+                    // Consumer thread was interrupted — throw directly as
+                    // CancellationException (matching AsyncStreamGenerator 
behaviour
+                    // and avoiding JDK 23+ CompletableFuture.get() wrapping 
issues)
+                    streamClosed.set(true);
+                    Thread.currentThread().interrupt();
+                    CancellationException ce = new 
CancellationException("Interrupted during moveNext");
+                    ce.initCause(ie);
+                    throw ce;
+                }
+
+                return new GroovyPromise<>(cf);
+            }
+
+            @Override
+            public T getCurrent() {
+                return current;
+            }
+
+            @Override
+            public void close() {
+                if (streamClosed.compareAndSet(false, true)) {
+                    closedRef.set(true);
+                    // Cancel the upstream subscription
+                    Flow.Subscription sub = subRef.getAndSet(null);
+                    if (sub != null) sub.cancel();
+                    // Drain the queue and inject a sentinel to unblock a
+                    // concurrent moveNext() that may be blocked in take().
+                    // Using put() after clear() guarantees delivery because
+                    // the queue was just emptied and has capacity.
+                    queue.clear();
+                    queue.offer(COMPLETE_SENTINEL);
+                }
+            }
+        };
+    }
+}
diff --git 
a/src/main/java/org/codehaus/groovy/transform/AsyncASTTransformation.java 
b/src/main/java/org/codehaus/groovy/transform/AsyncASTTransformation.java
index e39280af9e..5edd5f9e15 100644
--- a/src/main/java/org/codehaus/groovy/transform/AsyncASTTransformation.java
+++ b/src/main/java/org/codehaus/groovy/transform/AsyncASTTransformation.java
@@ -103,7 +103,9 @@ public class AsyncASTTransformation extends 
AbstractASTTransformation {
         ClassNode originalReturnType = mNode.getReturnType();
         if (AWAITABLE_TYPE.getName().equals(originalReturnType.getName())
                 || 
ASYNC_STREAM_TYPE.getName().equals(originalReturnType.getName())
-                || 
"java.util.concurrent.CompletableFuture".equals(originalReturnType.getName())) {
+                || 
"java.util.concurrent.CompletableFuture".equals(originalReturnType.getName())
+                || 
"java.util.concurrent.CompletionStage".equals(originalReturnType.getName())
+                || 
"java.util.concurrent.Future".equals(originalReturnType.getName())) {
             addError(MY_TYPE_NAME + " cannot be applied to a method that 
already returns an async type", mNode);
             return;
         }
diff --git a/src/resources/META-INF/services/groovy.concurrent.AwaitableAdapter 
b/src/resources/META-INF/services/groovy.concurrent.AwaitableAdapter
new file mode 100644
index 0000000000..a591ba6a55
--- /dev/null
+++ b/src/resources/META-INF/services/groovy.concurrent.AwaitableAdapter
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.groovy.runtime.async.FlowPublisherAdapter
diff --git a/src/spec/doc/core-async-await.adoc 
b/src/spec/doc/core-async-await.adoc
index a491c347bf..cb9c88ee2c 100644
--- a/src/spec/doc/core-async-await.adoc
+++ b/src/spec/doc/core-async-await.adoc
@@ -39,7 +39,8 @@ Key capabilities include:
 * **`for await`** — iterate over asynchronous data sources
 * **`yield return`** — produce asynchronous streams (generators)
 * **`defer`** — schedule Go-style cleanup actions that run when the method 
completes
-* **Framework integration** — built-in adapters for `CompletableFuture`, 
`Future`, and `Flow.Publisher`;
+* **Framework integration** — built-in adapters for `CompletableFuture` and 
`Future`;
+`Flow.Publisher` support via the auto-discovered `FlowPublisherAdapter` (an 
internal runtime adapter);
 extensible to RxJava, Reactor, and Spring via the adapter registry
 
 On JDK 21+, async methods automatically leverage
@@ -123,7 +124,7 @@ the developer to manage queues, signals, or synchronization.
 
 Groovy does not force developers to abandon their existing async investments.
 The `await` keyword natively understands `CompletableFuture`, 
`CompletionStage`,
-`Future`, and `Flow.Publisher`:
+`Future`, and `Flow.Publisher` (via the auto-discovered `FlowPublisherAdapter` 
runtime adapter):
 
 [source,groovy]
 ----
@@ -574,11 +575,13 @@ Create an awaitable timer (analogous to JavaScript's 
`setTimeout` wrapped in a `
 include::../test/AsyncAwaitSpecTest.groovy[tags=delay_example,indent=0]
 ----
 
-=== `timeout` / `timeoutOr` — Non-Blocking Deadlines
+=== `orTimeout` / `completeOnTimeout` — Non-Blocking Deadlines
 
-Apply a deadline without blocking the calling thread. `Awaitable.timeout(...)`
-fails with `TimeoutException`, while `Awaitable.timeoutOr(...)` completes with 
a
-fallback value. The instance forms `orTimeout(...)` and 
`completeOnTimeout(...)`
+Apply a deadline without blocking the calling thread. 
`Awaitable.orTimeoutMillis(...)`
+(or `Awaitable.orTimeout(source, duration, unit)` for explicit units)
+fails with `TimeoutException`, while `Awaitable.completeOnTimeoutMillis(...)`
+(or `Awaitable.completeOnTimeout(source, fallback, duration, unit)`) completes 
with a
+fallback value. The instance forms `orTimeoutMillis(...)` and 
`completeOnTimeoutMillis(...)`
 provide the same capability on an existing awaitable.
 
 [source,groovy]
@@ -586,10 +589,10 @@ provide the same capability on an existing awaitable.
 include::../test/AsyncAwaitSpecTest.groovy[tags=timeout_combinators,indent=0]
 ----
 
-==== Instance Forms: `orTimeout` and `completeOnTimeout`
+==== Instance Forms: `orTimeoutMillis` and `completeOnTimeoutMillis`
 
-The instance methods `orTimeout(millis)` and `completeOnTimeout(fallback, 
millis)` provide
-a fluent alternative to the static `Awaitable.timeout()` and 
`Awaitable.timeoutOr()` forms:
+The instance methods `orTimeoutMillis(millis)` and 
`completeOnTimeoutMillis(fallback, millis)` provide
+a fluent alternative to the static `Awaitable.orTimeoutMillis()` and 
`Awaitable.completeOnTimeoutMillis()` forms:
 
 [source,groovy]
 ----
@@ -636,12 +639,18 @@ This is conceptually similar to JavaScript async 
iterators' `return()`, C#'s
 == Adapter Registry
 
 The `groovy.concurrent.AwaitableAdapterRegistry` allows extending `await` to 
support additional
-asynchronous types from third-party frameworks. Built-in adapters handle:
+asynchronous types from third-party frameworks. The built-in adapter handles:
 
 * `groovy.concurrent.Awaitable` (native Groovy promise — passthrough)
 * `java.util.concurrent.CompletableFuture` and `CompletionStage`
 * `java.util.concurrent.Future` (blocking, delegated to a configurable 
executor)
-* `java.util.concurrent.Flow.Publisher` (single-value `await` and multi-value 
`for await`)
+
+Additionally, `java.util.concurrent.Flow.Publisher` support (single-value 
`await` and
+multi-value `for await`) is provided by `FlowPublisherAdapter`
+(`org.apache.groovy.runtime.async`), which is auto-discovered via 
`ServiceLoader`.
+This decoupled design allows applications to override the default
+reactive-streams bridge with a framework-specific adapter (e.g., one backed by 
Reactor's
+native scheduler) by registering a higher-priority adapter.
 
 === Registering a Custom Adapter
 
@@ -734,7 +743,7 @@ 
include::../test/AsyncAwaitSpecTest.groovy[tags=retry_pattern,indent=0]
 
 === Timeout
 
-Use `Awaitable.timeout()` (or the instance form `orTimeout()`) to apply a
+Use `Awaitable.orTimeoutMillis()` (or the instance form `orTimeoutMillis()`) 
to apply a
 deadline without dropping back to manual race logic:
 
 [source,groovy]
@@ -949,7 +958,8 @@ writes (adapter registration) are rare, reads (every 
`await`) are frequent
 * `AsyncStreamGenerator.current` — `volatile` field for cross-thread 
producer/consumer visibility
 * `AsyncStreamGenerator` close state — `AtomicBoolean` + 
`AtomicReference<Thread>` for prompt,
 idempotent close/cancellation signalling
-* `Flow.Publisher` adaptation — `AtomicReference<Subscription>` plus 
close-aware queue signalling
+* `Flow.Publisher` adaptation (`FlowPublisherAdapter`) — 
`AtomicReference<Subscription>` with CAS-guarded
+  onSubscribe (§2.5 compliance), `AtomicBoolean` done-guard for single-value 
path, and close-aware queue signalling
 * Defer scopes — per-task `ArrayDeque`, confined to a single thread (no 
sharing)
 * `DELAY_SCHEDULER` — single daemon thread for non-blocking timer operations
 
@@ -1100,7 +1110,7 @@ JavaScript, C#, Kotlin, and Swift, for developers 
familiar with those languages.
 | `Task.sleep(for:)`
 
 | **Timeout**
-| `Awaitable.timeout(task, ms)` / `task.orTimeout(ms)`
+| `Awaitable.orTimeoutMillis(task, ms)` / `task.orTimeoutMillis(ms)`
 | `Promise.race([...])` / `AbortSignal.timeout()`
 | `task.WaitAsync(timeout)` / `CancelAfter`
 | `withTimeout(ms) { ... }`
@@ -1192,7 +1202,7 @@ JavaScript, C#, Kotlin, and Swift, for developers 
familiar with those languages.
 | `awaitable.isDone()` / `isCancelled()` / `isCompletedExceptionally()`
 
 | Timeout (instance)
-| `awaitable.orTimeout(ms)` / `awaitable.completeOnTimeout(fallback, ms)`
+| `awaitable.orTimeoutMillis(ms)` / 
`awaitable.completeOnTimeoutMillis(fallback, ms)`
 
 | AwaitResult
 | `result.isSuccess()` / `result.isFailure()` / `result.getOrElse { fallback }`
diff --git a/src/spec/test/AsyncAwaitSpecTest.groovy 
b/src/spec/test/AsyncAwaitSpecTest.groovy
index eb5ac71e13..fd3636ec80 100644
--- a/src/spec/test/AsyncAwaitSpecTest.groovy
+++ b/src/spec/test/AsyncAwaitSpecTest.groovy
@@ -677,13 +677,13 @@ async slowCall() {
 }
 
 try {
-    await(Awaitable.timeout(slowCall(), 50))
+    await(Awaitable.orTimeoutMillis(slowCall(), 50))
     assert false : "should have timed out"
 } catch (TimeoutException e) {
     assert e.message.contains("Timed out after 50")
 }
 
-assert await(Awaitable.timeoutOr(slowCall(), "cached", 50)) == "cached"
+assert await(Awaitable.completeOnTimeoutMillis(slowCall(), "cached", 50)) == 
"cached"
 // end::timeout_combinators[]
         '''
     }
@@ -874,7 +874,7 @@ async longRunningTask() {
 }
 
 try {
-    await Awaitable.timeout(longRunningTask(), 50)
+    await Awaitable.orTimeoutMillis(longRunningTask(), 50)
     assert false : "should have timed out"
 } catch (TimeoutException e) {
     assert e.message.contains("Timed out after 50")
@@ -1164,14 +1164,14 @@ async slowTask() {
 
 // orTimeout: fails with TimeoutException if not completed in time
 try {
-    await(slowTask().orTimeout(50))
+    await(slowTask().orTimeoutMillis(50))
     assert false : "should have timed out"
 } catch (TimeoutException e) {
     assert e.message.contains("50")
 }
 
 // completeOnTimeout: completes with a fallback value instead of failing
-assert await(slowTask().completeOnTimeout("fallback", 50)) == "fallback"
+assert await(slowTask().completeOnTimeoutMillis("fallback", 50)) == "fallback"
 // end::instance_timeout_methods[]
         '''
     }
diff --git a/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy 
b/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy
index 7df7d1a152..34886a564d 100644
--- a/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy
+++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy
@@ -1818,7 +1818,7 @@ class AsyncApiTest {
     void testAwaitableOrTimeoutInstanceMethod() {
         def cf = new CompletableFuture<>()
         def awaitable = GroovyPromise.of(cf)
-        def withTimeout = awaitable.orTimeout(50)
+        def withTimeout = awaitable.orTimeoutMillis(50)
         def ex = shouldFail(ExecutionException) {
             withTimeout.get()
         }
@@ -1840,7 +1840,7 @@ class AsyncApiTest {
     void testAwaitableCompleteOnTimeoutInstanceMethod() {
         def cf = new CompletableFuture<>()
         def awaitable = GroovyPromise.of(cf)
-        def withFallback = awaitable.completeOnTimeout('default', 50)
+        def withFallback = awaitable.completeOnTimeoutMillis('default', 50)
         assert withFallback.get() == 'default'
     }
 
diff --git 
a/src/test/groovy/org/codehaus/groovy/transform/AsyncConcurrencyTest.groovy 
b/src/test/groovy/org/codehaus/groovy/transform/AsyncConcurrencyTest.groovy
index 1206ae405c..58be5d29c6 100644
--- a/src/test/groovy/org/codehaus/groovy/transform/AsyncConcurrencyTest.groovy
+++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncConcurrencyTest.groovy
@@ -311,4 +311,230 @@ class AsyncConcurrencyTest {
             assert !consumer.isAlive() : "Iteration $iteration: Consumer 
deadlocked"
         }
     }
+
+    // =================================================================
+    // moveNext() branch coverage tests
+    // =================================================================
+
+    /**
+     * Branch: same thread re-entry.
+     *
+     * The CAS guard in moveNext() allows the same thread to call moveNext()
+     * multiple times sequentially (the expected usage pattern).  This test
+     * explicitly verifies that no ISE is thrown when the same thread drives
+     * multiple iterations.
+     */
+    @Test
+    @Timeout(5)
+    @DisplayName("Same thread can call moveNext() sequentially without ISE")
+    void testSameThreadSequentialMoveNext() {
+        def gen = new AsyncStreamGenerator<Integer>()
+        Thread.start {
+            gen.yield(1)
+            gen.yield(2)
+            gen.yield(3)
+            gen.complete()
+        }
+
+        def items = []
+        // All three moveNext() + getCurrent() calls happen on the SAME thread.
+        while (gen.moveNext().get()) {
+            items << gen.getCurrent()
+        }
+        assert items == [1, 2, 3]
+    }
+
+    /**
+     * Branch: InterruptedException caught while closed flag is true.
+     *
+     * When moveNext() is blocked in queue.take() and close() fires (setting
+     * closed=true and interrupting the consumer), the InterruptedException
+     * handler checks closed.get().  If true, it returns Awaitable.of(false)
+     * instead of throwing CancellationException.
+     *
+     * This test deterministically triggers this branch by:
+     * 1. Starting moveNext() which blocks on the empty queue
+     * 2. Calling close() from another thread which sets closed=true and
+     *    interrupts the consumer
+     * 3. Verifying that moveNext() returns false (not an exception)
+     */
+    @Test
+    @Timeout(5)
+    @DisplayName("moveNext() returns false when interrupted after close()")
+    void testMoveNextInterruptedWhileClosed() {
+        def gen = new AsyncStreamGenerator<Integer>()
+        def moveNextResult = new AtomicReference<Boolean>()
+        def moveNextError = new AtomicReference<Throwable>()
+        def consumerStarted = new CountDownLatch(1)
+
+        def consumer = Thread.start {
+            try {
+                consumerStarted.countDown()
+                // This will block because no producer is yielding anything
+                def result = gen.moveNext().get()
+                moveNextResult.set(result)
+            } catch (Throwable t) {
+                moveNextError.set(t)
+            }
+        }
+
+        // Wait for consumer to start blocking in moveNext()
+        consumerStarted.await(2, TimeUnit.SECONDS)
+        Thread.sleep(50)  // Brief pause to ensure queue.take() is entered
+
+        // close() sets closed=true and interrupts the consumer thread
+        gen.close()
+
+        consumer.join(3000)
+        assert !consumer.isAlive() : "Consumer should have exited"
+
+        // The IE handler should detect closed=true and return false
+        assert moveNextResult.get() == false : "Expected false from moveNext() 
after close()"
+        assert moveNextError.get() == null : "Expected no exception, got: 
${moveNextError.get()}"
+    }
+
+    /**
+     * Branch: double-check after CAS detects close() race.
+     *
+     * If close() fires between the initial closed.get() check and the
+     * consumerThread CAS, the consumer reference is not yet visible to 
close().
+     * The double-check after CAS detects this and returns false immediately
+     * without entering queue.take().
+     *
+     * This test uses a CyclicBarrier to synchronize close() and moveNext()
+     * on the same timing boundary.  We run multiple iterations because the
+     * exact race window is non-deterministic.
+     */
+    @Test
+    @Timeout(10)
+    @DisplayName("Double-check after CAS detects close() race")
+    void testDoubleCheckAfterCasDetectsCloseRace() {
+        int detected = 0
+        for (int i = 0; i < 200; i++) {
+            def gen = new AsyncStreamGenerator<Integer>()
+            def barrier = new CyclicBarrier(2)
+            def result = new AtomicReference<Object>()
+
+            def consumer = Thread.start {
+                try {
+                    barrier.await(2, TimeUnit.SECONDS)
+                    def r = gen.moveNext().get()
+                    result.set(r)
+                } catch (Throwable t) {
+                    result.set(t)
+                }
+            }
+
+            def closer = Thread.start {
+                try {
+                    barrier.await(2, TimeUnit.SECONDS)
+                    gen.close()
+                } catch (Throwable t) {
+                    // ignore
+                }
+            }
+
+            consumer.join(3000)
+            closer.join(3000)
+
+            def r = result.get()
+            // Either:
+            // 1. moveNext() returns false (caught by initial check OR 
double-check)
+            // 2. moveNext() throws CancellationException (interrupted in 
take())
+            // Both are valid outcomes of the race — but it must NEVER hang.
+            if (r == false || r == Boolean.FALSE) {
+                detected++
+            }
+        }
+        // At least some iterations should hit the false-return path
+        assert detected > 0 : "Expected at least one iteration where 
moveNext() returned false"
+    }
+
+    /**
+     * Branch: moveNext() on already-closed generator returns false 
immediately.
+     *
+     * After close() is called, all subsequent moveNext() calls should return
+     * Awaitable.of(false) without blocking.
+     */
+    @Test
+    @Timeout(5)
+    @DisplayName("moveNext() after close returns false without blocking")
+    void testMoveNextAfterCloseReturnsFalse() {
+        def gen = new AsyncStreamGenerator<Integer>()
+        gen.close()
+
+        // Should return false immediately (no producer needed)
+        assert gen.moveNext().get() == false
+        assert gen.moveNext().get() == false
+        assert gen.moveNext().get() == false
+    }
+
+    /**
+     * Branch: ErrorItem with Error subclass (thrown directly, not via 
sneakyThrow).
+     *
+     * When the producer yields an Error (not Exception), moveNext() should
+     * throw it directly as an Error, not wrap it.
+     */
+    @Test
+    @Timeout(5)
+    @DisplayName("moveNext() propagates Error subclass directly")
+    void testMoveNextPropagatesErrorDirectly() {
+        def gen = new AsyncStreamGenerator<Integer>()
+        Thread.start {
+            gen.error(new StackOverflowError("test error"))
+        }
+
+        try {
+            gen.moveNext().get()
+            assert false : "Expected StackOverflowError"
+        } catch (StackOverflowError e) {
+            assert e.message == "test error"
+        }
+    }
+
+    /**
+     * Branch: ErrorItem with Exception (thrown via sneakyThrow).
+     *
+     * When the producer yields a checked Exception, moveNext() should throw
+     * it via sneakyThrow (bypassing checked exception compiler enforcement).
+     */
+    @Test
+    @Timeout(5)
+    @DisplayName("moveNext() propagates Exception via sneakyThrow")
+    void testMoveNextPropagatesExceptionViaSneakyThrow() {
+        def gen = new AsyncStreamGenerator<Integer>()
+        Thread.start {
+            gen.error(new java.io.IOException("disk fail"))
+        }
+
+        try {
+            gen.moveNext().get()
+            assert false : "Expected IOException"
+        } catch (java.io.IOException e) {
+            assert e.message == "disk fail"
+        }
+    }
+
+    /**
+     * Branch: DONE sentinel marks stream exhausted.
+     *
+     * After the producer calls complete(), the next moveNext() should
+     * receive the DONE sentinel and return false.
+     */
+    @Test
+    @Timeout(5)
+    @DisplayName("moveNext() returns false on DONE sentinel after complete()")
+    void testMoveNextReturnsFalseOnDoneSentinel() {
+        def gen = new AsyncStreamGenerator<Integer>()
+        Thread.start {
+            gen.yield(42)
+            gen.complete()
+        }
+
+        assert gen.moveNext().get() == true
+        assert gen.getCurrent() == 42
+        assert gen.moveNext().get() == false
+        // Subsequent calls should also return false (closed state)
+        assert gen.moveNext().get() == false
+    }
 }
diff --git 
a/src/test/groovy/org/codehaus/groovy/transform/AsyncPatternsTest.groovy 
b/src/test/groovy/org/codehaus/groovy/transform/AsyncPatternsTest.groovy
index 5a0737f9cc..1196af9510 100644
--- a/src/test/groovy/org/codehaus/groovy/transform/AsyncPatternsTest.groovy
+++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncPatternsTest.groovy
@@ -1063,13 +1063,13 @@ class AsyncPatternsTest {
             }
 
             try {
-                await(Awaitable.timeout(new CompletableFuture<String>(), 50, 
TimeUnit.MILLISECONDS))
+                await(Awaitable.orTimeout(new CompletableFuture<String>(), 50, 
TimeUnit.MILLISECONDS))
                 assert false : "Should have timed out"
             } catch (TimeoutException e) {
                 assert e.message.contains("Timed out after 50")
             }
 
-            def fallback = await(Awaitable.timeoutOr(
+            def fallback = await(Awaitable.completeOnTimeout(
                 Awaitable.delay(10_000).then { "late" },
                 "cached",
                 50,

Reply via email to