This is an automated email from the ASF dual-hosted git repository.

sunlan pushed a commit to branch GROOVY-9381_3
in repository https://gitbox.apache.org/repos/asf/groovy.git

commit e38ef50316dddbf801035634185a5ee3a573c4ec
Author: Daniel Sun <[email protected]>
AuthorDate: Sat Mar 14 11:45:44 2026 +0900

    Minor tweaks
---
 src/main/java/groovy/concurrent/AsyncStream.java   |  30 ++
 src/main/java/groovy/concurrent/Awaitable.java     |  39 +-
 .../concurrent/AwaitableAdapterRegistry.java       |  14 +-
 src/main/java/groovy/transform/Async.java          |   4 +-
 .../apache/groovy/runtime/async/AsyncSupport.java  |  17 +-
 .../groovy/runtime/async/FlowPublisherAdapter.java |  53 ++-
 src/spec/doc/core-async-await.adoc                 |  33 +-
 src/spec/test/AsyncAwaitSpecTest.groovy            |  38 +-
 .../codehaus/groovy/transform/AsyncApiTest.groovy  | 438 +++++++++++++++++++--
 .../groovy/transform/AsyncDeferFlowTest.groovy     |  76 ++--
 .../transform/AsyncFrameworkIntegrationTest.groovy |  20 +-
 .../groovy/transform/AsyncTransformTest.groovy     |  18 +-
 12 files changed, 674 insertions(+), 106 deletions(-)

diff --git a/src/main/java/groovy/concurrent/AsyncStream.java 
b/src/main/java/groovy/concurrent/AsyncStream.java
index 26c7abe316..4baa81115a 100644
--- a/src/main/java/groovy/concurrent/AsyncStream.java
+++ b/src/main/java/groovy/concurrent/AsyncStream.java
@@ -75,6 +75,36 @@ public interface AsyncStream<T> extends AutoCloseable {
     default void close() {
     }
 
+    /**
+     * Converts the given source to an {@code AsyncStream}.
+     * <p>
+     * If the source is already an {@code AsyncStream}, it is returned as-is.
+     * Otherwise, the {@link AwaitableAdapterRegistry} is consulted to find a
+     * suitable adapter. Built-in adapters handle {@link Iterable} and
+     * {@link java.util.Iterator}; the auto-discovered {@code FlowPublisherAdapter}
+     * handles {@link java.util.concurrent.Flow.Publisher}; third-party frameworks
+     * can register additional adapters via the registry.
+     * <p>
+     * This is the recommended entry point for converting external collection or
+     * reactive types to {@code AsyncStream}:
+     * <pre>
+     * AsyncStream&lt;String&gt; stream = AsyncStream.from(myList)
+     * AsyncStream&lt;Integer&gt; stream2 = AsyncStream.from(myFlowPublisher)
+     * </pre>
+     *
+     * @param source the source object; must not be {@code null}
+     * @param <T>    the element type
+     * @return an async stream backed by the source
+     * @throws IllegalArgumentException if {@code source} is {@code null}
+     *         or no adapter supports the source type
+     * @see AwaitableAdapterRegistry#toAsyncStream(Object)
+     * @since 6.0.0
+     */
+    @SuppressWarnings("unchecked") // NOTE(review): no unchecked operation is visible in this delegate; suppression looks unnecessary (Awaitable.from has none) — confirm
+    static <T> AsyncStream<T> from(Object source) {
+        return AwaitableAdapterRegistry.toAsyncStream(source);
+    }
+
     /**
      * Returns an empty {@code AsyncStream} that completes immediately.
      */
diff --git a/src/main/java/groovy/concurrent/Awaitable.java 
b/src/main/java/groovy/concurrent/Awaitable.java
index ce41b7f947..d7bf8fb689 100644
--- a/src/main/java/groovy/concurrent/Awaitable.java
+++ b/src/main/java/groovy/concurrent/Awaitable.java
@@ -54,14 +54,18 @@ import java.util.function.Function;
  *       {@code Promise.allSettled()}</li>
  *   <li>{@link #delay(long) Awaitable.delay(ms)} — like
  *       {@code Task.Delay()} / {@code setTimeout}</li>
- *   <li>{@link #timeout(Object, long) Awaitable.timeout(task, ms)} — like
+ *   <li>{@link #orTimeoutMillis(Object, long) Awaitable.orTimeoutMillis(task, 
ms)} — like
  *       Kotlin's {@code withTimeout} or a JavaScript promise raced against a 
timer</li>
- *   <li>{@link #timeoutOr(Object, Object, long) Awaitable.timeoutOr(task, 
fallback, ms)} —
+ *   <li>{@link #completeOnTimeoutMillis(Object, Object, long)
+ *       Awaitable.completeOnTimeoutMillis(task, fallback, ms)} —
  *       like a timeout with fallback/default value</li>
  * </ul>
  * <p>
- * <b>Static factories:</b>
+ * <b>Static factories and conversion:</b>
  * <ul>
+ *   <li>{@link #from(Object) Awaitable.from(source)} — converts any supported
+ *       async type (CompletableFuture, CompletionStage, Future, 
Flow.Publisher, etc.)
+ *       to an {@code Awaitable}</li>
  *   <li>{@link #of(Object) Awaitable.of(value)} — like
  *       {@code Task.FromResult()} / {@code Promise.resolve()}</li>
  *   <li>{@link #failed(Throwable) Awaitable.failed(error)} — like
@@ -297,6 +301,35 @@ public interface Awaitable<T> {
 
     // ---- Static factories ----
 
+    /**
+     * Converts the given source to an {@code Awaitable}.
+     * <p>
+     * If the source is already an {@code Awaitable}, it is returned as-is.
+     * Otherwise, the {@link AwaitableAdapterRegistry} is consulted to find a
+     * suitable adapter. Built-in adapters handle {@link CompletableFuture},
+     * {@link java.util.concurrent.CompletionStage}, {@link java.util.concurrent.Future},
+     * and {@link java.util.concurrent.Flow.Publisher}; third-party frameworks
+     * can register additional adapters via the registry.
+     * <p>
+     * This is the recommended entry point for converting external async types
+     * to {@code Awaitable}:
+     * <pre>
+     * Awaitable&lt;String&gt; aw = Awaitable.from(someCompletableFuture)
+     * Awaitable&lt;Integer&gt; aw2 = Awaitable.from(someReactorMono) // third-party type: needs a registered adapter
+     * </pre>
+     *
+     * @param source the source object; must not be {@code null}
+     * @param <T>    the result type
+     * @return an awaitable backed by the source
+     * @throws IllegalArgumentException if {@code source} is {@code null}
+     *         or no adapter supports the source type
+     * @see AwaitableAdapterRegistry#toAwaitable(Object)
+     * @since 6.0.0
+     */
+    static <T> Awaitable<T> from(Object source) {
+        return AwaitableAdapterRegistry.toAwaitable(source);
+    }
+
     /**
      * Returns an already-completed {@code Awaitable} with the given value.
      * Analogous to C#'s {@code Task.FromResult()} or JavaScript's
diff --git a/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java 
b/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java
index a3500a13d3..61398d154d 100644
--- a/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java
+++ b/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java
@@ -111,13 +111,18 @@ public class AwaitableAdapterRegistry {
     /**
      * Converts the given source to an {@link Awaitable}.
      * If the source is already an {@code Awaitable}, it is returned as-is.
+     * <p>
+     * <b>Note:</b> this method is package-private; code outside this package
+     * must use {@link Awaitable#from(Object)}, the public, more discoverable
+     * entry point on the {@code Awaitable} type that delegates to this method.
      *
      * @param source the source object; must not be {@code null}
      * @throws IllegalArgumentException if {@code source} is {@code null}
      *         or no adapter supports the source type
+     * @see Awaitable#from(Object)
      */
     @SuppressWarnings("unchecked")
-    public static <T> Awaitable<T> toAwaitable(Object source) {
+    static <T> Awaitable<T> toAwaitable(Object source) {
         if (source == null) {
             throw new IllegalArgumentException("Cannot convert null to 
Awaitable");
         }
@@ -136,13 +141,18 @@ public class AwaitableAdapterRegistry {
     /**
      * Converts the given source to an {@link AsyncStream}.
      * If the source is already an {@code AsyncStream}, it is returned as-is.
+     * <p>
+     * <b>Note:</b> this method is package-private; code outside this package
+     * must use {@link AsyncStream#from(Object)}, the public, more discoverable
+     * entry point on the {@code AsyncStream} type that delegates to this method.
      *
      * @param source the source object; must not be {@code null}
      * @throws IllegalArgumentException if {@code source} is {@code null}
      *         or no adapter supports the source type
+     * @see AsyncStream#from(Object)
      */
     @SuppressWarnings("unchecked")
-    public static <T> AsyncStream<T> toAsyncStream(Object source) {
+    static <T> AsyncStream<T> toAsyncStream(Object source) {
         if (source == null) {
             throw new IllegalArgumentException("Cannot convert null to 
AsyncStream");
         }
diff --git a/src/main/java/groovy/transform/Async.java 
b/src/main/java/groovy/transform/Async.java
index 90e9cdca4b..2e09ba2a46 100644
--- a/src/main/java/groovy/transform/Async.java
+++ b/src/main/java/groovy/transform/Async.java
@@ -107,7 +107,9 @@ import java.lang.annotation.Target;
  * <ul>
  *   <li>Cannot be applied to abstract methods</li>
  *   <li>Cannot be applied to constructors</li>
- *   <li>Cannot be applied to methods that already return {@code 
Awaitable}</li>
+ *   <li>Cannot be applied to methods that already return an async type
+ *       ({@code Awaitable}, {@code AsyncStream}, {@code CompletableFuture},
+ *       {@code CompletionStage}, or {@code Future})</li>
  * </ul>
  *
  * @see groovy.concurrent.Awaitable
diff --git a/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java 
b/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
index 5d8f4aaaf4..96658f81e8 100644
--- a/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
+++ b/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
@@ -21,7 +21,6 @@ package org.apache.groovy.runtime.async;
 import groovy.concurrent.AsyncStream;
 import groovy.concurrent.AwaitResult;
 import groovy.concurrent.Awaitable;
-import groovy.concurrent.AwaitableAdapterRegistry;
 import groovy.lang.Closure;
 
 import java.lang.invoke.MethodHandle;
@@ -64,8 +63,9 @@ import java.util.concurrent.TimeoutException;
  * <ul>
  *   <li><b>Async execution</b> — {@code executeAsync()}, {@code 
executeAsyncVoid()},
  *       and {@code wrapAsync()} run closures on the configured executor</li>
- *   <li><b>Await</b> — all {@code await()} overloads go through the
- *       {@link AwaitableAdapterRegistry} so that third-party async types
+ *   <li><b>Await</b> — all {@code await()} overloads use
+ *       {@link groovy.concurrent.Awaitable#from(Object) Awaitable.from()} so
+ *       that third-party async types
  *       (RxJava {@code Single}, Reactor {@code Mono}, etc.) are supported
  *       transparently once an adapter is registered</li>
  *   <li><b>Async generators</b> — {@code generateAsyncStream()} manages the
@@ -112,7 +112,6 @@ import java.util.concurrent.TimeoutException;
  *
  * @see groovy.concurrent.Awaitable
  * @see groovy.transform.Async
- * @see Awaitable
  * @since 6.0.0
  */
 public class AsyncSupport {
@@ -267,8 +266,8 @@ public class AsyncSupport {
     }
 
     /**
-     * Awaits an arbitrary object by adapting it to {@link Awaitable} via the
-     * {@link AwaitableAdapterRegistry}.  This is the fallback overload called
+     * Awaits an arbitrary object by adapting it to {@link Awaitable} via
+     * {@link Awaitable#from(Object)}.  This is the fallback overload called
      * by the {@code await} expression when the compile-time type is not one
      * of the other supported types.  Returns {@code null} for a {@code null}
      * argument.
@@ -285,7 +284,7 @@ public class AsyncSupport {
         if (source instanceof CompletionStage) return 
await((CompletionStage<T>) source);
         if (source instanceof Future) return await((Future<T>) source);
         if (source instanceof Closure) return awaitClosure((Closure<?>) 
source);
-        return await(AwaitableAdapterRegistry.<T>toAwaitable(source));
+        return await(Awaitable.<T>from(source));
     }
 
     /**
@@ -543,7 +542,7 @@ public class AsyncSupport {
         if (source instanceof CompletableFuture<?> cf) return cf;
         if (source instanceof Awaitable<?> a) return a.toCompletableFuture();
         if (source instanceof CompletionStage<?> cs) return 
cs.toCompletableFuture();
-        return 
AwaitableAdapterRegistry.toAwaitable(source).toCompletableFuture();
+        return Awaitable.from(source).toCompletableFuture();
     }
 
     // ---- non-blocking combinators (return Awaitable) --------------------
@@ -813,7 +812,7 @@ public class AsyncSupport {
     public static <T> AsyncStream<T> toAsyncStream(Object source) {
         if (source == null) return AsyncStream.empty();
         if (source instanceof AsyncStream) return (AsyncStream<T>) source;
-        return AwaitableAdapterRegistry.toAsyncStream(source);
+        return AsyncStream.from(source);
     }
 
     /**
diff --git 
a/src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java 
b/src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java
index 16f8ddc557..f3d5a7847e 100644
--- a/src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java
+++ b/src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java
@@ -64,9 +64,13 @@ import java.util.concurrent.atomic.AtomicReference;
  * <ul>
  *   <li>§2.5 — duplicate {@code onSubscribe} cancels the second 
subscription</li>
  *   <li>§2.13 — {@code null} items in {@code onNext} are rejected 
immediately</li>
- *   <li>Terminal signals ({@code onError}/{@code onComplete}) use
+ *   <li>All signals ({@code onNext}, {@code onError}, {@code onComplete}) use
  *       blocking {@code put()} to guarantee delivery even under queue
  *       contention</li>
+ *   <li>Back-pressure is enforced by requesting exactly one item after
+ *       each consumed element; demand is signalled <em>before</em> the
+ *       consumer's {@code moveNext()} awaitable completes, preventing
+ *       livelock when producer and consumer share the same thread pool</li>
  * </ul>
  *
  * @see groovy.concurrent.AwaitableAdapterRegistry
@@ -227,9 +231,16 @@ public class FlowPublisherAdapter implements 
AwaitableAdapter {
      * providing a pull-based iteration interface over a push-based source.
      *
      * <p>Back-pressure is enforced by requesting exactly one item after
-     * each consumed element.  The internal bounded queue (capacity
-     * {@value QUEUE_CAPACITY}) absorbs minor timing jitter between
-     * producer and consumer.</p>
+     * each consumed element.  Demand is signalled <em>before</em> the
+     * consumer's {@code moveNext()} awaitable completes, so the publisher
+     * can begin producing the next value while the consumer processes the
+     * current one — this prevents livelock when producer and consumer
+     * share the same thread pool.</p>
+     *
+     * <p>The internal bounded queue (capacity {@value QUEUE_CAPACITY})
+     * absorbs minor timing jitter between producer and consumer.  All
+     * signals use blocking {@code put()}, ensuring no items or terminal
+     * events are silently dropped.</p>
      *
      * <p><b>Resource management:</b> When the consumer calls
      * {@link AsyncStream#close()} (e.g. via {@code break} in a
@@ -277,10 +288,16 @@ public class FlowPublisherAdapter implements 
AwaitableAdapter {
                     return;
                 }
                 if (!closedRef.get()) {
-                    // Use offer() for value signals — if the queue is full 
(publisher
-                    // misbehaving with respect to demand), the item is 
dropped rather
-                    // than blocking the publisher thread indefinitely.
-                    queue.offer(new ValueSignal<>(item));
+                    try {
+                        // Blocking put() waits for queue space instead of dropping the item.
+                        // Since demand is capped at 1 (one request(1) per moveNext),
+                        // a well-behaved publisher will never overflow the queue; put()
+                        // still protects against misbehaving publishers by blocking.
+                        // Caveat: if this thread is interrupted while blocked, the item is lost (see catch).
+                        queue.put(new ValueSignal<>(item));
+                    } catch (InterruptedException ie) {
+                        Thread.currentThread().interrupt();
+                    }
                 }
             }
 
@@ -326,10 +343,16 @@ public class FlowPublisherAdapter implements 
AwaitableAdapter {
 
                     if (signal instanceof ValueSignal) {
                         current = ((ValueSignal<T>) signal).value;
-                        cf.complete(Boolean.TRUE);
-                        // Request the next item — back-pressure: one-at-a-time
+                        // Signal demand for the next item BEFORE completing
+                        // the awaitable, so the publisher can begin producing
+                        // the next value while the consumer processes this 
one.
+                        // Ordering here is critical: if request(1) were called
+                        // after cf.complete(), the consumer could re-enter
+                        // moveNext() and block in take() before demand was
+                        // signalled, creating a livelock.
                         Flow.Subscription sub = subRef.get();
                         if (sub != null) sub.request(1);
+                        cf.complete(Boolean.TRUE);
                     } else if (signal instanceof ErrorSignal) {
                         streamClosed.set(true);
                         cf.completeExceptionally(((ErrorSignal) signal).error);
@@ -366,10 +389,14 @@ public class FlowPublisherAdapter implements 
AwaitableAdapter {
                     if (sub != null) sub.cancel();
                     // Drain the queue and inject a sentinel to unblock a
                     // concurrent moveNext() that may be blocked in take().
-                    // Using offer() after clear() performs a non-blocking,
-                    // best-effort delivery of the sentinel to any waiter.
+                    // Using blocking put() after clear() guarantees delivery;
+                    // since the queue is freshly cleared, put() will not 
block.
                     queue.clear();
-                    queue.offer(COMPLETE_SENTINEL);
+                    try {
+                        queue.put(COMPLETE_SENTINEL);
+                    } catch (InterruptedException ie) {
+                        Thread.currentThread().interrupt();
+                    }
                 }
             }
         };
diff --git a/src/spec/doc/core-async-await.adoc 
b/src/spec/doc/core-async-await.adoc
index cb9c88ee2c..fd4c05cdbb 100644
--- a/src/spec/doc/core-async-await.adoc
+++ b/src/spec/doc/core-async-await.adoc
@@ -610,7 +610,10 @@ RxJava 3 via adapters, Vert.x, etc.).
 
 The internal adapter uses a bounded buffer (capacity 256) to bridge the push-based `Publisher`
 model to Groovy's pull-based `AsyncStream` model, preventing unbounded memory growth with
-fast publishers while maintaining throughput.
+fast publishers while maintaining throughput.  Back-pressure is enforced by requesting exactly
+one item at a time; demand for the next element is signalled _before_ the consumer's
+`moveNext()` awaitable completes, so the publisher can begin producing the next value while
+the consumer processes the current one.
 
 === Awaiting a Single Value
 
@@ -638,6 +641,24 @@ This is conceptually similar to JavaScript async 
iterators' `return()`, C#'s
 [[adapter-registry]]
 == Adapter Registry
 
+=== Converting External Types with `from()`
+
+The `Awaitable.from(source)` and `AsyncStream.from(source)` static methods are the recommended
+entry points for converting external async types to Groovy's native abstractions. They delegate
+to the `AwaitableAdapterRegistry` internally but provide a more discoverable, user-friendly API:
+
+[source,groovy]
+----
+include::../test/AsyncAwaitSpecTest.groovy[tags=from_conversion,indent=0]
+----
+
+`Awaitable.from()` accepts any type supported by a registered adapter: `CompletableFuture`,
+`CompletionStage`, `Future`, `Flow.Publisher`, or any third-party type with a custom adapter.
+If the source is already an `Awaitable`, it is returned as-is. `AsyncStream.from()` works
+similarly for multi-value sources such as `Iterable`, `Iterator`, or `Flow.Publisher`.
+
+=== Built-in Adapters
+
 The `groovy.concurrent.AwaitableAdapterRegistry` allows extending `await` to 
support additional
 asynchronous types from third-party frameworks. The built-in adapter handles:
 
@@ -959,7 +980,9 @@ writes (adapter registration) are rare, reads (every 
`await`) are frequent
 * `AsyncStreamGenerator` close state — `AtomicBoolean` + 
`AtomicReference<Thread>` for prompt,
 idempotent close/cancellation signalling
 * `Flow.Publisher` adaptation (`FlowPublisherAdapter`) — 
`AtomicReference<Subscription>` with CAS-guarded
-  onSubscribe (§2.5 compliance), `AtomicBoolean` done-guard for single-value 
path, and close-aware queue signalling
+  onSubscribe (§2.5 compliance), `AtomicBoolean` done-guard for single-value 
path, and close-aware
+  queue signalling; all signals (`onNext`/`onError`/`onComplete`) use blocking 
`put()` for
+  guaranteed delivery; demand is signalled before the consumer's awaitable 
completes to prevent livelock
 * Defer scopes — per-task `ArrayDeque`, confined to a single thread (no 
sharing)
 * `DELAY_SCHEDULER` — single daemon thread for non-blocking timer operations
 
@@ -1131,7 +1154,8 @@ JavaScript, C#, Kotlin, and Swift, for developers 
familiar with those languages.
 | task priority / executor chosen by runtime
 
 | **Third-party support**
-| `AwaitableAdapterRegistry` SPI
+| `AwaitableAdapterRegistry` SPI +
+`Awaitable.from()` / `AsyncStream.from()`
 | _(native `thenable` protocol)_
 | Custom awaiters via `GetAwaiter()`
 | coroutine adapters / bridges
@@ -1195,6 +1219,9 @@ JavaScript, C#, Kotlin, and Swift, for developers 
familiar with those languages.
 | Pre-computed result
 | `Awaitable.of(value)` / `Awaitable.failed(error)`
 
+| Type conversion
+| `Awaitable.from(source)` / `AsyncStream.from(source)`
+
 | Annotation form
 | `@Async def methodName() { ... }`
 
diff --git a/src/spec/test/AsyncAwaitSpecTest.groovy 
b/src/spec/test/AsyncAwaitSpecTest.groovy
index fd3636ec80..5040424dae 100644
--- a/src/spec/test/AsyncAwaitSpecTest.groovy
+++ b/src/spec/test/AsyncAwaitSpecTest.groovy
@@ -1205,7 +1205,43 @@ assert task.isDone()
     }
 
     // 
=========================================================================
-    // 19. Custom adapter registration
+    // 19. Type conversion with from()
+    // 
=========================================================================
+
+    @Test
+    void testFromConversion() {
+        assertScript '''
+// tag::from_conversion[]
+import groovy.concurrent.Awaitable
+import groovy.concurrent.AsyncStream
+import java.util.concurrent.CompletableFuture
+
+// Convert a CompletableFuture to Awaitable
+def cf = CompletableFuture.completedFuture("hello")
+Awaitable<String> awaitable = Awaitable.from(cf)
+assert awaitable.get() == "hello"
+
+// If the source is already an Awaitable, it is returned as-is
+def original = Awaitable.of(42)
+assert Awaitable.from(original).is(original)
+
+// Convert an Iterable to AsyncStream
+AsyncStream<String> stream = AsyncStream.from(["a", "b", "c"])
+def items = []
+assert stream.moveNext().get() == true
+items << stream.current
+assert stream.moveNext().get() == true
+items << stream.current
+assert stream.moveNext().get() == true
+items << stream.current
+assert stream.moveNext().get() == false
+assert items == ["a", "b", "c"]
+// end::from_conversion[]
+        '''
+    }
+
+    // 
=========================================================================
+    // 20. Custom adapter registration
     // 
=========================================================================
 
     @Test
diff --git a/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy 
b/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy
index 34886a564d..7dabc83a79 100644
--- a/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy
+++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy
@@ -1154,13 +1154,13 @@ class AsyncApiTest {
     }
 
     // ================================================================
-    // AwaitableAdapterRegistry
+    // Awaitable.from / AsyncStream.from / AwaitableAdapterRegistry
     // ================================================================
 
     @Test
     void testToAwaitableNull() {
         try {
-            AwaitableAdapterRegistry.toAwaitable(null)
+            Awaitable.from(null)
             assert false
         } catch (IllegalArgumentException e) {
             assert e.message.contains('null')
@@ -1170,7 +1170,7 @@ class AsyncApiTest {
     @Test
     void testToAsyncStreamNull() {
         try {
-            AwaitableAdapterRegistry.toAsyncStream(null)
+            AsyncStream.from(null)
             assert false
         } catch (IllegalArgumentException e) {
             assert e.message.contains('null')
@@ -1180,13 +1180,13 @@ class AsyncApiTest {
     @Test
     void testToAsyncStreamPassthrough() {
         AsyncStream<String> stream = AsyncStream.empty()
-        assert AwaitableAdapterRegistry.toAsyncStream(stream).is(stream)
+        assert AsyncStream.from(stream).is(stream)
     }
 
     @Test
     void testToAwaitableUnsupportedType() {
         try {
-            AwaitableAdapterRegistry.toAwaitable(new Object())
+            Awaitable.from(new Object())
             assert false
         } catch (IllegalArgumentException e) {
             assert e.message.contains('No AwaitableAdapter found')
@@ -1196,7 +1196,7 @@ class AsyncApiTest {
     @Test
     void testToAsyncStreamUnsupportedType() {
         try {
-            AwaitableAdapterRegistry.toAsyncStream(42)
+            AsyncStream.from(42)
             assert false
         } catch (IllegalArgumentException e) {
             assert e.message.contains('No AsyncStream adapter')
@@ -1206,7 +1206,7 @@ class AsyncApiTest {
     @Test
     void testToAwaitableNullThrows() {
         def ex = shouldFail(IllegalArgumentException) {
-            AwaitableAdapterRegistry.toAwaitable(null)
+            Awaitable.from(null)
         }
         assert ex.message.contains('Cannot convert null to Awaitable')
     }
@@ -1214,7 +1214,7 @@ class AsyncApiTest {
     @Test
     void testToAwaitableUnknownTypeThrows() {
         def ex = shouldFail(IllegalArgumentException) {
-            AwaitableAdapterRegistry.toAwaitable(new StringBuilder("test"))
+            Awaitable.from(new StringBuilder("test"))
         }
         assert ex.message.contains('No AwaitableAdapter found for type')
         assert ex.message.contains('StringBuilder')
@@ -1223,7 +1223,7 @@ class AsyncApiTest {
     @Test
     void testToAsyncStreamNullThrows() {
         def ex = shouldFail(IllegalArgumentException) {
-            AwaitableAdapterRegistry.toAsyncStream(null)
+            AsyncStream.from(null)
         }
         assert ex.message.contains('Cannot convert null to AsyncStream')
     }
@@ -1231,7 +1231,7 @@ class AsyncApiTest {
     @Test
     void testToAsyncStreamUnknownTypeThrows() {
         def ex = shouldFail(IllegalArgumentException) {
-            AwaitableAdapterRegistry.toAsyncStream(new StringBuilder("test"))
+            AsyncStream.from(new StringBuilder("test"))
         }
         assert ex.message.contains('No AsyncStream adapter found for type')
         assert ex.message.contains('StringBuilder')
@@ -1240,7 +1240,7 @@ class AsyncApiTest {
     @Test
     void testToAwaitablePassthrough() {
         Awaitable<String> aw = Awaitable.of('pass')
-        assert AwaitableAdapterRegistry.toAwaitable(aw).is(aw)
+        assert Awaitable.from(aw).is(aw)
     }
 
     @Test
@@ -1254,6 +1254,144 @@ class AsyncApiTest {
         assert !AwaitableAdapterRegistry.unregister(adapter) // already removed
     }
 
+    // ================================================================
+    // Awaitable.from() and AsyncStream.from()
+    // ================================================================
+
+    @Test
+    void testAwaitableFromCompletableFuture() {
+        def cf = CompletableFuture.completedFuture("hello")
+        Awaitable<String> aw = Awaitable.from(cf)
+        assert aw.get() == "hello"
+    }
+
+    @Test
+    void testAwaitableFromCompletionStage() {
+        CompletionStage<String> stage = 
CompletableFuture.completedFuture("stage-value")
+        Awaitable<String> aw = Awaitable.from(stage)
+        assert aw.get() == "stage-value"
+    }
+
+    @Test
+    void testAwaitableFromFuture() {
+        def ft = new FutureTask<>({ -> "future-result" } as Callable)
+        ft.run()
+        Awaitable<String> aw = Awaitable.from(ft)
+        assert aw.get() == "future-result"
+    }
+
+    @Test
+    void testAwaitableFromPassthrough() {
+        Awaitable<String> original = Awaitable.of("original")
+        assert Awaitable.from(original).is(original)
+    }
+
+    @Test
+    void testAwaitableFromNull() {
+        def ex = shouldFail(IllegalArgumentException) {
+            Awaitable.from(null)
+        }
+        assert ex.message.contains('null')
+    }
+
+    @Test
+    void testAwaitableFromUnsupportedType() {
+        def ex = shouldFail(IllegalArgumentException) {
+            Awaitable.from(new Object())
+        }
+        assert ex.message.contains('No AwaitableAdapter found')
+    }
+
+    @Test
+    void testAwaitableFromFailedFuture() {
+        def cf = CompletableFuture.<String>failedFuture(new 
IOException("broken"))
+        Awaitable<String> aw = Awaitable.from(cf)
+        assert aw.isCompletedExceptionally()
+        try {
+            aw.get()
+            assert false
+        } catch (ExecutionException e) {
+            assert e.cause instanceof IOException
+            assert e.cause.message == "broken"
+        }
+    }
+
+    @Test
+    void testAsyncStreamFromIterable() {
+        AsyncStream<String> stream = AsyncStream.from(["x", "y", "z"])
+        def items = []
+        while (stream.moveNext().get()) {
+            items << stream.current
+        }
+        assert items == ["x", "y", "z"]
+    }
+
+    @Test
+    void testAsyncStreamFromIterator() {
+        def iter = ["a", "b"].iterator()
+        AsyncStream<String> stream = AsyncStream.from(iter)
+        assert stream.moveNext().get() == true
+        assert stream.current == "a"
+        assert stream.moveNext().get() == true
+        assert stream.current == "b"
+        assert stream.moveNext().get() == false
+    }
+
+    @Test
+    void testAsyncStreamFromPassthrough() {
+        AsyncStream<String> original = AsyncStream.empty()
+        assert AsyncStream.from(original).is(original)
+    }
+
+    @Test
+    void testAsyncStreamFromNull() {
+        def ex = shouldFail(IllegalArgumentException) {
+            AsyncStream.from(null)
+        }
+        assert ex.message.contains('null')
+    }
+
+    @Test
+    void testAsyncStreamFromUnsupportedType() {
+        def ex = shouldFail(IllegalArgumentException) {
+            AsyncStream.from(42)
+        }
+        assert ex.message.contains('No AsyncStream adapter')
+    }
+
+    @Test
+    void testAsyncStreamFromEmptyIterable() {
+        AsyncStream<String> stream = AsyncStream.from([])
+        assert stream.moveNext().get() == false
+    }
+
+    @Test
+    void testAsyncStreamFromFlowPublisher() { // AsyncStream.from(Flow.Publisher) must yield every published item, in order
+        def pub = new SubmissionPublisher<String>()
+        AsyncStream<String> stream = AsyncStream.from(pub) // adapt before the producer starts so a slow main thread cannot miss submissions
+        Thread.start {
+            Thread.sleep(50) // same pacing as testFlowPublisherToAsyncStreamDirect
+            pub.submit("item1")
+            pub.submit("item2")
+            pub.close()
+        }
+        def items = []
+        while (stream.moveNext().get(2, TimeUnit.SECONDS)) { // bounded wait: fail fast instead of hanging the build
+            items << stream.current
+        }
+        stream.close()
+        assert items == ["item1", "item2"]
+    }
+
+    @Test
+    void testAwaitableFromFlowPublisher() {
+        def pub = new SubmissionPublisher<String>()
+        Awaitable<String> aw = Awaitable.from(pub)
+        pub.submit("first")
+        pub.close()
+        assert aw.get(2, TimeUnit.SECONDS) == "first"
+    }
+
     // ================================================================
     // AwaitableAdapter default methods
     // ================================================================
@@ -1288,7 +1426,7 @@ class AsyncApiTest {
     @Test
     void testFlowPublisherToAwaitableOnNext() {
         def pub = new SubmissionPublisher<String>()
-        Awaitable<String> aw = AwaitableAdapterRegistry.toAwaitable(pub)
+        Awaitable<String> aw = Awaitable.from(pub)
         pub.submit('hello')
         pub.close()
         assert aw.get() == 'hello'
@@ -1305,7 +1443,7 @@ class AsyncApiTest {
                 s.onError(new IOException('pub-error'))
             }
         }
-        Awaitable<String> aw = AwaitableAdapterRegistry.toAwaitable(pub)
+        Awaitable<String> aw = Awaitable.from(pub)
         try {
             aw.get()
             assert false
@@ -1326,14 +1464,14 @@ class AsyncApiTest {
                 s.onComplete()
             }
         }
-        Awaitable<String> aw = AwaitableAdapterRegistry.toAwaitable(pub)
+        Awaitable<String> aw = Awaitable.from(pub)
         assert aw.get() == null
     }
 
     @Test
     void testFlowPublisherToAsyncStreamDirect() {
         def pub = new SubmissionPublisher<String>()
-        AsyncStream<String> stream = 
AwaitableAdapterRegistry.toAsyncStream(pub)
+        AsyncStream<String> stream = AsyncStream.from(pub)
         Thread.start {
             Thread.sleep(50)
             pub.submit('a')
@@ -1362,7 +1500,7 @@ class AsyncApiTest {
                 })
             }
         }
-        AsyncStream<String> stream = 
AwaitableAdapterRegistry.toAsyncStream(pub)
+        AsyncStream<String> stream = AsyncStream.from(pub)
         assert stream.moveNext().get() == true
         assert stream.getCurrent() == 'item1'
         try {
@@ -1376,7 +1514,7 @@ class AsyncApiTest {
     @Test
     void testFlowPublisherOnCompleteNoItems() {
         def pub = new SubmissionPublisher<String>()
-        Awaitable<String> aw = AwaitableAdapterRegistry.toAwaitable(pub)
+        Awaitable<String> aw = Awaitable.from(pub)
         // Close immediately — triggers onComplete with no onNext
         pub.close()
         assert aw.get() == null
@@ -1385,7 +1523,7 @@ class AsyncApiTest {
     @Test
     void testFlowPublisherOnError() {
         def pub = new SubmissionPublisher<String>()
-        Awaitable<String> aw = AwaitableAdapterRegistry.toAwaitable(pub)
+        Awaitable<String> aw = Awaitable.from(pub)
         Thread.start {
             Thread.sleep(50)
             pub.closeExceptionally(new IOException('pub-err'))
@@ -1402,7 +1540,7 @@ class AsyncApiTest {
     @Test
     void testFlowPublisherAsyncStreamMoveNextInterrupted() {
         def pub = new SubmissionPublisher<String>()
-        AsyncStream<String> stream = 
AwaitableAdapterRegistry.toAsyncStream(pub)
+        AsyncStream<String> stream = AsyncStream.from(pub)
         // Interrupt before moveNext to trigger InterruptedException in queue.take()
         Thread.currentThread().interrupt()
         try {
@@ -1424,7 +1562,7 @@ class AsyncApiTest {
     @Test
     void testFlowPublisherAsyncStreamOnError() {
         def pub = new SubmissionPublisher<String>()
-        AsyncStream<String> stream = 
AwaitableAdapterRegistry.toAsyncStream(pub)
+        AsyncStream<String> stream = AsyncStream.from(pub)
         Thread.start {
             Thread.sleep(50)
             pub.closeExceptionally(new IOException('stream-err'))
@@ -1440,7 +1578,7 @@ class AsyncApiTest {
     @Test
     void testFlowPublisherAsyncStreamOnComplete() {
         def pub = new SubmissionPublisher<String>()
-        AsyncStream<String> stream = 
AwaitableAdapterRegistry.toAsyncStream(pub)
+        AsyncStream<String> stream = AsyncStream.from(pub)
         Thread.start {
             Thread.sleep(50)
             pub.submit('item1')
@@ -1460,14 +1598,14 @@ class AsyncApiTest {
     void testFutureAdapterAlreadyDone() {
         def ft = new FutureTask<String>({ 'already-done' } as Callable<String>)
         ft.run()
-        Awaitable<String> aw = AwaitableAdapterRegistry.toAwaitable(ft)
+        Awaitable<String> aw = Awaitable.from(ft)
         assert aw.get() == 'already-done'
     }
 
     @Test
     void testFutureAdapterNotYetDone() {
         def ft = new FutureTask<String>({ 'async-done' } as Callable<String>)
-        Awaitable<String> aw = AwaitableAdapterRegistry.toAwaitable(ft)
+        Awaitable<String> aw = Awaitable.from(ft)
         Thread.start { Thread.sleep(50); ft.run() }
         assert aw.get() == 'async-done'
     }
@@ -1476,7 +1614,7 @@ class AsyncApiTest {
     void testFutureAdapterWithException() {
         def ft = new FutureTask<String>({ throw new 
ArithmeticException('div-zero') } as Callable<String>)
         ft.run()
-        Awaitable<String> aw = AwaitableAdapterRegistry.toAwaitable(ft)
+        Awaitable<String> aw = Awaitable.from(ft)
         try {
             aw.get()
             assert false
@@ -1491,7 +1629,7 @@ class AsyncApiTest {
         def ft = new FutureTask<String>({ 'with-executor' } as 
Callable<String>)
         
AwaitableAdapterRegistry.setBlockingExecutor(AsyncSupport.getExecutor())
         try {
-            Awaitable<String> aw = AwaitableAdapterRegistry.toAwaitable(ft)
+            Awaitable<String> aw = Awaitable.from(ft)
             Thread.start { Thread.sleep(50); ft.run() }
             assert aw.get() == 'with-executor'
         } finally {
@@ -1503,7 +1641,7 @@ class AsyncApiTest {
     void testFutureAdapterCompleteFromException() {
         def ft = new FutureTask<String>({ throw new IOException('ft-err') } as 
Callable<String>)
         ft.run()
-        Awaitable<String> aw = AwaitableAdapterRegistry.toAwaitable(ft)
+        Awaitable<String> aw = Awaitable.from(ft)
         try {
             aw.get()
             assert false
@@ -1516,7 +1654,7 @@ class AsyncApiTest {
     @Test
     void testFutureAdapterNotDone() {
         def ft = new FutureTask<String>({ 'delayed' } as Callable<String>)
-        Awaitable<String> aw = AwaitableAdapterRegistry.toAwaitable(ft)
+        Awaitable<String> aw = Awaitable.from(ft)
         // Run the FutureTask after adapter wraps it
         Thread.start {
             Thread.sleep(50)
@@ -2034,4 +2172,250 @@ class AsyncApiTest {
         latch.await(5, TimeUnit.SECONDS)
         assert interrupted.get() : "Producer should be interrupted when 
attached to a closed stream"
     }
+
+    // ================================================================
+    // AwaitResult: additional edge cases
+    // ================================================================
+
+    @Test
+    void testAwaitResultSuccessWithNull() {
+        def r = AwaitResult.success(null)
+        assert r.isSuccess()
+        assert r.value == null
+        assert r.getOrElse({ 'fallback' }) == null
+        assert r.toString() == 'AwaitResult.Success[null]'
+    }
+
+    @Test
+    void testAwaitResultFailureNullThrows() {
+        shouldFail(NullPointerException) {
+            AwaitResult.failure(null)
+        }
+    }
+
+    @Test
+    void testAwaitResultToStringFormats() {
+        assert AwaitResult.success(42).toString() == 'AwaitResult.Success[42]'
+        assert AwaitResult.failure(new 
RuntimeException('oops')).toString().contains('Failure')
+        assert AwaitResult.failure(new 
RuntimeException('oops')).toString().contains('oops')
+    }
+
+    // ================================================================
+    // Awaitable: thenAccept default method
+    // ================================================================
+
+    @Test
+    void testAwaitableThenAcceptSuccess() {
+        def captured = []
+        def aw = Awaitable.of('hello').thenAccept { captured << it }
+        aw.get()
+        assert captured == ['hello']
+    }
+
+    @Test
+    void testAwaitableThenAcceptReturnsVoid() {
+        def aw = Awaitable.of('hello').thenAccept {}
+        assert aw.get() == null
+    }
+
+    // ================================================================
+    // Awaitable: delay with TimeUnit
+    // ================================================================
+
+    @Test
+    void testAwaitableDelayWithTimeUnit() {
+        def start = System.currentTimeMillis()
+        def aw = Awaitable.delay(50, TimeUnit.MILLISECONDS)
+        aw.get()
+        assert System.currentTimeMillis() - start >= 40
+    }
+
+    // ================================================================
+    // Awaitable: static timeout combinators
+    // ================================================================
+
+    @Test
+    void testAwaitableOrTimeoutMillisStaticSuccess() {
+        def cf = CompletableFuture.completedFuture('fast')
+        def aw = Awaitable.orTimeoutMillis(cf, 5000)
+        assert aw.get() == 'fast'
+    }
+
+    @Test
+    void testAwaitableOrTimeoutStaticExpires() {
+        def cf = new CompletableFuture<String>()
+        def aw = Awaitable.orTimeout(cf, 50, TimeUnit.MILLISECONDS)
+        def ex = shouldFail(ExecutionException) {
+            aw.get()
+        }
+        assert ex.cause instanceof java.util.concurrent.TimeoutException
+    }
+
+    @Test
+    void testAwaitableCompleteOnTimeoutMillisStaticSuccess() {
+        def cf = CompletableFuture.completedFuture('fast')
+        def aw = Awaitable.completeOnTimeoutMillis(cf, 'fallback', 5000)
+        assert aw.get() == 'fast'
+    }
+
+    @Test
+    void testAwaitableCompleteOnTimeoutStaticExpires() {
+        def cf = new CompletableFuture<String>()
+        def aw = Awaitable.completeOnTimeout(cf, 'default', 50, 
TimeUnit.MILLISECONDS)
+        assert aw.get() == 'default'
+    }
+
+    // ================================================================
+    // Awaitable: executor configuration
+    // ================================================================
+
+    @Test
+    void testAwaitableGetSetExecutor() {
+        def original = Awaitable.getExecutor()
+        assert original != null
+        try {
+            def custom = 
java.util.concurrent.Executors.newSingleThreadExecutor()
+            Awaitable.setExecutor(custom)
+            assert Awaitable.getExecutor().is(custom)
+            custom.shutdown()
+        } finally {
+            Awaitable.setExecutor(null) // reset to default
+        }
+        assert Awaitable.getExecutor() != null
+    }
+
+    @Test
+    void testAwaitableIsVirtualThreadsAvailable() {
+        // Just verify it returns a boolean and doesn't throw
+        def result = Awaitable.isVirtualThreadsAvailable()
+        assert result instanceof Boolean
+    }
+
+    // ================================================================
+    // AsyncStream.empty()
+    // ================================================================
+
+    @Test
+    void testAsyncStreamEmpty() {
+        def stream = AsyncStream.empty()
+        assert stream.moveNext().get() == false
+        assert stream.current == null
+    }
+
+    @Test
+    void testAsyncStreamEmptySingleton() {
+        assert AsyncStream.empty().is(AsyncStream.empty())
+    }
+
+    @Test
+    void testAsyncStreamCloseDefaultNoOp() {
+        def stream = AsyncStream.empty()
+        stream.close() // Should not throw
+    }
+
+    // ================================================================
+    // Awaitable.from() with custom adapter
+    // ================================================================
+
+    @Test
+    void testAwaitableFromWithCustomAdapter() {
+        def adapter = new AwaitableAdapter() {
+            boolean supportsAwaitable(Class<?> type) { type == StringBuilder }
+            def <T> Awaitable<T> toAwaitable(Object source) {
+                Awaitable.of((T) source.toString())
+            }
+        }
+        def handle = AwaitableAdapterRegistry.register(adapter)
+        try {
+            Awaitable<String> aw = Awaitable.from(new StringBuilder("custom"))
+            assert aw.get() == "custom"
+        } finally {
+            handle.close()
+        }
+    }
+
+    @Test
+    void testAsyncStreamFromWithCustomAdapter() {
+        def adapter = new AwaitableAdapter() {
+            boolean supportsAwaitable(Class<?> type) { false }
+            def <T> Awaitable<T> toAwaitable(Object source) { null }
+            boolean supportsAsyncStream(Class<?> type) { type == StringBuilder 
}
+            def <T> AsyncStream<T> toAsyncStream(Object source) {
+                AsyncStream.from(source.toString().toList())
+            }
+        }
+        def handle = AwaitableAdapterRegistry.register(adapter)
+        try {
+            AsyncStream<String> stream = AsyncStream.from(new 
StringBuilder("ab"))
+            def items = []
+            while (stream.moveNext().get()) {
+                items << stream.current
+            }
+            assert items == ["a", "b"]
+        } finally {
+            handle.close()
+        }
+    }
+
+    // ================================================================
+    // Awaitable.all/any/allSettled: edge cases
+    // ================================================================
+
+    @Test
+    void testAwaitableAllEmpty() {
+        def aw = Awaitable.all()
+        assert aw.get() == []
+    }
+
+    @Test
+    void testAwaitableAllSingleItem() {
+        def aw = Awaitable.all(Awaitable.of(42))
+        assert aw.get() == [42]
+    }
+
+    @Test
+    void testAwaitableAllSettledAllSuccess() {
+        def results = Awaitable.allSettled(Awaitable.of('a'), 
Awaitable.of('b')).get()
+        assert results.every { it.isSuccess() }
+        assert results*.value == ['a', 'b']
+    }
+
+    @Test
+    void testAwaitableAllSettledAllFailure() {
+        def results = Awaitable.allSettled(
+            Awaitable.failed(new IOException('e1')),
+            Awaitable.failed(new RuntimeException('e2'))
+        ).get()
+        assert results.every { it.isFailure() }
+        assert results[0].error instanceof IOException
+        assert results[1].error instanceof RuntimeException
+    }
+
+    // ================================================================
+    // GroovyPromise: exceptionally unwraps CompletionException
+    // ================================================================
+
+    @Test
+    void testGroovyPromiseExceptionallyUnwrapsCompletionException() {
+        def cf = new CompletableFuture<String>()
+        cf.completeExceptionally(new CompletionException(new 
IOException('inner')))
+        def promise = GroovyPromise.of(cf)
+        def recovered = promise.exceptionally { t ->
+            assert t instanceof IOException
+            "recovered: ${t.message}"
+        }
+        assert recovered.get() == 'recovered: inner'
+    }
+
+    // ================================================================
+    // Awaitable.from() with CompletionStage not backed by CompletableFuture
+    // ================================================================
+
+    @Test
+    void testAwaitableFromCompletionStageMinimal() {
+        CompletionStage<String> stage = CompletableFuture.supplyAsync { 
"async-value" }
+                .thenApply { it.toUpperCase() }
+        Awaitable<String> aw = Awaitable.from(stage)
+        assert aw.get(2, TimeUnit.SECONDS) == "ASYNC-VALUE"
+    }
 }
diff --git a/src/test/groovy/org/codehaus/groovy/transform/AsyncDeferFlowTest.groovy b/src/test/groovy/org/codehaus/groovy/transform/AsyncDeferFlowTest.groovy
index eb33abcf55..0919b5736b 100644
--- a/src/test/groovy/org/codehaus/groovy/transform/AsyncDeferFlowTest.groovy
+++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncDeferFlowTest.groovy
@@ -32,6 +32,12 @@ import static groovy.test.GroovyAssert.assertScript
  *
  * <p>{@code Flow.Publisher} instances are automatically adapted by the built-in
  * adapter, enabling seamless use with {@code await} and {@code for await}.
+ *
+ * <p><b>Test synchronisation:</b> Flow.Publisher tests use
+ * {@code SubmissionPublisher.getNumberOfSubscribers()} to wait until the
+ * subscription handshake is complete before submitting items.  This
+ * eliminates the race between subscription establishment and item
+ * delivery that caused intermittent failures with hard-coded delays.
  */
 class AsyncDeferFlowTest {
 
@@ -239,13 +245,29 @@ class AsyncDeferFlowTest {
     void testAwaitFlowPublisher() {
         assertScript '''
             import java.util.concurrent.SubmissionPublisher
+            import java.util.concurrent.CountDownLatch
 
             def publisher = new SubmissionPublisher<String>()
+            // Ensure items are submitted after subscription is fully established
+            def subscribed = new CountDownLatch(1)
             Thread.start {
+                subscribed.await()
                 publisher.submit('hello')
                 publisher.close()
             }
-            def result = await(publisher)
+            // SubmissionPublisher.subscribe() is synchronous — once await() internally
+            // calls subscribe(), we can safely signal the publisher thread.
+            // We poll getNumberOfSubscribers() to wait for the subscription handshake.
+            def task = async {
+                // The subscribe call happens inside await() via FlowPublisherAdapter
+                def result = await(publisher)
+                result
+            }
+            def future = task()
+            // Wait until the subscriber is registered with the publisher
+            while (publisher.getNumberOfSubscribers() == 0) { Thread.sleep(1) }
+            subscribed.countDown()
+            def result = await(future)
             assert result == 'hello'
         '''
     }
@@ -265,10 +287,10 @@ class AsyncDeferFlowTest {
 
             def publisher = new 
java.util.concurrent.SubmissionPublisher<Integer>()
             def future = new FlowTest().consumePublisher(publisher)
-            Thread.start {
-                (1..5).each { publisher.submit(it) }
-                publisher.close()
-            }
+            // Wait until the for-await loop has subscribed to the publisher
+            while (publisher.getNumberOfSubscribers() == 0) { Thread.sleep(1) }
+            (1..5).each { publisher.submit(it) }
+            publisher.close()
             def result = await(future)
             assert result == [1, 2, 3, 4, 5]
         '''
@@ -289,12 +311,11 @@ class AsyncDeferFlowTest {
 
             def publisher = new 
java.util.concurrent.SubmissionPublisher<Integer>()
             def future = new FlowTest().consumeWithError(publisher)
-            Thread.start {
-                Thread.sleep(50)
-                publisher.submit(1)
-                publisher.submit(2)
-                publisher.closeExceptionally(new 
RuntimeException('stream-error'))
-            }
+            // Wait until the for-await loop has subscribed
+            while (publisher.getNumberOfSubscribers() == 0) { Thread.sleep(1) }
+            publisher.submit(1)
+            publisher.submit(2)
+            publisher.closeExceptionally(new RuntimeException('stream-error'))
             try {
                 await(future)
                 assert false : 'Should have thrown'
@@ -318,11 +339,10 @@ class AsyncDeferFlowTest {
                 results
             }
             def future = task()
-            Thread.start {
-                Thread.sleep(50)
-                ['a', 'b', 'c'].each { publisher.submit(it) }
-                publisher.close()
-            }
+            // Wait until the for-await loop has subscribed
+            while (publisher.getNumberOfSubscribers() == 0) { Thread.sleep(1) }
+            ['a', 'b', 'c'].each { publisher.submit(it) }
+            publisher.close()
             def result = await(future)
             assert result == ['a', 'b', 'c']
         '''
@@ -347,11 +367,10 @@ class AsyncDeferFlowTest {
 
             def publisher = new 
java.util.concurrent.SubmissionPublisher<Integer>()
             def future = new CombinedTest().processStream(publisher)
-            Thread.start {
-                Thread.sleep(50)
-                (1..3).each { publisher.submit(it) }
-                publisher.close()
-            }
+            // Wait until the for-await loop has subscribed
+            while (publisher.getNumberOfSubscribers() == 0) { Thread.sleep(1) }
+            (1..3).each { publisher.submit(it) }
+            publisher.close()
             def result = await(future)
             assert result == 6
             assert CombinedTest.log == ['sum=6', 'stream-cleanup']
@@ -364,13 +383,16 @@ class AsyncDeferFlowTest {
             import java.util.concurrent.SubmissionPublisher
 
             def publisher = new SubmissionPublisher<Integer>()
-            Thread.start {
-                Thread.sleep(50)
-                publisher.submit(42)
-                publisher.submit(99)  // second value ignored by await
-                publisher.close()
+            def task = async {
+                await(publisher)
             }
-            def result = await(publisher)
+            def future = task()
+            // Wait until the subscriber is registered
+            while (publisher.getNumberOfSubscribers() == 0) { Thread.sleep(1) }
+            publisher.submit(42)
+            publisher.submit(99)  // second value ignored by await
+            publisher.close()
+            def result = await(future)
             assert result == 42
         '''
     }
diff --git a/src/test/groovy/org/codehaus/groovy/transform/AsyncFrameworkIntegrationTest.groovy b/src/test/groovy/org/codehaus/groovy/transform/AsyncFrameworkIntegrationTest.groovy
index 10b73de428..9a6bd1298a 100644
--- a/src/test/groovy/org/codehaus/groovy/transform/AsyncFrameworkIntegrationTest.groovy
+++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncFrameworkIntegrationTest.groovy
@@ -224,7 +224,7 @@ class SpringWebFluxStyleController {
     void testReactorMonoToAwaitableApi() {
         assertScript REACTOR_PREAMBLE + '''
             def mono = Mono.just(42)
-            Awaitable<Integer> awaitable = 
AwaitableAdapterRegistry.toAwaitable(mono)
+            Awaitable<Integer> awaitable = Awaitable.from(mono)
             assert awaitable.get() == 42
             assert awaitable.isDone()
         '''
@@ -234,7 +234,7 @@ class SpringWebFluxStyleController {
     void testReactorMonoAwaitableThen() {
         assertScript REACTOR_PREAMBLE + '''
             def mono = Mono.just(5)
-            Awaitable<Integer> a = AwaitableAdapterRegistry.toAwaitable(mono)
+            Awaitable<Integer> a = Awaitable.from(mono)
             Awaitable<Integer> doubled = a.then { it * 2 }
             assert doubled.get() == 10
         '''
@@ -333,7 +333,7 @@ class SpringWebFluxStyleController {
     void testRxJavaSingleToAwaitableApi() {
         assertScript RXJAVA_PREAMBLE + '''
             def single = Single.just("adapted")
-            Awaitable<String> awaitable = 
AwaitableAdapterRegistry.toAwaitable(single)
+            Awaitable<String> awaitable = Awaitable.from(single)
             assert awaitable.get() == "adapted"
             assert awaitable.isDone()
         '''
@@ -418,7 +418,7 @@ class SpringWebFluxStyleController {
     void testSpringStyleCompletionStageAdapter() {
         assertScript REACTOR_PREAMBLE + '''
             CompletionStage<String> stage = CompletableFuture.supplyAsync { 
"stage-value" }
-            Awaitable<String> awaitable = 
AwaitableAdapterRegistry.toAwaitable(stage)
+            Awaitable<String> awaitable = Awaitable.from(stage)
             assert awaitable.get() == "stage-value"
         '''
     }
@@ -444,7 +444,7 @@ class SpringWebFluxStyleController {
     void testReactorToRxJavaInterop() {
         assertScript REACTOR_PREAMBLE + '''
             def mono = Mono.just("from-reactor")
-            Awaitable<String> awaitable = 
AwaitableAdapterRegistry.toAwaitable(mono)
+            Awaitable<String> awaitable = Awaitable.from(mono)
             CompletableFuture<String> cf = awaitable.toCompletableFuture()
             assert cf.get() == "from-reactor"
         '''
@@ -454,7 +454,7 @@ class SpringWebFluxStyleController {
     void testAwaitableExceptionallyWithReactor() {
         assertScript REACTOR_PREAMBLE + '''
             def mono = Mono.error(new RuntimeException("fail"))
-            Awaitable<String> awaitable = 
AwaitableAdapterRegistry.toAwaitable(mono)
+            Awaitable<String> awaitable = Awaitable.from(mono)
             Awaitable<String> recovered = awaitable.exceptionally { 
"recovered" }
             assert recovered.get() == "recovered"
         '''
@@ -464,7 +464,7 @@ class SpringWebFluxStyleController {
     void testAwaitableThenComposeAcrossFrameworks() {
         assertScript REACTOR_PREAMBLE + '''
             def mono = Mono.just(5)
-            Awaitable<Integer> a = AwaitableAdapterRegistry.toAwaitable(mono)
+            Awaitable<Integer> a = Awaitable.from(mono)
             Awaitable<Integer> composed = a.thenCompose { val ->
                 GroovyPromise.of(CompletableFuture.supplyAsync { val * 10 })
             }
@@ -689,7 +689,7 @@ class SpringWebFluxStyleController {
             // Register and verify it works
             def handle = AwaitableAdapterRegistry.register(adapter)
             try {
-                def result = AwaitableAdapterRegistry.toAwaitable(new 
CustomResult(data: "hello"))
+                def result = Awaitable.from(new CustomResult(data: "hello"))
                 assert await(result) == "hello"
             } finally {
                 handle.close()
@@ -697,7 +697,7 @@ class SpringWebFluxStyleController {
 
             // After close, the adapter should be removed
             try {
-                AwaitableAdapterRegistry.toAwaitable(new CustomResult(data: 
"fail"))
+                Awaitable.from(new CustomResult(data: "fail"))
                 assert false : "Should throw"
             } catch (IllegalArgumentException e) {
                 // expected
@@ -740,7 +740,7 @@ class SpringWebFluxStyleController {
                 def future = new FutureTask<String>({ "from-blocking-future" })
                 pool.submit(future)
 
-                def aw = AwaitableAdapterRegistry.toAwaitable(future)
+                def aw = Awaitable.from(future)
                 assert await(aw) == "from-blocking-future"
             } finally {
                 AwaitableAdapterRegistry.setBlockingExecutor(null)
diff --git a/src/test/groovy/org/codehaus/groovy/transform/AsyncTransformTest.groovy b/src/test/groovy/org/codehaus/groovy/transform/AsyncTransformTest.groovy
index 58f645672f..e3ab3a87d6 100644
--- a/src/test/groovy/org/codehaus/groovy/transform/AsyncTransformTest.groovy
+++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncTransformTest.groovy
@@ -1176,11 +1176,10 @@ class AsyncTransformTest {
     void testAdapterRegistryWithCompletableFuture() {
         assertScript '''
             import groovy.concurrent.Awaitable
-            import groovy.concurrent.AwaitableAdapterRegistry
             import java.util.concurrent.CompletableFuture
 
             def cf = CompletableFuture.completedFuture("adapted")
-            Awaitable<String> a = AwaitableAdapterRegistry.toAwaitable(cf)
+            Awaitable<String> a = Awaitable.from(cf)
             assert a.get() == "adapted"
         '''
     }
@@ -1189,10 +1188,9 @@ class AsyncTransformTest {
     void testAdapterRegistryPassthroughAwaitable() {
         assertScript '''
             import groovy.concurrent.Awaitable
-            import groovy.concurrent.AwaitableAdapterRegistry
 
             def original = Awaitable.of(42)
-            def adapted = AwaitableAdapterRegistry.toAwaitable(original)
+            def adapted = Awaitable.from(original)
             assert adapted.is(original)
         '''
     }
@@ -1221,7 +1219,7 @@ class AsyncTransformTest {
             })
 
             def custom = new CustomPromise("custom-value")
-            Awaitable<String> a = AwaitableAdapterRegistry.toAwaitable(custom)
+            Awaitable<String> a = Awaitable.from(custom)
             assert a.get() == "custom-value"
         '''
     }
@@ -1448,7 +1446,7 @@ class AsyncTransformTest {
             assert !AwaitableAdapterRegistry.unregister(adapter) // already 
removed
 
             try {
-                AwaitableAdapterRegistry.toAwaitable(new FakePromise(val: "x"))
+                Awaitable.from(new FakePromise(val: "x"))
                 assert false : "should have thrown"
             } catch (IllegalArgumentException expected) {
                 assert expected.message.contains("No AwaitableAdapter")
@@ -1472,7 +1470,7 @@ class AsyncTransformTest {
             handle.close() // unregister via AutoCloseable
 
             try {
-                AwaitableAdapterRegistry.toAwaitable(new Token(id: 1))
+                Awaitable.from(new Token(id: 1))
                 assert false
             } catch (IllegalArgumentException expected) { }
         '''
@@ -1508,7 +1506,7 @@ class AsyncTransformTest {
 
             // Test with Iterator directly
             def iter = [10, 20, 30].iterator()
-            AsyncStream stream = AwaitableAdapterRegistry.toAsyncStream(iter)
+            AsyncStream stream = AsyncStream.from(iter)
             def results = []
             while (await(stream.moveNext())) {
                 results << stream.getCurrent()
@@ -1523,7 +1521,7 @@ class AsyncTransformTest {
             import groovy.concurrent.*
 
             try {
-                AwaitableAdapterRegistry.toAsyncStream("a string")
+                AsyncStream.from("a string")
                 assert false : "should throw"
             } catch (IllegalArgumentException expected) {
                 assert expected.message.contains("No AsyncStream adapter")
@@ -1537,7 +1535,7 @@ class AsyncTransformTest {
             import groovy.concurrent.*
 
             try {
-                AwaitableAdapterRegistry.toAwaitable("plain string")
+                Awaitable.from("plain string")
                 assert false : "should throw"
             } catch (IllegalArgumentException expected) {
                 assert expected.message.contains("No AwaitableAdapter")

Reply via email to