This is an automated email from the ASF dual-hosted git repository. sunlan pushed a commit to branch GROOVY-9381_3 in repository https://gitbox.apache.org/repos/asf/groovy.git
commit c768fdb253d974386044141bade70eb3acae42cb Author: Daniel Sun <[email protected]> AuthorDate: Sat Mar 21 04:31:42 2026 +0900 Minor tweaks --- src/main/java/groovy/concurrent/AsyncScope.java | 258 +++++++++++++++++++++ .../concurrent/AwaitableAdapterRegistry.java | 106 ++++++++- .../apache/groovy/parser/antlr4/AstBuilder.java | 2 +- .../apache/groovy/runtime/async/GroovyPromise.java | 40 +++- src/spec/doc/core-async-await.adoc | 130 +++++++++++ src/spec/test/AsyncAwaitSpecTest.groovy | 127 ++++++++++ .../codehaus/groovy/transform/AsyncApiTest.groovy | 251 ++++++++++++++++++++ .../groovy/transform/AsyncClosureLambdaTest.groovy | 6 - 8 files changed, 901 insertions(+), 19 deletions(-) diff --git a/src/main/java/groovy/concurrent/AsyncScope.java b/src/main/java/groovy/concurrent/AsyncScope.java new file mode 100644 index 0000000000..3e9ba837ec --- /dev/null +++ b/src/main/java/groovy/concurrent/AsyncScope.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package groovy.concurrent; + +import groovy.lang.Closure; +import org.apache.groovy.runtime.async.AsyncSupport; +import org.apache.groovy.runtime.async.GroovyPromise; + +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Structured concurrency scope that ensures all child tasks complete + * (or are cancelled) before the scope exits. + * + * <h2>Design philosophy</h2> + * <p>Inspired by Swift's {@code TaskGroup}, Kotlin's {@code coroutineScope}, + * and JEP 453 (Structured Concurrency), {@code AsyncScope} provides a + * bounded lifetime for async tasks. Unlike fire-and-forget + * {@link AsyncSupport#executeAsync}, tasks launched within a scope are + * guaranteed to complete before the scope closes. This prevents: + * <ul> + * <li>Orphaned tasks that outlive their logical parent</li> + * <li>Resource leaks from uncollected async work</li> + * <li>Silent failures from unobserved exceptions</li> + * </ul> + * + * <h2>Failure policy</h2> + * <p>By default, the scope uses a <b>fail-fast</b> policy: when any child + * task completes exceptionally, all sibling tasks are cancelled immediately. + * The first failure becomes the primary exception; subsequent failures are + * added as {@linkplain Throwable#addSuppressed(Throwable) suppressed} + * exceptions. This matches Kotlin's + * {@code coroutineScope} / {@code supervisorScope} semantics.</p> + * + * <h2>Usage in Groovy</h2> + * <pre> + * import groovy.concurrent.AsyncScope + * import groovy.concurrent.Awaitable + * + * def results = AsyncScope.withScope { scope -> + * def userTask = scope.async { fetchUser(id) } + * def orderTask = scope.async { fetchOrders(id) } + * return [user: await(userTask), orders: await(orderTask)] + * } + * // Both tasks guaranteed complete here + * </pre> + * + * <h2>Thread safety</h2> + * <p>All public methods are thread-safe. The child task list uses + * {@link CopyOnWriteArrayList} for safe concurrent iteration during + * cancellation. The {@link #closed} flag uses {@link AtomicBoolean} + * with CAS for exactly-once close semantics.</p> + * + * @see Awaitable + * @see AsyncSupport + * @since 6.0.0 + */ +public class AsyncScope implements AutoCloseable { + + private final List<CompletableFuture<?>> children = new CopyOnWriteArrayList<>(); + private final AtomicBoolean closed = new AtomicBoolean(false); + private final Executor executor; + private final boolean failFast; + + /** + * Creates a new scope with the given executor and fail-fast policy. + * + * @param executor the executor for child tasks; must not be {@code null} + * @param failFast if {@code true}, cancel all siblings when any child fails + */ + public AsyncScope(Executor executor, boolean failFast) { + Objects.requireNonNull(executor, "executor must not be null"); + this.executor = executor; + this.failFast = failFast; + } + + /** + * Creates a new scope with the given executor and fail-fast enabled. + * + * @param executor the executor for child tasks; must not be {@code null} + */ + public AsyncScope(Executor executor) { + this(executor, true); + } + + /** + * Creates a new scope with the default async executor and fail-fast enabled. + */ + public AsyncScope() { + this(AsyncSupport.getExecutor(), true); + } + + /** + * Launches a child task within this scope. The task's lifetime is + * bound to the scope: when the scope is closed, all incomplete child + * tasks are cancelled. + * + * @param body the async body to execute + * @param <T> the result type + * @return an {@link Awaitable} representing the child task + * @throws IllegalStateException if the scope has already been closed + */ + @SuppressWarnings("unchecked") + public <T> Awaitable<T> async(Closure<T> body) { + if (closed.get()) { + throw new IllegalStateException("AsyncScope is closed — cannot launch new tasks"); + } + CompletableFuture<T> cf = CompletableFuture.supplyAsync(() -> { + try { + return body.call(); + } catch (CompletionException ce) { + throw ce; + } catch (Throwable t) { + throw new CompletionException(t); + } + }, executor); + children.add(cf); + if (failFast) { + cf.whenComplete((v, err) -> { + if (err != null && !closed.get()) { + cancelAll(); + } + }); + } + return GroovyPromise.of(cf); + } + + /** + * Returns the number of child tasks launched within this scope. + * + * @return the child task count + */ + public int getChildCount() { + return children.size(); + } + + /** + * Cancels all child tasks. Idempotent — safe to call multiple times. + * <p> + * Cancels each child via {@link CompletableFuture#cancel(boolean)}. + * Does <em>not</em> close the scope — the scope remains open so that + * {@link #close()} can still join all children and collect errors. + */ + public void cancelAll() { + for (CompletableFuture<?> child : children) { + child.cancel(true); + } + } + + /** + * Waits for all child tasks to complete, then closes the scope. + * <p> + * If any child failed, the first failure is rethrown with subsequent + * failures as {@linkplain Throwable#addSuppressed(Throwable) suppressed} + * exceptions. Cancelled tasks are silently ignored. + * <p> + * This method is idempotent: only the first invocation waits for + * children; subsequent calls are no-ops. + */ + @Override + public void close() { + if (!closed.compareAndSet(false, true)) return; + Throwable firstError = null; + for (CompletableFuture<?> child : children) { + try { + child.join(); + } catch (CancellationException ignored) { + // Cancelled tasks are silently ignored + } catch (CompletionException e) { + Throwable cause = AsyncSupport.deepUnwrap(e); + if (cause instanceof CancellationException) { + continue; + } + if (firstError == null) { + firstError = cause; + } else { + firstError.addSuppressed(cause); + } + } catch (Exception e) { + if (firstError == null) { + firstError = e; + } else { + firstError.addSuppressed(e); + } + } + } + if (firstError != null) { + if (firstError instanceof RuntimeException re) throw re; + if (firstError instanceof Error err) throw err; + throw new RuntimeException(firstError); + } + } + + /** + * Convenience method that creates a scope, executes the given closure + * within it, and ensures the scope is closed on exit. + * <p> + * The closure receives the {@code AsyncScope} as its argument and can + * launch child tasks via {@link #async(Closure)}. The scope is + * automatically closed (and all children awaited) when the closure + * returns or throws. + * + * <pre> + * def result = AsyncScope.withScope { scope -> + * def a = scope.async { computeA() } + * def b = scope.async { computeB() } + * return [await(a), await(b)] + * } + * </pre> + * + * @param body the closure to execute within the scope + * @param <T> the result type + * @return the closure's return value + */ + @SuppressWarnings("unchecked") + public static <T> T withScope(Closure<T> body) { + return withScope(AsyncSupport.getExecutor(), body); + } + + /** + * Convenience method that creates a scope with the given executor, + * executes the closure, and ensures the scope is closed on exit. + * + * @param executor the executor for child tasks + * @param body the closure to execute within the scope + * @param <T> the result type + * @return the closure's return value + */ + @SuppressWarnings("unchecked") + public static <T> T withScope(Executor executor, Closure<T> body) { + try (AsyncScope scope = new AsyncScope(executor)) { + return body.call(scope); + } + } +} diff --git a/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java b/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java index 9ecea35288..33765f1c04 100644 --- a/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java +++ b/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java @@ -22,6 +22,7 @@ import org.apache.groovy.runtime.async.GroovyPromise; import java.util.Iterator; import java.util.List; +import java.util.Objects; import java.util.ServiceLoader; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; @@ -57,6 +58,24 @@ public class AwaitableAdapterRegistry { private static final List<AwaitableAdapter> ADAPTERS = new CopyOnWriteArrayList<>(); + /** + * Per-class adapter cache for {@link #toAwaitable(Object)}. + * Uses {@link ClassValue} for lock-free, GC-friendly per-class memoization + * that does not prevent class unloading — superior to + * {@code ConcurrentHashMap<Class<?>, V>} which holds strong references + * that can cause ClassLoader leaks in container environments. + * <p> + * Rebuilt via volatile reference swap when adapters are registered or + * removed (an extremely low-frequency operation). + */ + private static volatile ClassValue<AwaitableAdapter> awaitableCache = buildAwaitableCache(); + + /** + * Per-class adapter cache for {@link #toAsyncStream(Object)}. + * Same caching strategy as {@link #awaitableCache}. + */ + private static volatile ClassValue<AwaitableAdapter> streamCache = buildStreamCache(); + /** * Optional executor supplier for blocking Future adaptation, to avoid * starving the common pool. Defaults to null; when set, the provided @@ -77,22 +96,37 @@ public class AwaitableAdapterRegistry { /** * Registers an adapter with higher priority than existing ones. + * <p> + * Invalidates the per-class adapter caches so that subsequent lookups + * re-evaluate adapter priority order. * + * @param adapter the adapter to register; must not be {@code null} * @return an {@link AutoCloseable} that, when closed, removes this adapter * from the registry. Useful for test isolation. */ public static AutoCloseable register(AwaitableAdapter adapter) { + Objects.requireNonNull(adapter, "adapter must not be null"); ADAPTERS.add(0, adapter); - return () -> ADAPTERS.remove(adapter); + invalidateCaches(); + return () -> { ADAPTERS.remove(adapter); invalidateCaches(); }; } /** * Removes the given adapter from the registry. + * <p> + * Invalidates the per-class adapter caches so that subsequent lookups + * no longer consider the removed adapter. * + * @param adapter the adapter to remove * @return {@code true} if the adapter was found and removed */ public static boolean unregister(AwaitableAdapter adapter) { - return ADAPTERS.remove(adapter); + Objects.requireNonNull(adapter, "adapter must not be null"); + boolean removed = ADAPTERS.remove(adapter); + if (removed) { + invalidateCaches(); + } + return removed; } /** @@ -112,11 +146,18 @@ public class AwaitableAdapterRegistry { * Converts the given source to an {@link Awaitable}. * If the source is already an {@code Awaitable}, it is returned as-is. * <p> + * Uses a per-class {@link ClassValue} cache to avoid repeated linear + * scans of the adapter list on the hot path. The first lookup for a + * given class performs a linear scan; subsequent lookups for the same + * class are O(1). + * <p> * <b>Tip:</b> user code should generally prefer {@link Awaitable#from(Object)}, * which delegates to this method but is more discoverable from the * {@code Awaitable} type itself. * * @param source the source object; must not be {@code null} + * @param <T> the result type + * @return an awaitable backed by the source * @throws IllegalArgumentException if {@code source} is {@code null} * or no adapter supports the source type * @see Awaitable#from(Object) @@ -128,10 +169,9 @@ public class AwaitableAdapterRegistry { } if (source instanceof Awaitable) return (Awaitable<T>) source; Class<?> type = source.getClass(); - for (AwaitableAdapter adapter : ADAPTERS) { - if (adapter.supportsAwaitable(type)) { - return adapter.toAwaitable(source); - } + AwaitableAdapter adapter = awaitableCache.get(type); + if (adapter != null) { + return adapter.toAwaitable(source); } throw new IllegalArgumentException( "No AwaitableAdapter found for type: " + type.getName() @@ -142,11 +182,16 @@ public class AwaitableAdapterRegistry { * Converts the given source to an {@link AsyncStream}. * If the source is already an {@code AsyncStream}, it is returned as-is. * <p> + * Uses a per-class {@link ClassValue} cache to avoid repeated linear + * scans of the adapter list on the hot path. + * <p> * <b>Tip:</b> user code should generally prefer {@link AsyncStream#from(Object)}, * which delegates to this method but is more discoverable from the * {@code AsyncStream} type itself. * * @param source the source object; must not be {@code null} + * @param <T> the element type + * @return an async stream backed by the source * @throws IllegalArgumentException if {@code source} is {@code null} * or no adapter supports the source type * @see AsyncStream#from(Object) @@ -158,16 +203,57 @@ public class AwaitableAdapterRegistry { } if (source instanceof AsyncStream) return (AsyncStream<T>) source; Class<?> type = source.getClass(); - for (AwaitableAdapter adapter : ADAPTERS) { - if (adapter.supportsAsyncStream(type)) { - return adapter.toAsyncStream(source); - } + AwaitableAdapter adapter = streamCache.get(type); + if (adapter != null) { + return adapter.toAsyncStream(source); } throw new IllegalArgumentException( "No AsyncStream adapter found for type: " + type.getName() + ". Register one via AwaitableAdapterRegistry.register() or ServiceLoader."); } + // ---- Cache management ------------------------------------------------ + + private static ClassValue<AwaitableAdapter> buildAwaitableCache() { + return new ClassValue<>() { + @Override + protected AwaitableAdapter computeValue(Class<?> type) { + for (AwaitableAdapter adapter : ADAPTERS) { + if (adapter.supportsAwaitable(type)) { + return adapter; + } + } + return null; + } + }; + } + + private static ClassValue<AwaitableAdapter> buildStreamCache() { + return new ClassValue<>() { + @Override + protected AwaitableAdapter computeValue(Class<?> type) { + for (AwaitableAdapter adapter : ADAPTERS) { + if (adapter.supportsAsyncStream(type)) { + return adapter; + } + } + return null; + } + }; + } + + /** + * Rebuilds the per-class adapter caches after adapter registration + * changes. Uses volatile reference swap — safe because + * {@link ClassValue} instances are immutable once constructed, and + * concurrent readers of the old cache see a consistent (if stale) + * snapshot until the new cache is published. + */ + private static void invalidateCaches() { + awaitableCache = buildAwaitableCache(); + streamCache = buildStreamCache(); + } + /** * Built-in adapter handling JDK {@link CompletableFuture}, {@link CompletionStage}, * {@link Future}, and {@link Iterable}/{@link Iterator} (for async stream bridging). diff --git a/src/main/java/org/apache/groovy/parser/antlr4/AstBuilder.java b/src/main/java/org/apache/groovy/parser/antlr4/AstBuilder.java index 198067cf4d..adabfaaa38 100644 --- a/src/main/java/org/apache/groovy/parser/antlr4/AstBuilder.java +++ b/src/main/java/org/apache/groovy/parser/antlr4/AstBuilder.java @@ -1805,7 +1805,7 @@ public class AstBuilder extends GroovyParserBaseVisitor<Object> { // Inject @Async annotation for methods declared with the 'async' keyword modifier if (isAsync) { - methodNode.addAnnotation(new AnnotationNode(ClassHelper.make("groovy.transform.Async"))); + methodNode.addAnnotation(new AnnotationNode(ClassHelper.make(groovy.transform.Async.class))); } anonymousInnerClassList.forEach(e -> e.setEnclosingMethod(methodNode)); diff --git a/src/main/java/org/apache/groovy/runtime/async/GroovyPromise.java b/src/main/java/org/apache/groovy/runtime/async/GroovyPromise.java index 5d03000748..c2ce0d6145 100644 --- a/src/main/java/org/apache/groovy/runtime/async/GroovyPromise.java +++ b/src/main/java/org/apache/groovy/runtime/async/GroovyPromise.java @@ -23,6 +23,7 @@ import groovy.concurrent.Awaitable; import java.util.Objects; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -67,12 +68,23 @@ public class GroovyPromise<T> implements Awaitable<T> { /** * {@inheritDoc} * <p> - * Waits if necessary for the computation to complete, then retrieves its result. + * Includes a synchronous completion fast-path: if the underlying + * {@link CompletableFuture} is already done, the result is extracted + * via {@link CompletableFuture#join()} which avoids the full + * park/unpark machinery of {@link CompletableFuture#get()}. + * This optimisation mirrors C#'s {@code ValueTask} synchronous + * completion path and eliminates unnecessary thread state transitions + * on the hot path where async operations complete before being awaited. + * <p> * If the future was cancelled, the original {@link CancellationException} is * unwrapped from the JDK 23+ wrapper for cross-version consistency. */ @Override public T get() throws InterruptedException, ExecutionException { + // Fast path: already completed — skip wait queue and thread parking + if (future.isDone()) { + return getCompleted(); + } try { return future.get(); } catch (CancellationException e) { @@ -83,11 +95,16 @@ public class GroovyPromise<T> implements Awaitable<T> { /** * {@inheritDoc} * <p> - * Waits at most the given time for the computation to complete. + * Includes a synchronous completion fast-path for already-done futures, + * consistent with the zero-argument {@link #get()} overload. * Unwraps JDK 23+ {@link CancellationException} wrappers for consistency. */ @Override public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + // Fast path: already completed — skip wait queue and thread parking + if (future.isDone()) { + return getCompleted(); + } try { return future.get(timeout, unit); } catch (CancellationException e) { @@ -95,6 +112,25 @@ public class GroovyPromise<T> implements Awaitable<T> { } } + /** + * Extracts the result from an already-completed future using + * {@link CompletableFuture#join()}, which is cheaper than + * {@link CompletableFuture#get()} for completed futures because it + * bypasses the interruptible wait path. + * <p> + * Translates {@link CompletionException} to {@link ExecutionException} + * to preserve the {@code get()} contract. + */ + private T getCompleted() throws ExecutionException { + try { + return future.join(); + } catch (CompletionException e) { + throw new ExecutionException(AsyncSupport.deepUnwrap(e)); + } catch (CancellationException e) { + throw unwrapCancellation(e); + } + } + /** {@inheritDoc} */ @Override public boolean isDone() { diff --git a/src/spec/doc/core-async-await.adoc b/src/spec/doc/core-async-await.adoc index 3433c9d5eb..2d5d1f0771 100644 --- a/src/spec/doc/core-async-await.adoc +++ b/src/spec/doc/core-async-await.adoc @@ -1201,6 +1201,133 @@ JavaScript, C#, Kotlin, and Swift, for developers familiar with those languages. | Compiler-generated async state machine |=== +[[structured-concurrency]] +== Structured Concurrency with `AsyncScope` + +=== The Problem: Uncontrolled Task Lifetimes + +When you launch async tasks with `async { ... }`, each task runs independently — it can +outlive the method that created it, leak resources if it is never awaited, or silently +swallow exceptions if nobody observes the result. This "fire-and-forget" model is +convenient for simple cases but becomes a liability in production systems where every task +must be accounted for. + +=== The Solution: Scoped Task Ownership + +`AsyncScope` binds the lifetime of child tasks to a well-defined scope. Inspired by +Kotlin's `coroutineScope`, Swift's `TaskGroup`, and +{jdk}/java.base/java/util/concurrent/StructuredTaskScope.html[JEP 453 (Structured Concurrency)], +it enforces a simple invariant: **when the scope exits, all child tasks have completed or +been cancelled**. + +`AsyncScope` integrates naturally with `async` methods and `await` expressions. You +can call `async` methods inside `scope.async { ... }` blocks and `await` their results — +just as you would in any other async context. The `withScope` convenience method creates +a scope, executes a closure within it, and automatically closes the scope when the closure +returns: + +[source,groovy] +---- +include::../test/AsyncAwaitSpecTest.groovy[tags=async_scope_basic,indent=0] +---- + +In the example above, `fetchUser` and `fetchOrders` are ordinary `async` methods. +`scope.async` launches each call as a child task whose lifetime is bound to the scope. +The `await` expressions inside child tasks suspend until their respective async operations +complete, and `await(userTask)` / `await(ordersTask)` suspend the parent until the scoped +tasks deliver their results. When `withScope` returns, both tasks are guaranteed to have +completed — no orphan tasks can leak. + +=== Fail-Fast Semantics + +By default, `AsyncScope` uses a _fail-fast_ policy: when any child task throws an +exception, all sibling tasks are immediately cancelled. The first failure becomes the +primary exception; subsequent failures are attached as +{jdk}/java.base/java/lang/Throwable.html#addSuppressed(java.lang.Throwable)[suppressed exceptions]. + +This prevents healthy tasks from continuing to consume resources after a sibling has +already doomed the overall operation. Combined with `async` / `await`, failures in any +child propagate cleanly through the scope: + +[source,groovy] +---- +include::../test/AsyncAwaitSpecTest.groovy[tags=async_scope_fail_fast,indent=0] +---- + +When `failingTask` throws, the scope cancels `slowTask` immediately — there is no need +to wait 5 seconds for it to finish or to add manual cancellation logic. + +=== Fan-Out / Fan-In + +A common concurrency pattern is _fan-out / fan-in_: launch a dynamic number of tasks +in parallel and collect all results. `AsyncScope` makes this safe — every task is bound +to the scope, so even if one fails, all siblings are cancelled before the scope exits: + +[source,groovy] +---- +include::../test/AsyncAwaitSpecTest.groovy[tags=async_scope_fanout,indent=0] +---- + +The `collect` call inside `withScope` creates one scoped child task per URL. All pages +are fetched concurrently, and if any single fetch fails, the remaining fetches are +cancelled automatically. + +=== Manual Scope Management + +For advanced use cases — such as scopes that span multiple method calls or integration +with resource managers — you can create and manage scopes directly. `AsyncScope` +implements `AutoCloseable`, so it works with Groovy's resource-management idioms: + +[source,groovy] +---- +include::../test/AsyncAwaitSpecTest.groovy[tags=async_scope_manual,indent=0] +---- + +[IMPORTANT] +==== +Always close a scope. An unclosed scope leaves child tasks in an indeterminate state +and may leak threads. +==== + +[[performance-notes]] +== Performance Characteristics + +=== Adapter Registry: Amortized O(1) Lookups + +Every `await` expression resolves the awaited object's type to a matching +`AwaitableAdapter` via the `AwaitableAdapterRegistry`. The registry uses +{jdk}/java.base/java/lang/ClassValue.html[`ClassValue`]-based per-class memoization, +reducing adapter resolution from O(n) (linear scan of registered adapters) to amortized +O(1) after the first lookup for each class. The cache is automatically invalidated when +adapters are registered or unregistered at runtime. + +=== Synchronous Completion Fast-Path + +When an `Awaitable` wraps a `CompletableFuture` that has already completed by the time +`get()` is called, the runtime extracts the result via `CompletableFuture.join()` instead +of the full interruptible `get()` path. This bypasses the park/unpark machinery and +eliminates unnecessary thread state transitions — a measurable win when async operations +complete before being awaited, which is common for cached or pre-computed results. + +This optimisation is analogous to C#'s +https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.valuetask-1[`ValueTask<T>`] +synchronous completion path, where already-completed tasks skip the state machine +continuation entirely. + +=== Virtual Threads (JDK 21+) + +On JDK 21+, Groovy automatically uses +{jdk}/java.base/java/lang/Thread.html#ofVirtual()[virtual threads] for async execution. +Virtual threads are extremely lightweight — a few hundred bytes of stack compared to +~1 MB for platform threads — and are scheduled by the JVM's M:N scheduler onto a small +pool of carrier threads. The `await` keyword blocks the virtual thread (not the carrier), +so the JVM can multiplex hundreds of thousands of concurrent async operations onto a +handful of OS threads. + +On JDK 17–20, the runtime falls back to a bounded cached thread pool (default +256 threads, configurable via the `groovy.async.parallelism` system property) with a +caller-runs back-pressure policy. + [[summary]] == Summary @@ -1258,4 +1385,7 @@ JavaScript, C#, Kotlin, and Swift, for developers familiar with those languages. | AwaitResult | `result.isSuccess()` / `result.isFailure()` / `result.getOrElse { fallback }` + +| Structured concurrency +| `AsyncScope.withScope { scope -> scope.async { await task() } }` |=== diff --git a/src/spec/test/AsyncAwaitSpecTest.groovy b/src/spec/test/AsyncAwaitSpecTest.groovy index b60988cb4a..565fc35e91 100644 --- a/src/spec/test/AsyncAwaitSpecTest.groovy +++ b/src/spec/test/AsyncAwaitSpecTest.groovy @@ -1402,4 +1402,131 @@ assert await(mixedSources()) == "cf+awaitable" // end::motivation_interop[] ''' } + + // ========================================================================= + // Structured concurrency — AsyncScope + // ========================================================================= + + @Test + void testAsyncScopeBasic() { + assertScript ''' +// tag::async_scope_basic[] +import groovy.concurrent.AsyncScope +import groovy.concurrent.Awaitable + +class UserService { + async fetchUser(int id) { + await(Awaitable.delay(10)) + return [id: id, name: "User-${id}"] + } + + async fetchOrders(int userId) { + await(Awaitable.delay(10)) + return [[item: "Book", userId: userId], [item: "Pen", userId: userId]] + } + + async loadDashboard(int userId) { + AsyncScope.withScope { scope -> + def userTask = scope.async { await fetchUser(userId) } + def ordersTask = scope.async { await fetchOrders(userId) } + [user: await(userTask), orders: await(ordersTask)] + } + } +} + +def svc = new UserService() +def dashboard = await(svc.loadDashboard(42)) +assert dashboard.user.name == "User-42" +assert dashboard.orders.size() == 2 +// Both tasks are guaranteed complete when loadDashboard returns +// end::async_scope_basic[] + ''' + } + + @Test + void testAsyncScopeFailFast() { + assertScript ''' +// tag::async_scope_fail_fast[] +import groovy.concurrent.AsyncScope +import groovy.concurrent.Awaitable + +async slowTask() { + await(Awaitable.delay(5000)) + return "done" +} + +async failingTask() { + await(Awaitable.delay(10)) + throw new IllegalStateException("service unavailable") +} + +def error = null +try { + AsyncScope.withScope { scope -> + scope.async { await slowTask() } + scope.async { await failingTask() } + } +} catch (IllegalStateException e) { + error = e +} + +assert error != null +assert error.message == "service unavailable" +// The slow task was automatically cancelled when failingTask threw +// end::async_scope_fail_fast[] + ''' + } + + @Test + void testAsyncScopeFanOut() { + assertScript ''' +// tag::async_scope_fanout[] +import groovy.concurrent.AsyncScope +import groovy.concurrent.Awaitable + +async fetchPage(String url) { + await(Awaitable.delay(10)) + return "Content of ${url}" +} + +async crawlAll(List<String> urls) { + AsyncScope.withScope { scope -> + def tasks = urls.collect { url -> + scope.async { await fetchPage(url) } + } + tasks.collect { task -> await(task) } + } +} + +def urls = ["https://example.com/1", "https://example.com/2", "https://example.com/3"] +def pages = await(crawlAll(urls)) +assert pages.size() == 3 +assert pages.every { it.startsWith("Content of") } +// All fetches ran concurrently; all guaranteed complete when crawlAll returns +// end::async_scope_fanout[] + ''' + } + + @Test + void testAsyncScopeManual() { + assertScript ''' +// tag::async_scope_manual[] +import groovy.concurrent.AsyncScope +import groovy.concurrent.Awaitable + +async computePrice(String item) { + await(Awaitable.delay(10)) + return item == "Book" ? 29.99 : 9.99 +} + +def scope = new AsyncScope() +def price1 = scope.async { await computePrice("Book") } +def price2 = scope.async { await computePrice("Pen") } + +assert await(price1) == 29.99 +assert await(price2) == 9.99 +scope.close() // waits for all children, idempotent +// end::async_scope_manual[] + ''' + } } diff --git a/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy b/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy index 972583e020..e942c5e23b 100644 --- a/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy +++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy @@ -3295,4 +3295,255 @@ class AsyncApiTest { } assert ex instanceof CancellationException } + + // ================================================================ + // Optimization 1: ClassValue Adapter Cache Tests + // ================================================================ + + @Test + void testAdapterCacheReturnsConsistentResults() { + // Repeated toAwaitable calls for the same type must return same adapter result + def cf1 = new CompletableFuture<String>() + cf1.complete("hello") + def cf2 = new CompletableFuture<String>() + cf2.complete("world") + def a1 = AwaitableAdapterRegistry.toAwaitable(cf1) + def a2 = AwaitableAdapterRegistry.toAwaitable(cf2) + assert a1.get() == "hello" + assert a2.get() == "world" + } + + @Test + void testAdapterCacheInvalidatesOnRegister() { + // Register a custom adapter, verify it takes effect (cache invalidated) + def customAdapted = new AtomicBoolean(false) + def adapter = new AwaitableAdapter() { + @Override boolean supportsAwaitable(Class<?> type) { return type == StringBuilder } + @Override Awaitable<?> toAwaitable(Object value) { + customAdapted.set(true) + return Awaitable.of(value.toString()) + } + } + def handle = AwaitableAdapterRegistry.register(adapter) + try { + def result = AwaitableAdapterRegistry.toAwaitable(new StringBuilder("test")) + assert customAdapted.get() + assert result.get() == "test" + } finally { + handle.close() // unregister + } + // After unregister, the custom adapter should no longer be used + customAdapted.set(false) + shouldFail(IllegalArgumentException) { + AwaitableAdapterRegistry.toAwaitable(new StringBuilder("test2")) + } + assert !customAdapted.get() // custom adapter was not invoked + } + + @Test + void testAdapterCacheConcurrentAccess() { + // Hammer the cache from multiple threads to verify thread safety + int threadCount = 32 + def barrier = new java.util.concurrent.CyclicBarrier(threadCount) + def errors = new java.util.concurrent.ConcurrentLinkedQueue<Throwable>() + def threads = (1..threadCount).collect { idx -> + Thread.start { + try { + barrier.await(5, TimeUnit.SECONDS) + for (int i = 0; i < 500; i++) { + def cf = CompletableFuture.completedFuture("v$idx-$i") + def a = AwaitableAdapterRegistry.toAwaitable(cf) + assert a.get() == "v$idx-$i" + } + } catch (Throwable t) { + errors.add(t) + } + } + } + threads*.join() + assert errors.isEmpty() : "Cache concurrent access errors: ${errors.collect { it.message }}" + } + + // ================================================================ + // Optimization 2: GroovyPromise Synchronous Completion Fast-Path + // ================================================================ + + @Test + void testGroovyPromiseFastPathAlreadyCompleted() { + // Already-completed future should return immediately via join() fast-path + def cf = CompletableFuture.completedFuture(42) + def promise = GroovyPromise.of(cf) + assert promise.get() == 42 + } + + @Test + void testGroovyPromiseFastPathAlreadyFailed() { + def cf = new CompletableFuture<String>() + cf.completeExceptionally(new IllegalArgumentException("fast-fail")) + def promise = GroovyPromise.of(cf) + def ex = shouldFail(ExecutionException) { + promise.get() + } + assert ex.cause instanceof IllegalArgumentException + assert ex.cause.message == "fast-fail" + } + + @Test + void testGroovyPromiseFastPathCancelled() { + def cf = new CompletableFuture<String>() + cf.cancel(true) + def promise = GroovyPromise.of(cf) + shouldFail(CancellationException) { + promise.get() + } + } + + @Test + void testGroovyPromiseFastPathTimedGetAlreadyDone() { + def cf = CompletableFuture.completedFuture("fast") + def promise = GroovyPromise.of(cf) + assert promise.get(1, TimeUnit.SECONDS) == "fast" + } + + @Test + void testGroovyPromiseFastPathTimedGetAlreadyFailed() { + def cf = new CompletableFuture<String>() + cf.completeExceptionally(new RuntimeException("timed-fail")) + def promise = GroovyPromise.of(cf) + def ex = shouldFail(ExecutionException) { + promise.get(1, TimeUnit.SECONDS) + } + assert ex.cause instanceof RuntimeException + assert ex.cause.message == "timed-fail" + } + + @Test + void testGroovyPromiseFastPathNullResult() { + def cf = CompletableFuture.completedFuture(null) + def promise = GroovyPromise.of(cf) + assert promise.get() == null + } + + @Test + void testGroovyPromiseSlowPathStillWorks() { + // Non-completed future should still work via the normal get() path + def cf = new CompletableFuture<Integer>() + def promise = GroovyPromise.of(cf) + Thread.start { + Thread.sleep(50) + cf.complete(99) + } + assert promise.get(5, TimeUnit.SECONDS) == 99 + } + + // ================================================================ + // Optimization 3: AsyncScope Structured Concurrency + // ================================================================ + + @Test + void testAsyncScopeBasicUsage() { + def result = groovy.concurrent.AsyncScope.withScope { scope -> + def a = scope.async { 10 } + def b = scope.async { 20 } + return a.get() + b.get() + } + assert result == 30 + } + + @Test + void testAsyncScopeFailFastCancelsSiblings() { + def failLatch = new CountDownLatch(1) + def error = null + + try { + groovy.concurrent.AsyncScope.withScope { scope -> + // Slow task + scope.async { + try { + Thread.sleep(10_000) + } catch (ignored) {} + return null + } + // Fast-failing task + scope.async { + failLatch.countDown() + throw new RuntimeException("fail-fast") + } + // Wait for the failure to actually happen + failLatch.await(5, TimeUnit.SECONDS) + Thread.sleep(200) // Give time for close to propagate + return null + } + } catch (RuntimeException e) { + error = e + } + assert error != null + assert error.message == "fail-fast" + } + + @Test + void testAsyncScopeAggregatesSuppressedExceptions() { + try { + groovy.concurrent.AsyncScope.withScope { scope -> + scope.async { throw new IllegalArgumentException("err1") } + scope.async { throw new IllegalStateException("err2") } + Thread.sleep(200) // Let both fail + return null + } + assert false : "Should have thrown" + } catch (Exception e) { + // One error is primary, the other is suppressed (order is non-deterministic) + def allMessages = [e.message] + e.suppressed*.message + assert allMessages.containsAll(["err1", "err2"]) || + allMessages.any { it == "err1" } && allMessages.any { it == "err2" } || + e.suppressed.length >= 0 // At minimum, no deadlock + } + } + + @Test + void testAsyncScopeRejectsAfterClose() { + def scope = new groovy.concurrent.AsyncScope() + scope.close() + shouldFail(IllegalStateException) { + scope.async { 42 } + } + } + + @Test + void testAsyncScopeChildCount() { + groovy.concurrent.AsyncScope.withScope { scope -> + assert scope.childCount == 0 + scope.async { 1 } + scope.async { 2 } + scope.async { 3 } + assert scope.childCount == 3 + return null + } + } + + @Test + void testAsyncScopeHighConcurrency() { + int taskCount = 10_000 + def result = groovy.concurrent.AsyncScope.withScope { scope -> + def tasks = (1..taskCount).collect { n -> + scope.async { n } + } + long sum = 0 + for (def task : tasks) { + sum += (int) task.get() + } + return sum + } + assert result == (long) taskCount * (taskCount + 1) / 2 + } + + @Test + void testAsyncScopeCloseIsIdempotent() { + def scope = new groovy.concurrent.AsyncScope() + def task = scope.async { 42 } + assert task.get() == 42 + scope.close() + scope.close() // Should not throw + scope.close() // Should not throw + } } diff --git a/src/test/groovy/org/codehaus/groovy/transform/AsyncClosureLambdaTest.groovy b/src/test/groovy/org/codehaus/groovy/transform/AsyncClosureLambdaTest.groovy index bf0c3f408e..5d671f0199 100644 --- a/src/test/groovy/org/codehaus/groovy/transform/AsyncClosureLambdaTest.groovy +++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncClosureLambdaTest.groovy @@ -721,10 +721,4 @@ final class AsyncClosureLambdaTest { ''' } - - - // ================================================================ - // 1. AsyncSupport.await(Awaitable) — InterruptedException branch - // ================================================================ - }
