This is an automated email from the ASF dual-hosted git repository. sunlan pushed a commit to branch GROOVY-9381_3 in repository https://gitbox.apache.org/repos/asf/groovy.git
commit e38ef50316dddbf801035634185a5ee3a573c4ec Author: Daniel Sun <[email protected]> AuthorDate: Sat Mar 14 11:45:44 2026 +0900 Minor tweaks --- src/main/java/groovy/concurrent/AsyncStream.java | 30 ++ src/main/java/groovy/concurrent/Awaitable.java | 39 +- .../concurrent/AwaitableAdapterRegistry.java | 14 +- src/main/java/groovy/transform/Async.java | 4 +- .../apache/groovy/runtime/async/AsyncSupport.java | 17 +- .../groovy/runtime/async/FlowPublisherAdapter.java | 53 ++- src/spec/doc/core-async-await.adoc | 33 +- src/spec/test/AsyncAwaitSpecTest.groovy | 38 +- .../codehaus/groovy/transform/AsyncApiTest.groovy | 438 +++++++++++++++++++-- .../groovy/transform/AsyncDeferFlowTest.groovy | 76 ++-- .../transform/AsyncFrameworkIntegrationTest.groovy | 20 +- .../groovy/transform/AsyncTransformTest.groovy | 18 +- 12 files changed, 674 insertions(+), 106 deletions(-) diff --git a/src/main/java/groovy/concurrent/AsyncStream.java b/src/main/java/groovy/concurrent/AsyncStream.java index 26c7abe316..4baa81115a 100644 --- a/src/main/java/groovy/concurrent/AsyncStream.java +++ b/src/main/java/groovy/concurrent/AsyncStream.java @@ -75,6 +75,36 @@ public interface AsyncStream<T> extends AutoCloseable { default void close() { } + /** + * Converts the given source to an {@code AsyncStream}. + * <p> + * If the source is already an {@code AsyncStream}, it is returned as-is. + * Otherwise, the {@link AwaitableAdapterRegistry} is consulted to find a + * suitable adapter. Built-in adapters handle {@link Iterable} and + * {@link java.util.Iterator}; the auto-discovered {@code FlowPublisherAdapter} + * handles {@link java.util.concurrent.Flow.Publisher}; third-party frameworks + * can register additional adapters via the registry. + * <p> + * This is the recommended entry point for converting external collection or + * reactive types to {@code AsyncStream}: + * <pre> + * AsyncStream<String> stream = AsyncStream.from(myList) + * AsyncStream<Integer> stream2 = AsyncStream.from(myFlowPublisher) + * </pre> + * + * @param source the source object; must not be {@code null} + * @param <T> the element type + * @return an async stream backed by the source + * @throws IllegalArgumentException if {@code source} is {@code null} + * or no adapter supports the source type + * @see AwaitableAdapterRegistry#toAsyncStream(Object) + * @since 6.0.0 + */ + @SuppressWarnings("unchecked") + static <T> AsyncStream<T> from(Object source) { + return AwaitableAdapterRegistry.toAsyncStream(source); + } + /** * Returns an empty {@code AsyncStream} that completes immediately. */ diff --git a/src/main/java/groovy/concurrent/Awaitable.java b/src/main/java/groovy/concurrent/Awaitable.java index ce41b7f947..d7bf8fb689 100644 --- a/src/main/java/groovy/concurrent/Awaitable.java +++ b/src/main/java/groovy/concurrent/Awaitable.java @@ -54,14 +54,18 @@ import java.util.function.Function; * {@code Promise.allSettled()}</li> * <li>{@link #delay(long) Awaitable.delay(ms)} — like * {@code Task.Delay()} / {@code setTimeout}</li> - * <li>{@link #timeout(Object, long) Awaitable.timeout(task, ms)} — like + * <li>{@link #orTimeoutMillis(Object, long) Awaitable.orTimeoutMillis(task, ms)} — like * Kotlin's {@code withTimeout} or a JavaScript promise raced against a timer</li> - * <li>{@link #timeoutOr(Object, Object, long) Awaitable.timeoutOr(task, fallback, ms)} — + * <li>{@link #completeOnTimeoutMillis(Object, Object, long) + * Awaitable.completeOnTimeoutMillis(task, fallback, ms)} — * like a timeout with fallback/default value</li> * </ul> * <p> - * <b>Static factories:</b> + * <b>Static factories and conversion:</b> * <ul> + * <li>{@link #from(Object) Awaitable.from(source)} — converts any supported + * async type (CompletableFuture, CompletionStage, Future, Flow.Publisher, etc.) + * to an {@code Awaitable}</li> * <li>{@link #of(Object) Awaitable.of(value)} — like * {@code Task.FromResult()} / {@code Promise.resolve()}</li> * <li>{@link #failed(Throwable) Awaitable.failed(error)} — like @@ -297,6 +301,35 @@ public interface Awaitable<T> { // ---- Static factories ---- + /** + * Converts the given source to an {@code Awaitable}. + * <p> + * If the source is already an {@code Awaitable}, it is returned as-is. + * Otherwise, the {@link AwaitableAdapterRegistry} is consulted to find a + * suitable adapter. Built-in adapters handle {@link CompletableFuture}, + * {@link java.util.concurrent.CompletionStage}, {@link java.util.concurrent.Future}, + * and {@link java.util.concurrent.Flow.Publisher}; third-party frameworks + * can register additional adapters via the registry. + * <p> + * This is the recommended entry point for converting external async types + * to {@code Awaitable}: + * <pre> + * Awaitable<String> aw = Awaitable.from(someCompletableFuture) + * Awaitable<Integer> aw2 = Awaitable.from(someReactorMono) + * </pre> + * + * @param source the source object; must not be {@code null} + * @param <T> the result type + * @return an awaitable backed by the source + * @throws IllegalArgumentException if {@code source} is {@code null} + * or no adapter supports the source type + * @see AwaitableAdapterRegistry#toAwaitable(Object) + * @since 6.0.0 + */ + static <T> Awaitable<T> from(Object source) { + return AwaitableAdapterRegistry.toAwaitable(source); + } + /** * Returns an already-completed {@code Awaitable} with the given value. * Analogous to C#'s {@code Task.FromResult()} or JavaScript's diff --git a/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java b/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java index a3500a13d3..61398d154d 100644 --- a/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java +++ b/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java @@ -111,13 +111,18 @@ public class AwaitableAdapterRegistry { /** * Converts the given source to an {@link Awaitable}. * If the source is already an {@code Awaitable}, it is returned as-is. + * <p> + * <b>Tip:</b> user code should generally prefer {@link Awaitable#from(Object)}, + * which delegates to this method but is more discoverable from the + * {@code Awaitable} type itself. * * @param source the source object; must not be {@code null} * @throws IllegalArgumentException if {@code source} is {@code null} * or no adapter supports the source type + * @see Awaitable#from(Object) */ @SuppressWarnings("unchecked") - public static <T> Awaitable<T> toAwaitable(Object source) { + static <T> Awaitable<T> toAwaitable(Object source) { if (source == null) { throw new IllegalArgumentException("Cannot convert null to Awaitable"); } @@ -136,13 +141,18 @@ public class AwaitableAdapterRegistry { /** * Converts the given source to an {@link AsyncStream}. * If the source is already an {@code AsyncStream}, it is returned as-is. + * <p> + * <b>Tip:</b> user code should generally prefer {@link AsyncStream#from(Object)}, + * which delegates to this method but is more discoverable from the + * {@code AsyncStream} type itself. * * @param source the source object; must not be {@code null} * @throws IllegalArgumentException if {@code source} is {@code null} * or no adapter supports the source type + * @see AsyncStream#from(Object) */ @SuppressWarnings("unchecked") - public static <T> AsyncStream<T> toAsyncStream(Object source) { + static <T> AsyncStream<T> toAsyncStream(Object source) { if (source == null) { throw new IllegalArgumentException("Cannot convert null to AsyncStream"); } diff --git a/src/main/java/groovy/transform/Async.java b/src/main/java/groovy/transform/Async.java index 90e9cdca4b..2e09ba2a46 100644 --- a/src/main/java/groovy/transform/Async.java +++ b/src/main/java/groovy/transform/Async.java @@ -107,7 +107,9 @@ import java.lang.annotation.Target; * <ul> * <li>Cannot be applied to abstract methods</li> * <li>Cannot be applied to constructors</li> - * <li>Cannot be applied to methods that already return {@code Awaitable}</li> + * <li>Cannot be applied to methods that already return an async type + * ({@code Awaitable}, {@code AsyncStream}, {@code CompletableFuture}, + * {@code CompletionStage}, or {@code Future})</li> * </ul> * * @see groovy.concurrent.Awaitable diff --git a/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java b/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java index 5d8f4aaaf4..96658f81e8 100644 --- a/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java +++ b/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java @@ -21,7 +21,6 @@ package org.apache.groovy.runtime.async; import groovy.concurrent.AsyncStream; import groovy.concurrent.AwaitResult; import groovy.concurrent.Awaitable; -import groovy.concurrent.AwaitableAdapterRegistry; import groovy.lang.Closure; import java.lang.invoke.MethodHandle; @@ -64,8 +63,9 @@ import java.util.concurrent.TimeoutException; * <ul> * <li><b>Async execution</b> — {@code executeAsync()}, {@code executeAsyncVoid()}, * and {@code wrapAsync()} run closures on the configured executor</li> - * <li><b>Await</b> — all {@code await()} overloads go through the - * {@link AwaitableAdapterRegistry} so that third-party async types + * <li><b>Await</b> — all {@code await()} overloads use + * {@link groovy.concurrent.Awaitable#from(Object) Awaitable.from()} so + * that third-party async types * (RxJava {@code Single}, Reactor {@code Mono}, etc.) are supported * transparently once an adapter is registered</li> * <li><b>Async generators</b> — {@code generateAsyncStream()} manages the @@ -112,7 +112,6 @@ import java.util.concurrent.TimeoutException; * * @see groovy.concurrent.Awaitable * @see groovy.transform.Async - * @see Awaitable * @since 6.0.0 */ public class AsyncSupport { @@ -267,8 +266,8 @@ public class AsyncSupport { } /** - * Awaits an arbitrary object by adapting it to {@link Awaitable} via the - * {@link AwaitableAdapterRegistry}. This is the fallback overload called + * Awaits an arbitrary object by adapting it to {@link Awaitable} via + * {@link Awaitable#from(Object)}. This is the fallback overload called * by the {@code await} expression when the compile-time type is not one * of the other supported types. Returns {@code null} for a {@code null} * argument. @@ -285,7 +284,7 @@ public class AsyncSupport { if (source instanceof CompletionStage) return await((CompletionStage<T>) source); if (source instanceof Future) return await((Future<T>) source); if (source instanceof Closure) return awaitClosure((Closure<?>) source); - return await(AwaitableAdapterRegistry.<T>toAwaitable(source)); + return await(Awaitable.<T>from(source)); } /** @@ -543,7 +542,7 @@ public class AsyncSupport { if (source instanceof CompletableFuture<?> cf) return cf; if (source instanceof Awaitable<?> a) return a.toCompletableFuture(); if (source instanceof CompletionStage<?> cs) return cs.toCompletableFuture(); - return AwaitableAdapterRegistry.toAwaitable(source).toCompletableFuture(); + return Awaitable.from(source).toCompletableFuture(); } // ---- non-blocking combinators (return Awaitable) -------------------- @@ -813,7 +812,7 @@ public class AsyncSupport { public static <T> AsyncStream<T> toAsyncStream(Object source) { if (source == null) return AsyncStream.empty(); if (source instanceof AsyncStream) return (AsyncStream<T>) source; - return AwaitableAdapterRegistry.toAsyncStream(source); + return AsyncStream.from(source); } /** diff --git a/src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java b/src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java index 16f8ddc557..f3d5a7847e 100644 --- a/src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java +++ b/src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java @@ -64,9 +64,13 @@ import java.util.concurrent.atomic.AtomicReference; * <ul> * <li>§2.5 — duplicate {@code onSubscribe} cancels the second subscription</li> * <li>§2.13 — {@code null} items in {@code onNext} are rejected immediately</li> - * <li>Terminal signals ({@code onError}/{@code onComplete}) use + * <li>All signals ({@code onNext}, {@code onError}, {@code onComplete}) use * blocking {@code put()} to guarantee delivery even under queue * contention</li> + * <li>Back-pressure is enforced by requesting exactly one item after + * each consumed element; demand is signalled <em>before</em> the + * consumer's {@code moveNext()} awaitable completes, preventing + * livelock when producer and consumer share the same thread pool</li> * </ul> * * @see groovy.concurrent.AwaitableAdapterRegistry @@ -227,9 +231,16 @@ public class FlowPublisherAdapter implements AwaitableAdapter { * providing a pull-based iteration interface over a push-based source. * * <p>Back-pressure is enforced by requesting exactly one item after - * each consumed element. The internal bounded queue (capacity - * {@value QUEUE_CAPACITY}) absorbs minor timing jitter between - * producer and consumer.</p> + * each consumed element. Demand is signalled <em>before</em> the + * consumer's {@code moveNext()} awaitable completes, so the publisher + * can begin producing the next value while the consumer processes the + * current one — this prevents livelock when producer and consumer + * share the same thread pool.</p> + * + * <p>The internal bounded queue (capacity {@value QUEUE_CAPACITY}) + * absorbs minor timing jitter between producer and consumer. All + * signals use blocking {@code put()}, ensuring no items or terminal + * events are silently dropped.</p> * * <p><b>Resource management:</b> When the consumer calls * {@link AsyncStream#close()} (e.g. via {@code break} in a @@ -277,10 +288,16 @@ public class FlowPublisherAdapter implements AwaitableAdapter { return; } if (!closedRef.get()) { - // Use offer() for value signals — if the queue is full (publisher - // misbehaving with respect to demand), the item is dropped rather - // than blocking the publisher thread indefinitely. - queue.offer(new ValueSignal<>(item)); + try { + // Blocking put() guarantees the item reaches the consumer. + // Since demand is capped at 1 (one request(1) per moveNext), + // a well-behaved publisher will never overflow the queue; put() + // still protects against misbehaving publishers by blocking + // rather than silently dropping the value. + queue.put(new ValueSignal<>(item)); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } } } @@ -326,10 +343,16 @@ public class FlowPublisherAdapter implements AwaitableAdapter { if (signal instanceof ValueSignal) { current = ((ValueSignal<T>) signal).value; - cf.complete(Boolean.TRUE); - // Request the next item — back-pressure: one-at-a-time + // Signal demand for the next item BEFORE completing + // the awaitable, so the publisher can begin producing + // the next value while the consumer processes this one. + // Ordering here is critical: if request(1) were called + // after cf.complete(), the consumer could re-enter + // moveNext() and block in take() before demand was + // signalled, creating a livelock. Flow.Subscription sub = subRef.get(); if (sub != null) sub.request(1); + cf.complete(Boolean.TRUE); } else if (signal instanceof ErrorSignal) { streamClosed.set(true); cf.completeExceptionally(((ErrorSignal) signal).error); @@ -366,10 +389,14 @@ public class FlowPublisherAdapter implements AwaitableAdapter { if (sub != null) sub.cancel(); // Drain the queue and inject a sentinel to unblock a // concurrent moveNext() that may be blocked in take(). - // Using offer() after clear() performs a non-blocking, - // best-effort delivery of the sentinel to any waiter. + // Using blocking put() after clear() guarantees delivery; + // since the queue is freshly cleared, put() will not block. queue.clear(); - queue.offer(COMPLETE_SENTINEL); + try { + queue.put(COMPLETE_SENTINEL); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } } } }; diff --git a/src/spec/doc/core-async-await.adoc b/src/spec/doc/core-async-await.adoc index cb9c88ee2c..fd4c05cdbb 100644 --- a/src/spec/doc/core-async-await.adoc +++ b/src/spec/doc/core-async-await.adoc @@ -610,7 +610,10 @@ RxJava 3 via adapters, Vert.x, etc.). The internal adapter uses a bounded buffer (capacity 256) to bridge the push-based `Publisher` model to Groovy's pull-based `AsyncStream` model, preventing unbounded memory growth with -fast publishers while maintaining throughput. +fast publishers while maintaining throughput. Back-pressure is enforced by requesting exactly +one item at a time; demand for the next element is signalled _before_ the consumer's +`moveNext()` awaitable completes, so the publisher can begin producing the next value while +the consumer processes the current one. === Awaiting a Single Value @@ -638,6 +641,24 @@ This is conceptually similar to JavaScript async iterators' `return()`, C#'s [[adapter-registry]] == Adapter Registry +=== Converting External Types with `from()` + +The `Awaitable.from(source)` and `AsyncStream.from(source)` static methods are the recommended +entry points for converting external async types to Groovy's native abstractions. They delegate +to the `AwaitableAdapterRegistry` internally but provide a more discoverable, user-friendly API: + +[source,groovy] +---- +include::../test/AsyncAwaitSpecTest.groovy[tags=from_conversion,indent=0] +---- + +`Awaitable.from()` accepts any type supported by a registered adapter: `CompletableFuture`, +`CompletionStage`, `Future`, `Flow.Publisher`, or any third-party type with a custom adapter. +If the source is already an `Awaitable`, it is returned as-is. `AsyncStream.from()` works +similarly for multi-value sources such as `Iterable`, `Iterator`, or `Flow.Publisher`. + +=== Built-in Adapters + The `groovy.concurrent.AwaitableAdapterRegistry` allows extending `await` to support additional asynchronous types from third-party frameworks. The built-in adapter handles: @@ -959,7 +980,9 @@ writes (adapter registration) are rare, reads (every `await`) are frequent * `AsyncStreamGenerator` close state — `AtomicBoolean` + `AtomicReference<Thread>` for prompt, idempotent close/cancellation signalling * `Flow.Publisher` adaptation (`FlowPublisherAdapter`) — `AtomicReference<Subscription>` with CAS-guarded - onSubscribe (§2.5 compliance), `AtomicBoolean` done-guard for single-value path, and close-aware queue signalling + onSubscribe (§2.5 compliance), `AtomicBoolean` done-guard for single-value path, and close-aware + queue signalling; all signals (`onNext`/`onError`/`onComplete`) use blocking `put()` for + guaranteed delivery; demand is signalled before the consumer's awaitable completes to prevent livelock * Defer scopes — per-task `ArrayDeque`, confined to a single thread (no sharing) * `DELAY_SCHEDULER` — single daemon thread for non-blocking timer operations @@ -1131,7 +1154,8 @@ JavaScript, C#, Kotlin, and Swift, for developers familiar with those languages. | task priority / executor chosen by runtime | **Third-party support** -| `AwaitableAdapterRegistry` SPI +| `AwaitableAdapterRegistry` SPI + +`Awaitable.from()` / `AsyncStream.from()` | _(native `thenable` protocol)_ | Custom awaiters via `GetAwaiter()` | coroutine adapters / bridges @@ -1195,6 +1219,9 @@ JavaScript, C#, Kotlin, and Swift, for developers familiar with those languages. | Pre-computed result | `Awaitable.of(value)` / `Awaitable.failed(error)` +| Type conversion +| `Awaitable.from(source)` / `AsyncStream.from(source)` + | Annotation form | `@Async def methodName() { ... }` diff --git a/src/spec/test/AsyncAwaitSpecTest.groovy b/src/spec/test/AsyncAwaitSpecTest.groovy index fd3636ec80..5040424dae 100644 --- a/src/spec/test/AsyncAwaitSpecTest.groovy +++ b/src/spec/test/AsyncAwaitSpecTest.groovy @@ -1205,7 +1205,43 @@ assert task.isDone() } // ========================================================================= - // 19. Custom adapter registration + // 19. Type conversion with from() + // ========================================================================= + + @Test + void testFromConversion() { + assertScript ''' +// tag::from_conversion[] +import groovy.concurrent.Awaitable +import groovy.concurrent.AsyncStream +import java.util.concurrent.CompletableFuture + +// Convert a CompletableFuture to Awaitable +def cf = CompletableFuture.completedFuture("hello") +Awaitable<String> awaitable = Awaitable.from(cf) +assert awaitable.get() == "hello" + +// If the source is already an Awaitable, it is returned as-is +def original = Awaitable.of(42) +assert Awaitable.from(original).is(original) + +// Convert an Iterable to AsyncStream +AsyncStream<String> stream = AsyncStream.from(["a", "b", "c"]) +def items = [] +assert stream.moveNext().get() == true +items << stream.current +assert stream.moveNext().get() == true +items << stream.current +assert stream.moveNext().get() == true +items << stream.current +assert stream.moveNext().get() == false +assert items == ["a", "b", "c"] +// end::from_conversion[] + ''' + } + + // ========================================================================= + // 20. Custom adapter registration // ========================================================================= @Test diff --git a/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy b/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy index 34886a564d..7dabc83a79 100644 --- a/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy +++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy @@ -1154,13 +1154,13 @@ class AsyncApiTest { } // ================================================================ - // AwaitableAdapterRegistry + // Awaitable.from / AsyncStream.from / AwaitableAdapterRegistry // ================================================================ @Test void testToAwaitableNull() { try { - AwaitableAdapterRegistry.toAwaitable(null) + Awaitable.from(null) assert false } catch (IllegalArgumentException e) { assert e.message.contains('null') @@ -1170,7 +1170,7 @@ class AsyncApiTest { @Test void testToAsyncStreamNull() { try { - AwaitableAdapterRegistry.toAsyncStream(null) + AsyncStream.from(null) assert false } catch (IllegalArgumentException e) { assert e.message.contains('null') @@ -1180,13 +1180,13 @@ class AsyncApiTest { @Test void testToAsyncStreamPassthrough() { AsyncStream<String> stream = AsyncStream.empty() - assert AwaitableAdapterRegistry.toAsyncStream(stream).is(stream) + assert AsyncStream.from(stream).is(stream) } @Test void testToAwaitableUnsupportedType() { try { - AwaitableAdapterRegistry.toAwaitable(new Object()) + Awaitable.from(new Object()) assert false } catch (IllegalArgumentException e) { assert e.message.contains('No AwaitableAdapter found') @@ -1196,7 +1196,7 @@ class AsyncApiTest { @Test void testToAsyncStreamUnsupportedType() { try { - AwaitableAdapterRegistry.toAsyncStream(42) + AsyncStream.from(42) assert false } catch (IllegalArgumentException e) { assert e.message.contains('No AsyncStream adapter') @@ -1206,7 +1206,7 @@ class AsyncApiTest { @Test void testToAwaitableNullThrows() { def ex = shouldFail(IllegalArgumentException) { - AwaitableAdapterRegistry.toAwaitable(null) + Awaitable.from(null) } assert ex.message.contains('Cannot convert null to Awaitable') } @@ -1214,7 +1214,7 @@ class AsyncApiTest { @Test void testToAwaitableUnknownTypeThrows() { def ex = shouldFail(IllegalArgumentException) { - AwaitableAdapterRegistry.toAwaitable(new StringBuilder("test")) + Awaitable.from(new StringBuilder("test")) } assert ex.message.contains('No AwaitableAdapter found for type') assert ex.message.contains('StringBuilder') @@ -1223,7 +1223,7 @@ class AsyncApiTest { @Test void testToAsyncStreamNullThrows() { def ex = shouldFail(IllegalArgumentException) { - AwaitableAdapterRegistry.toAsyncStream(null) + AsyncStream.from(null) } assert ex.message.contains('Cannot convert null to AsyncStream') } @@ -1231,7 +1231,7 @@ class AsyncApiTest { @Test void testToAsyncStreamUnknownTypeThrows() { def ex = shouldFail(IllegalArgumentException) { - AwaitableAdapterRegistry.toAsyncStream(new StringBuilder("test")) + AsyncStream.from(new StringBuilder("test")) } assert ex.message.contains('No AsyncStream adapter found for type') assert ex.message.contains('StringBuilder') @@ -1240,7 +1240,7 @@ class AsyncApiTest { @Test void testToAwaitablePassthrough() { Awaitable<String> aw = Awaitable.of('pass') - assert AwaitableAdapterRegistry.toAwaitable(aw).is(aw) + assert Awaitable.from(aw).is(aw) } @Test @@ -1254,6 +1254,144 @@ class AsyncApiTest { assert !AwaitableAdapterRegistry.unregister(adapter) // already removed } + // ================================================================ + // Awaitable.from() and AsyncStream.from() + // ================================================================ + + @Test + void testAwaitableFromCompletableFuture() { + def cf = CompletableFuture.completedFuture("hello") + Awaitable<String> aw = Awaitable.from(cf) + assert aw.get() == "hello" + } + + @Test + void testAwaitableFromCompletionStage() { + CompletionStage<String> stage = CompletableFuture.completedFuture("stage-value") + Awaitable<String> aw = Awaitable.from(stage) + assert aw.get() == "stage-value" + } + + @Test + void testAwaitableFromFuture() { + def ft = new FutureTask<>({ -> "future-result" } as Callable) + ft.run() + Awaitable<String> aw = Awaitable.from(ft) + assert aw.get() == "future-result" + } + + @Test + void testAwaitableFromPassthrough() { + Awaitable<String> original = Awaitable.of("original") + assert Awaitable.from(original).is(original) + } + + @Test + void testAwaitableFromNull() { + def ex = shouldFail(IllegalArgumentException) { + Awaitable.from(null) + } + assert ex.message.contains('null') + } + + @Test + void testAwaitableFromUnsupportedType() { + def ex = shouldFail(IllegalArgumentException) { + Awaitable.from(new Object()) + } + assert ex.message.contains('No AwaitableAdapter found') + } + + @Test + void testAwaitableFromFailedFuture() { + def cf = CompletableFuture.<String>failedFuture(new IOException("broken")) + Awaitable<String> aw = Awaitable.from(cf) + assert aw.isCompletedExceptionally() + try { + aw.get() + assert false + } catch (ExecutionException e) { + assert e.cause instanceof IOException + assert e.cause.message == "broken" + } + } + + @Test + void testAsyncStreamFromIterable() { + AsyncStream<String> stream = AsyncStream.from(["x", "y", "z"]) + def items = [] + while (stream.moveNext().get()) { + items << stream.current + } + assert items == ["x", "y", "z"] + } + + @Test + void testAsyncStreamFromIterator() { + def iter = ["a", "b"].iterator() + AsyncStream<String> stream = AsyncStream.from(iter) + assert stream.moveNext().get() == true + assert stream.current == "a" + assert stream.moveNext().get() == true + assert stream.current == "b" + assert stream.moveNext().get() == false + } + + @Test + void testAsyncStreamFromPassthrough() { + AsyncStream<String> original = AsyncStream.empty() + assert AsyncStream.from(original).is(original) + } + + @Test + void testAsyncStreamFromNull() { + def ex = shouldFail(IllegalArgumentException) { + AsyncStream.from(null) + } + assert ex.message.contains('null') + } + + @Test + void testAsyncStreamFromUnsupportedType() { + def ex = shouldFail(IllegalArgumentException) { + AsyncStream.from(42) + } + assert ex.message.contains('No AsyncStream adapter') + } + + @Test + void testAsyncStreamFromEmptyIterable() { + AsyncStream<String> stream = AsyncStream.from([]) + assert stream.moveNext().get() == false + } + + @Test + void testAsyncStreamFromFlowPublisher() { + def pub = new SubmissionPublisher<String>() + Thread.start { + Thread.sleep(50) + pub.submit("item1") + pub.submit("item2") + pub.close() + } + AsyncStream<String> stream = AsyncStream.from(pub) + def items = [] + while (stream.moveNext().get(2, TimeUnit.SECONDS)) { + items << stream.current + } + stream.close() + assert items == ["item1", "item2"] + } + + @Test + void testAwaitableFromFlowPublisher() { + def pub = new SubmissionPublisher<String>() + Awaitable<String> aw = Awaitable.from(pub) + pub.submit("first") + pub.close() + assert aw.get(2, TimeUnit.SECONDS) == "first" + } + // ================================================================ // AwaitableAdapter default methods // ================================================================ @@ -1288,7 +1426,7 @@ class AsyncApiTest { @Test void testFlowPublisherToAwaitableOnNext() { def pub = new SubmissionPublisher<String>() - Awaitable<String> aw = AwaitableAdapterRegistry.toAwaitable(pub) + Awaitable<String> aw = Awaitable.from(pub) pub.submit('hello') pub.close() assert aw.get() == 'hello' @@ -1305,7 +1443,7 @@ class AsyncApiTest { s.onError(new IOException('pub-error')) } } - Awaitable<String> aw = AwaitableAdapterRegistry.toAwaitable(pub) + Awaitable<String> aw = Awaitable.from(pub) try { aw.get() assert false @@ -1326,14 +1464,14 @@ class AsyncApiTest { s.onComplete() } } - Awaitable<String> aw = AwaitableAdapterRegistry.toAwaitable(pub) + Awaitable<String> aw = Awaitable.from(pub) assert aw.get() == null } @Test void testFlowPublisherToAsyncStreamDirect() { def pub = new SubmissionPublisher<String>() - AsyncStream<String> stream = AwaitableAdapterRegistry.toAsyncStream(pub) + AsyncStream<String> stream = AsyncStream.from(pub) Thread.start { Thread.sleep(50) pub.submit('a') @@ -1362,7 +1500,7 @@ class AsyncApiTest { }) } } - AsyncStream<String> stream = AwaitableAdapterRegistry.toAsyncStream(pub) + AsyncStream<String> stream = AsyncStream.from(pub) assert stream.moveNext().get() == true assert stream.getCurrent() == 'item1' try { @@ -1376,7 +1514,7 @@ class AsyncApiTest { @Test void testFlowPublisherOnCompleteNoItems() { def pub = new SubmissionPublisher<String>() - Awaitable<String> aw = AwaitableAdapterRegistry.toAwaitable(pub) + Awaitable<String> aw = Awaitable.from(pub) // Close immediately — triggers onComplete with no onNext pub.close() assert aw.get() == null @@ -1385,7 +1523,7 @@ class AsyncApiTest { @Test void testFlowPublisherOnError() { def pub = new SubmissionPublisher<String>() - Awaitable<String> aw = AwaitableAdapterRegistry.toAwaitable(pub) + Awaitable<String> aw = Awaitable.from(pub) Thread.start { Thread.sleep(50) pub.closeExceptionally(new IOException('pub-err')) @@ -1402,7 +1540,7 @@ class AsyncApiTest { @Test void testFlowPublisherAsyncStreamMoveNextInterrupted() { def pub = new SubmissionPublisher<String>() - AsyncStream<String> stream = AwaitableAdapterRegistry.toAsyncStream(pub) + AsyncStream<String> stream = AsyncStream.from(pub) // Interrupt before moveNext to trigger InterruptedException in queue.take() Thread.currentThread().interrupt() try { @@ -1424,7 +1562,7 @@ class AsyncApiTest { @Test void testFlowPublisherAsyncStreamOnError() { def pub = new SubmissionPublisher<String>() - AsyncStream<String> stream = AwaitableAdapterRegistry.toAsyncStream(pub) + AsyncStream<String> stream = AsyncStream.from(pub) Thread.start { Thread.sleep(50) pub.closeExceptionally(new IOException('stream-err')) @@ -1440,7 +1578,7 @@ class AsyncApiTest { @Test void testFlowPublisherAsyncStreamOnComplete() { def pub = new SubmissionPublisher<String>() - AsyncStream<String> stream = AwaitableAdapterRegistry.toAsyncStream(pub) + AsyncStream<String> stream = AsyncStream.from(pub) Thread.start { Thread.sleep(50) pub.submit('item1') @@ -1460,14 +1598,14 @@ class AsyncApiTest { void testFutureAdapterAlreadyDone() { def ft = new FutureTask<String>({ 'already-done' } as Callable<String>) ft.run() - Awaitable<String> aw = AwaitableAdapterRegistry.toAwaitable(ft) + Awaitable<String> aw = Awaitable.from(ft) assert aw.get() == 'already-done' } @Test void testFutureAdapterNotYetDone() { def ft = new FutureTask<String>({ 'async-done' } as Callable<String>) - Awaitable<String> aw = AwaitableAdapterRegistry.toAwaitable(ft) + Awaitable<String> aw = Awaitable.from(ft) Thread.start { Thread.sleep(50); ft.run() } assert aw.get() == 'async-done' } @@ -1476,7 +1614,7 @@ class AsyncApiTest { void testFutureAdapterWithException() { def ft = new FutureTask<String>({ throw new ArithmeticException('div-zero') } as Callable<String>) ft.run() - Awaitable<String> aw = AwaitableAdapterRegistry.toAwaitable(ft) + Awaitable<String> aw = Awaitable.from(ft) try { aw.get() assert false @@ -1491,7 +1629,7 @@ class AsyncApiTest { def ft = new FutureTask<String>({ 'with-executor' } as Callable<String>) AwaitableAdapterRegistry.setBlockingExecutor(AsyncSupport.getExecutor()) try { - Awaitable<String> aw = AwaitableAdapterRegistry.toAwaitable(ft) + Awaitable<String> aw = Awaitable.from(ft) Thread.start { Thread.sleep(50); ft.run() } assert aw.get() == 'with-executor' } finally { @@ -1503,7 +1641,7 @@ class AsyncApiTest { void testFutureAdapterCompleteFromException() { def ft = new FutureTask<String>({ throw new IOException('ft-err') } as Callable<String>) ft.run() - Awaitable<String> aw = AwaitableAdapterRegistry.toAwaitable(ft) + Awaitable<String> aw = Awaitable.from(ft) try { aw.get() assert false @@ -1516,7 +1654,7 @@ class AsyncApiTest { @Test void testFutureAdapterNotDone() { def ft = new FutureTask<String>({ 'delayed' } as Callable<String>) - Awaitable<String> aw = AwaitableAdapterRegistry.toAwaitable(ft) + Awaitable<String> aw = Awaitable.from(ft) // Run the FutureTask after adapter wraps it Thread.start { Thread.sleep(50) @@ -2034,4 +2172,250 @@ class AsyncApiTest { latch.await(5, TimeUnit.SECONDS) assert interrupted.get() : "Producer should be interrupted when attached to a closed stream" } + + // ================================================================ + // AwaitResult: additional edge cases + // ================================================================ + + @Test + void testAwaitResultSuccessWithNull() { + def r = AwaitResult.success(null) + assert r.isSuccess() + assert r.value == null + assert r.getOrElse({ 'fallback' }) == null + assert r.toString() == 'AwaitResult.Success[null]' + } + + @Test + void testAwaitResultFailureNullThrows() { + shouldFail(NullPointerException) { + AwaitResult.failure(null) + } + } + + @Test + void testAwaitResultToStringFormats() { + assert AwaitResult.success(42).toString() == 'AwaitResult.Success[42]' + assert AwaitResult.failure(new RuntimeException('oops')).toString().contains('Failure') + assert AwaitResult.failure(new RuntimeException('oops')).toString().contains('oops') + } + + // ================================================================ + // Awaitable: thenAccept default method + // ================================================================ + + @Test + void testAwaitableThenAcceptSuccess() { + def captured = [] + def aw = Awaitable.of('hello').thenAccept { captured << it } + aw.get() + assert captured == ['hello'] + } + + @Test + void testAwaitableThenAcceptReturnsVoid() { + def aw = Awaitable.of('hello').thenAccept {} + assert aw.get() == null + } + + // ================================================================ + // Awaitable: delay with TimeUnit + // ================================================================ + + @Test + void testAwaitableDelayWithTimeUnit() { + def start = System.currentTimeMillis() + def aw = Awaitable.delay(50, TimeUnit.MILLISECONDS) + aw.get() + assert System.currentTimeMillis() - start >= 40 + } + + // ================================================================ + // Awaitable: static timeout combinators + // ================================================================ + + @Test + void testAwaitableOrTimeoutMillisStaticSuccess() { + def cf = CompletableFuture.completedFuture('fast') + def aw = Awaitable.orTimeoutMillis(cf, 5000) + assert aw.get() == 'fast' + } + + @Test + void testAwaitableOrTimeoutStaticExpires() { + def cf = new CompletableFuture<String>() + def aw = Awaitable.orTimeout(cf, 50, TimeUnit.MILLISECONDS) + def ex = shouldFail(ExecutionException) { + aw.get() + } + assert ex.cause instanceof java.util.concurrent.TimeoutException + } + + @Test + void testAwaitableCompleteOnTimeoutMillisStaticSuccess() { + def cf = CompletableFuture.completedFuture('fast') + def aw = Awaitable.completeOnTimeoutMillis(cf, 'fallback', 5000) + assert aw.get() == 'fast' + } + + @Test + void testAwaitableCompleteOnTimeoutStaticExpires() { + def cf = new CompletableFuture<String>() + def aw = Awaitable.completeOnTimeout(cf, 'default', 50, TimeUnit.MILLISECONDS) + assert aw.get() == 'default' + } + + // ================================================================ + // Awaitable: executor configuration + // ================================================================ + + @Test + void testAwaitableGetSetExecutor() { + def original = Awaitable.getExecutor() + assert original != null + try { + def custom = java.util.concurrent.Executors.newSingleThreadExecutor() + Awaitable.setExecutor(custom) + assert Awaitable.getExecutor().is(custom) + custom.shutdown() + } finally { + Awaitable.setExecutor(null) // reset to default + } + assert Awaitable.getExecutor() != null + } + + @Test + void testAwaitableIsVirtualThreadsAvailable() { + // Just verify it returns a boolean and doesn't throw + def result = Awaitable.isVirtualThreadsAvailable() + assert result instanceof Boolean + } + + // ================================================================ + // AsyncStream.empty() + // ================================================================ + + @Test + void testAsyncStreamEmpty() { + def stream = AsyncStream.empty() + assert stream.moveNext().get() == false + assert stream.current == null + } + + @Test + void testAsyncStreamEmptySingleton() { + assert AsyncStream.empty().is(AsyncStream.empty()) + } + + @Test + void testAsyncStreamCloseDefaultNoOp() { + def stream = AsyncStream.empty() + stream.close() // Should not throw + } + + // ================================================================ + // Awaitable.from() with custom adapter + // ================================================================ + + @Test + void testAwaitableFromWithCustomAdapter() { + def adapter = new AwaitableAdapter() { + boolean supportsAwaitable(Class<?> type) { type == StringBuilder } + def <T> Awaitable<T> toAwaitable(Object source) { + Awaitable.of((T) source.toString()) + } + } + def handle = AwaitableAdapterRegistry.register(adapter) + try { + Awaitable<String> aw = Awaitable.from(new StringBuilder("custom")) + assert aw.get() == "custom" + } finally { + handle.close() + } + } + + @Test + void testAsyncStreamFromWithCustomAdapter() { + def adapter = new AwaitableAdapter() { + boolean supportsAwaitable(Class<?> type) { false } + def <T> Awaitable<T> toAwaitable(Object source) { null } + boolean supportsAsyncStream(Class<?> type) { type == StringBuilder } + def <T> AsyncStream<T> toAsyncStream(Object source) { + AsyncStream.from(source.toString().toList()) + } + } + def handle = AwaitableAdapterRegistry.register(adapter) + try { + AsyncStream<String> stream = AsyncStream.from(new StringBuilder("ab")) + def items = [] + while (stream.moveNext().get()) { + items << stream.current + } + assert items == ["a", "b"] + } finally { + handle.close() + } + } + + // ================================================================ + // Awaitable.all/any/allSettled: edge cases + // ================================================================ + + @Test + void testAwaitableAllEmpty() { + def aw = Awaitable.all() + assert aw.get() == [] + } + + @Test + void testAwaitableAllSingleItem() { + def aw = Awaitable.all(Awaitable.of(42)) + assert aw.get() == [42] + } + + @Test + void testAwaitableAllSettledAllSuccess() { + def results = Awaitable.allSettled(Awaitable.of('a'), Awaitable.of('b')).get() + assert results.every { it.isSuccess() } + assert results*.value == ['a', 'b'] + } + + @Test + void testAwaitableAllSettledAllFailure() { + def results = Awaitable.allSettled( + Awaitable.failed(new IOException('e1')), + Awaitable.failed(new RuntimeException('e2')) + ).get() + assert results.every { it.isFailure() } + assert results[0].error instanceof IOException + assert results[1].error instanceof RuntimeException + } + + // ================================================================ + // GroovyPromise: exceptionally unwraps CompletionException + // ================================================================ + + @Test + void testGroovyPromiseExceptionallyUnwrapsCompletionException() { + def cf = new CompletableFuture<String>() + cf.completeExceptionally(new CompletionException(new IOException('inner'))) + def promise = GroovyPromise.of(cf) + def recovered = promise.exceptionally { t -> + assert t instanceof IOException + "recovered: ${t.message}" + } + assert recovered.get() == 'recovered: inner' + } + + // ================================================================ + // Awaitable.from() with CompletionStage not backed by CompletableFuture + // ================================================================ + + @Test + void testAwaitableFromCompletionStageMinimal() { + CompletionStage<String> stage = CompletableFuture.supplyAsync { "async-value" } + .thenApply { it.toUpperCase() } + Awaitable<String> aw = Awaitable.from(stage) + assert aw.get(2, TimeUnit.SECONDS) == "ASYNC-VALUE" + } } diff --git a/src/test/groovy/org/codehaus/groovy/transform/AsyncDeferFlowTest.groovy b/src/test/groovy/org/codehaus/groovy/transform/AsyncDeferFlowTest.groovy index eb33abcf55..0919b5736b 100644 --- a/src/test/groovy/org/codehaus/groovy/transform/AsyncDeferFlowTest.groovy +++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncDeferFlowTest.groovy @@ -32,6 +32,12 @@ import static groovy.test.GroovyAssert.assertScript * * <p>{@code Flow.Publisher} instances are automatically adapted by the built-in * adapter, enabling seamless use with {@code await} and {@code for await}. + * + * <p><b>Test synchronisation:</b> Flow.Publisher tests use + * {@code SubmissionPublisher.getNumberOfSubscribers()} to wait until the + * subscription handshake is complete before submitting items. This + * eliminates the race between subscription establishment and item + * delivery that caused intermittent failures with hard-coded delays. */ class AsyncDeferFlowTest { @@ -239,13 +245,29 @@ class AsyncDeferFlowTest { void testAwaitFlowPublisher() { assertScript ''' import java.util.concurrent.SubmissionPublisher + import java.util.concurrent.CountDownLatch def publisher = new SubmissionPublisher<String>() + // Ensure items are submitted after subscription is fully established + def subscribed = new CountDownLatch(1) Thread.start { + subscribed.await() publisher.submit('hello') publisher.close() } - def result = await(publisher) + // SubmissionPublisher.subscribe() is synchronous — once await() internally + // calls subscribe(), we can safely signal the publisher thread. + // We poll getNumberOfSubscribers() to wait for the subscription handshake. + def task = async { + // The subscribe call happens inside await() via FlowPublisherAdapter + def result = await(publisher) + result + } + def future = task() + // Wait until the subscriber is registered with the publisher + while (publisher.getNumberOfSubscribers() == 0) { Thread.sleep(1) } + subscribed.countDown() + def result = await(future) assert result == 'hello' ''' } @@ -265,10 +287,10 @@ class AsyncDeferFlowTest { def publisher = new java.util.concurrent.SubmissionPublisher<Integer>() def future = new FlowTest().consumePublisher(publisher) - Thread.start { - (1..5).each { publisher.submit(it) } - publisher.close() - } + // Wait until the for-await loop has subscribed to the publisher + while (publisher.getNumberOfSubscribers() == 0) { Thread.sleep(1) } + (1..5).each { publisher.submit(it) } + publisher.close() def result = await(future) assert result == [1, 2, 3, 4, 5] ''' @@ -289,12 +311,11 @@ class AsyncDeferFlowTest { def publisher = new java.util.concurrent.SubmissionPublisher<Integer>() def future = new FlowTest().consumeWithError(publisher) - Thread.start { - Thread.sleep(50) - publisher.submit(1) - publisher.submit(2) - publisher.closeExceptionally(new RuntimeException('stream-error')) - } + // Wait until the for-await loop has subscribed + while (publisher.getNumberOfSubscribers() == 0) { Thread.sleep(1) } + publisher.submit(1) + publisher.submit(2) + publisher.closeExceptionally(new RuntimeException('stream-error')) try { await(future) assert false : 'Should have thrown' @@ -318,11 +339,10 @@ class AsyncDeferFlowTest { results } def future = task() - Thread.start { - Thread.sleep(50) - ['a', 'b', 'c'].each { publisher.submit(it) } - publisher.close() - } + // Wait until the for-await loop has subscribed + while (publisher.getNumberOfSubscribers() == 0) { Thread.sleep(1) } + ['a', 'b', 'c'].each { publisher.submit(it) } + publisher.close() def result = await(future) assert result == ['a', 'b', 'c'] ''' @@ -347,11 +367,10 @@ class AsyncDeferFlowTest { def publisher = new java.util.concurrent.SubmissionPublisher<Integer>() def future = new CombinedTest().processStream(publisher) - Thread.start { - Thread.sleep(50) - (1..3).each { publisher.submit(it) } - publisher.close() - } + // Wait until the for-await loop has subscribed + while (publisher.getNumberOfSubscribers() == 0) { Thread.sleep(1) } + (1..3).each { publisher.submit(it) } + publisher.close() def result = await(future) assert result == 6 assert CombinedTest.log == ['sum=6', 'stream-cleanup'] @@ -364,13 +383,16 @@ class AsyncDeferFlowTest { import java.util.concurrent.SubmissionPublisher def publisher = new SubmissionPublisher<Integer>() - Thread.start { - Thread.sleep(50) - publisher.submit(42) - publisher.submit(99) // second value ignored by await - publisher.close() + def task = async { + await(publisher) } - def result = await(publisher) + def future = task() + // Wait until the subscriber is registered + while (publisher.getNumberOfSubscribers() == 0) { Thread.sleep(1) } + publisher.submit(42) + publisher.submit(99) // second value ignored by await + publisher.close() + def result = await(future) assert result == 42 ''' } diff --git a/src/test/groovy/org/codehaus/groovy/transform/AsyncFrameworkIntegrationTest.groovy b/src/test/groovy/org/codehaus/groovy/transform/AsyncFrameworkIntegrationTest.groovy index 10b73de428..9a6bd1298a 100644 --- a/src/test/groovy/org/codehaus/groovy/transform/AsyncFrameworkIntegrationTest.groovy +++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncFrameworkIntegrationTest.groovy @@ -224,7 +224,7 @@ class SpringWebFluxStyleController { void testReactorMonoToAwaitableApi() { assertScript REACTOR_PREAMBLE + ''' def mono = Mono.just(42) - Awaitable<Integer> awaitable = AwaitableAdapterRegistry.toAwaitable(mono) + Awaitable<Integer> awaitable = Awaitable.from(mono) assert awaitable.get() == 42 assert awaitable.isDone() ''' @@ -234,7 +234,7 @@ class SpringWebFluxStyleController { void testReactorMonoAwaitableThen() { assertScript REACTOR_PREAMBLE + ''' def mono = Mono.just(5) - Awaitable<Integer> a = AwaitableAdapterRegistry.toAwaitable(mono) + Awaitable<Integer> a = Awaitable.from(mono) Awaitable<Integer> doubled = a.then { it * 2 } assert doubled.get() == 10 ''' @@ -333,7 +333,7 @@ class SpringWebFluxStyleController { void testRxJavaSingleToAwaitableApi() { assertScript RXJAVA_PREAMBLE + ''' def single = Single.just("adapted") - Awaitable<String> awaitable = AwaitableAdapterRegistry.toAwaitable(single) + Awaitable<String> awaitable = Awaitable.from(single) assert awaitable.get() == "adapted" assert awaitable.isDone() ''' @@ -418,7 +418,7 @@ class SpringWebFluxStyleController { void testSpringStyleCompletionStageAdapter() { assertScript REACTOR_PREAMBLE + ''' CompletionStage<String> stage = CompletableFuture.supplyAsync { "stage-value" } - Awaitable<String> awaitable = AwaitableAdapterRegistry.toAwaitable(stage) + Awaitable<String> awaitable = Awaitable.from(stage) assert awaitable.get() == "stage-value" ''' } @@ -444,7 +444,7 @@ class SpringWebFluxStyleController { void testReactorToRxJavaInterop() { assertScript REACTOR_PREAMBLE + ''' def mono = Mono.just("from-reactor") - Awaitable<String> awaitable = AwaitableAdapterRegistry.toAwaitable(mono) + Awaitable<String> awaitable = Awaitable.from(mono) CompletableFuture<String> cf = awaitable.toCompletableFuture() assert cf.get() == "from-reactor" ''' @@ -454,7 +454,7 @@ class SpringWebFluxStyleController { void testAwaitableExceptionallyWithReactor() { assertScript REACTOR_PREAMBLE + ''' def mono = Mono.error(new RuntimeException("fail")) - Awaitable<String> awaitable = AwaitableAdapterRegistry.toAwaitable(mono) + Awaitable<String> awaitable = Awaitable.from(mono) Awaitable<String> recovered = awaitable.exceptionally { "recovered" } assert recovered.get() == "recovered" ''' @@ -464,7 +464,7 @@ class SpringWebFluxStyleController { void testAwaitableThenComposeAcrossFrameworks() { assertScript REACTOR_PREAMBLE + ''' def mono = Mono.just(5) - Awaitable<Integer> a = AwaitableAdapterRegistry.toAwaitable(mono) + Awaitable<Integer> a = Awaitable.from(mono) Awaitable<Integer> composed = a.thenCompose { val -> GroovyPromise.of(CompletableFuture.supplyAsync { val * 10 }) } @@ -689,7 +689,7 @@ class SpringWebFluxStyleController { // Register and verify it works def handle = AwaitableAdapterRegistry.register(adapter) try { - def result = AwaitableAdapterRegistry.toAwaitable(new CustomResult(data: "hello")) + def result = Awaitable.from(new CustomResult(data: "hello")) assert await(result) == "hello" } finally { handle.close() @@ -697,7 +697,7 @@ class SpringWebFluxStyleController { // After close, the adapter should be removed try { - AwaitableAdapterRegistry.toAwaitable(new CustomResult(data: "fail")) + Awaitable.from(new CustomResult(data: "fail")) assert false : "Should throw" } catch (IllegalArgumentException e) { // expected @@ -740,7 +740,7 @@ class SpringWebFluxStyleController { def future = new FutureTask<String>({ "from-blocking-future" }) pool.submit(future) - def aw = AwaitableAdapterRegistry.toAwaitable(future) + def aw = Awaitable.from(future) assert await(aw) == "from-blocking-future" } finally { AwaitableAdapterRegistry.setBlockingExecutor(null) diff --git a/src/test/groovy/org/codehaus/groovy/transform/AsyncTransformTest.groovy b/src/test/groovy/org/codehaus/groovy/transform/AsyncTransformTest.groovy index 58f645672f..e3ab3a87d6 100644 --- a/src/test/groovy/org/codehaus/groovy/transform/AsyncTransformTest.groovy +++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncTransformTest.groovy @@ -1176,11 +1176,10 @@ class AsyncTransformTest { void testAdapterRegistryWithCompletableFuture() { assertScript ''' import groovy.concurrent.Awaitable - import groovy.concurrent.AwaitableAdapterRegistry import java.util.concurrent.CompletableFuture def cf = CompletableFuture.completedFuture("adapted") - Awaitable<String> a = AwaitableAdapterRegistry.toAwaitable(cf) + Awaitable<String> a = Awaitable.from(cf) assert a.get() == "adapted" ''' } @@ -1189,10 +1188,9 @@ class AsyncTransformTest { void testAdapterRegistryPassthroughAwaitable() { assertScript ''' import groovy.concurrent.Awaitable - import groovy.concurrent.AwaitableAdapterRegistry def original = Awaitable.of(42) - def adapted = AwaitableAdapterRegistry.toAwaitable(original) + def adapted = Awaitable.from(original) assert adapted.is(original) ''' } @@ -1221,7 +1219,7 @@ class AsyncTransformTest { }) def custom = new CustomPromise("custom-value") - Awaitable<String> a = AwaitableAdapterRegistry.toAwaitable(custom) + Awaitable<String> a = Awaitable.from(custom) assert a.get() == "custom-value" ''' } @@ -1448,7 +1446,7 @@ class AsyncTransformTest { assert !AwaitableAdapterRegistry.unregister(adapter) // already removed try { - AwaitableAdapterRegistry.toAwaitable(new FakePromise(val: "x")) + Awaitable.from(new FakePromise(val: "x")) assert false : "should have thrown" } catch (IllegalArgumentException expected) { assert expected.message.contains("No AwaitableAdapter") @@ -1472,7 +1470,7 @@ class AsyncTransformTest { handle.close() // unregister via AutoCloseable try { - AwaitableAdapterRegistry.toAwaitable(new Token(id: 1)) + Awaitable.from(new Token(id: 1)) assert false } catch (IllegalArgumentException expected) { } ''' @@ -1508,7 +1506,7 @@ class AsyncTransformTest { // Test with Iterator directly def iter = [10, 20, 30].iterator() - AsyncStream stream = AwaitableAdapterRegistry.toAsyncStream(iter) + AsyncStream stream = AsyncStream.from(iter) def results = [] while (await(stream.moveNext())) { results << stream.getCurrent() @@ -1523,7 +1521,7 @@ class AsyncTransformTest { import groovy.concurrent.* try { - AwaitableAdapterRegistry.toAsyncStream("a string") + AsyncStream.from("a string") assert false : "should throw" } catch (IllegalArgumentException expected) { assert expected.message.contains("No AsyncStream adapter") @@ -1537,7 +1535,7 @@ class AsyncTransformTest { import groovy.concurrent.* try { - AwaitableAdapterRegistry.toAwaitable("plain string") + Awaitable.from("plain string") assert false : "should throw" } catch (IllegalArgumentException expected) { assert expected.message.contains("No AwaitableAdapter")
