This is an automated email from the ASF dual-hosted git repository.
sunlan pushed a commit to branch GROOVY-9381_3
in repository https://gitbox.apache.org/repos/asf/groovy.git
The following commit(s) were added to refs/heads/GROOVY-9381_3 by this push:
new c5fbe109cf Minor tweaks
c5fbe109cf is described below
commit c5fbe109cf51076b04d095b203d9023944f70dad
Author: Daniel Sun <[email protected]>
AuthorDate: Sat Mar 21 18:01:27 2026 +0900
Minor tweaks
---
src/main/java/groovy/concurrent/AsyncContext.java | 374 ++++++++
src/main/java/groovy/concurrent/AsyncScope.java | 66 +-
src/main/java/groovy/concurrent/AsyncStream.java | 5 +
src/main/java/groovy/concurrent/Awaitable.java | 58 +-
.../concurrent/AwaitableAdapterRegistry.java | 4 +
src/main/java/groovy/concurrent/Channel.java | 409 +++++++++
.../groovy/concurrent/ChannelClosedException.java | 36 +
.../apache/groovy/runtime/async/AsyncSupport.java | 162 +++-
.../apache/groovy/runtime/async/GroovyPromise.java | 29 +-
.../groovy/transform/AsyncASTTransformation.java | 8 +-
.../groovy/transform/AsyncTransformHelper.java | 5 +-
src/spec/doc/core-async-await.adoc | 224 ++++-
src/spec/test/AsyncAwaitSpecTest.groovy | 244 ++++++
.../transform/AsyncContextChannelTest.groovy | 976 +++++++++++++++++++++
.../transform/AsyncRuntimeEnhancementTest.groovy | 254 ++++++
15 files changed, 2802 insertions(+), 52 deletions(-)
diff --git a/src/main/java/groovy/concurrent/AsyncContext.java
b/src/main/java/groovy/concurrent/AsyncContext.java
new file mode 100644
index 0000000000..5f9fae9f66
--- /dev/null
+++ b/src/main/java/groovy/concurrent/AsyncContext.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package groovy.concurrent;
+
+import groovy.lang.Closure;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Supplier;
+
+/**
+ * Lightweight async-local context map for Groovy async execution.
+ * <p>
+ * {@code AsyncContext} is Groovy's answer to C#'s {@code AsyncLocal},
+ * Python's {@code contextvars}, and Kotlin's coroutine context for the
+ * most common production use-cases: request correlation IDs, tenant IDs,
+ * security principals, trace metadata, and other small pieces of
+ * execution-scoped state that must survive thread hops.
+ * <p>
+ * The current context is stored per thread, but async runtimes capture a
+ * {@linkplain Snapshot snapshot} when a task or continuation is scheduled
+ * and restore that snapshot when it executes. Child tasks therefore see
+ * the parent's values at spawn time, while subsequent child-side
+ * mutations remain isolated from the parent and siblings.
+ * <p>
+ * Keys are normalized to non-null {@link String} values. Supplying
+ * {@code null} as a value removes the key.
+ *
+ * @since 6.0.0
+ */
+public final class AsyncContext {
+
+ private static final ThreadLocal<AsyncContext> CURRENT =
+ ThreadLocal.withInitial(AsyncContext::new);
+
+ private final Map<String, Object> values;
+
+ /**
+ * Creates an empty async context.
+ */
+ public AsyncContext() {
+ this.values = new LinkedHashMap<>();
+ }
+
+ /**
+ * Creates a context seeded from the supplied entries.
+ *
+ * @param initialValues initial context entries; may be {@code null}
+ */
+ public AsyncContext(Map<?, ?> initialValues) {
+ this();
+ putAll(initialValues);
+ }
+
+ // Internal constructor that adopts the supplied map as-is (no defensive
+ // copy), so callers must pass a map nobody else holds a reference to.
+ // NOTE(review): the unused boolean presumably exists only to keep this
+ // overload visibly distinct from the public AsyncContext(Map) constructor.
+ private AsyncContext(LinkedHashMap<String, Object> values, boolean ignored) {
+ this.values = values;
+ }
+
+ /**
+ * Returns the live async context associated with the current thread.
+ *
+ * @return the current async context, never {@code null}
+ */
+ public static AsyncContext current() {
+ return CURRENT.get();
+ }
+
+ /**
+ * Captures an immutable snapshot of the current async context.
+ *
+ * @return the captured snapshot
+ */
+ public static Snapshot capture() {
+ return Snapshot.of(current().copyValues());
+ }
+
+ /**
+ * Executes the supplier with the given snapshot installed as the current
+ * async context, restoring the previous context afterwards.
+ *
+ * @param snapshot the snapshot to install
+ * @param supplier the action to execute
+ * @param <T> the result type
+ * @return the supplier's result
+ */
+ public static <T> T withSnapshot(Snapshot snapshot, Supplier<T> supplier) {
+ Objects.requireNonNull(snapshot, "snapshot must not be null");
+ Objects.requireNonNull(supplier, "supplier must not be null");
+ AsyncContext previous = CURRENT.get();
+ // Install a private mutable copy: the snapshot's own map is unmodifiable,
+ // and mutations made by the supplier must stay local to this invocation.
+ CURRENT.set(new AsyncContext(new LinkedHashMap<>(snapshot.values), true));
+ try {
+ return supplier.get();
+ } finally {
+ // Restore the caller's context even when the supplier throws.
+ CURRENT.set(previous);
+ }
+ }
+
+ /**
+ * Executes the runnable with the given snapshot installed as the current
+ * async context, restoring the previous context afterwards.
+ *
+ * @param snapshot the snapshot to install
+ * @param action the action to execute
+ */
+ public static void withSnapshot(Snapshot snapshot, Runnable action) {
+ // Validate eagerly, in the same order as the Supplier overload, so a null
+ // action fails with a descriptive message before any context is swapped
+ // rather than as a bare NullPointerException from inside the lambda.
+ Objects.requireNonNull(snapshot, "snapshot must not be null");
+ Objects.requireNonNull(action, "action must not be null");
+ withSnapshot(snapshot, () -> {
+ action.run();
+ return null;
+ });
+ }
+
+ /**
+ * Temporarily overlays the current async context with the supplied
+ * entries for the duration of the closure.
+ * <p>
+ * Any modifications performed inside the closure are scoped to that
+ * dynamic extent and are discarded when the closure returns or throws.
+ *
+ * @param entries the entries to overlay; may be {@code null}
+ * @param action the closure to execute
+ * @param <T> the result type
+ * @return the closure's result
+ */
+ public static <T> T with(Map<?, ?> entries, Closure<T> action) {
+ Objects.requireNonNull(action, "action must not be null");
+ Snapshot merged = capture().with(entries);
+ return withSnapshot(merged, () -> action.call());
+ }
+
+ /**
+ * Looks up a value in the current context by key.
+ *
+ * @param key the context key
+ * @return the stored value, or {@code null} if absent
+ */
+ public Object get(String key) {
+ return values.get(normalizeKey(key));
+ }
+
+ /**
+ * Groovy operator support for {@code context['traceId']}.
+ *
+ * @param key the context key
+ * @return the stored value, or {@code null} if absent
+ */
+ public Object getAt(String key) {
+ return get(key);
+ }
+
+ /**
+ * Stores a value in the current context.
+ * <p>
+ * A {@code null} value removes the key.
+ *
+ * @param key the context key
+ * @param value the value to store, or {@code null} to remove the key
+ * @return the previous value associated with the key, or {@code null}
+ */
+ public Object put(String key, Object value) {
+ String contextKey = normalizeKey(key);
+ // A null value means "delete this entry"; both branches return the prior value.
+ return value != null
+ ? values.put(contextKey, value)
+ : values.remove(contextKey);
+ }
+
+ /**
+ * Groovy operator support for {@code context['traceId'] = 'abc-123'}.
+ *
+ * @param key the context key
+ * @param value the value to store, or {@code null} to remove the key
+ */
+ public void putAt(String key, Object value) {
+ put(key, value);
+ }
+
+ /**
+ * Removes a key from the context.
+ *
+ * @param key the context key
+ * @return the removed value, or {@code null} if absent
+ */
+ public Object remove(String key) {
+ return values.remove(normalizeKey(key));
+ }
+
+ /**
+ * Adds all entries from the supplied map to this context.
+ * <p>
+ * A {@code null} map is ignored. {@code null} values remove keys.
+ *
+ * @param entries the entries to add
+ */
+ public void putAll(Map<?, ?> entries) {
+ if (entries == null || entries.isEmpty()) {
+ return;
+ }
+ for (Map.Entry<?, ?> entry : entries.entrySet()) {
+ put(normalizeKey(entry.getKey()), entry.getValue());
+ }
+ }
+
+ /**
+ * Removes all entries from this context.
+ */
+ public void clear() {
+ values.clear();
+ }
+
+ /**
+ * Returns {@code true} if the given key is present.
+ *
+ * @param key the context key
+ * @return {@code true} if present
+ */
+ public boolean containsKey(String key) {
+ return values.containsKey(normalizeKey(key));
+ }
+
+ /**
+ * Returns the number of entries in this context.
+ *
+ * @return the entry count
+ */
+ public int size() {
+ return values.size();
+ }
+
+ /**
+ * Returns {@code true} if this context is empty.
+ *
+ * @return {@code true} if empty
+ */
+ public boolean isEmpty() {
+ return values.isEmpty();
+ }
+
+ /**
+ * Returns an immutable copy of this context's entries.
+ *
+ * @return an immutable snapshot of the context
+ */
+ public Map<String, Object> snapshot() {
+ return Collections.unmodifiableMap(copyValues());
+ }
+
+ @Override
+ public String toString() {
+ return "AsyncContext" + values;
+ }
+
+ private Map<String, Object> copyValues() {
+ return new LinkedHashMap<>(values);
+ }
+
+ // Converts an arbitrary key object to its String form; null keys are rejected.
+ private static String normalizeKey(Object key) {
+ return Objects.requireNonNull(key, "context key must not be null").toString();
+ }
+
+ /**
+ * Immutable async-context snapshot captured at task or continuation
+ * registration time.
+ *
+ * @since 6.0.0
+ */
+ public static final class Snapshot {
+
+ private final Map<String, Object> values;
+
+ private Snapshot(Map<String, Object> values) {
+ this.values = values;
+ }
+
+ // Creates a snapshot that directly wraps a pre-copied map
+ static Snapshot of(Map<String, Object> preCopied) {
+ return new Snapshot(Collections.unmodifiableMap(preCopied));
+ }
+
+ /**
+ * Returns {@code true} if this snapshot contains the given key.
+ *
+ * @param key the context key
+ * @return {@code true} if present
+ */
+ public boolean containsKey(String key) {
+ return values.containsKey(normalizeKey(key));
+ }
+
+ /**
+ * Returns the captured value for the given key.
+ *
+ * @param key the context key
+ * @return the captured value, or {@code null} if absent
+ */
+ public Object get(String key) {
+ return values.get(normalizeKey(key));
+ }
+
+ /**
+ * Returns {@code true} if this snapshot contains no entries.
+ *
+ * @return {@code true} if empty
+ */
+ public boolean isEmpty() {
+ return values.isEmpty();
+ }
+
+ /**
+ * Returns the number of entries in this snapshot.
+ *
+ * @return the entry count
+ */
+ public int size() {
+ return values.size();
+ }
+
+ /**
+ * Returns the captured entries as an immutable map.
+ *
+ * @return the captured entries
+ */
+ public Map<String, Object> asMap() {
+ return values;
+ }
+
+ /**
+ * Creates a new snapshot by overlaying the supplied entries on top of
+ * this snapshot.
+ *
+ * @param entries the entries to merge; may be {@code null}
+ * @return the merged snapshot
+ */
+ public Snapshot with(Map<?, ?> entries) {
+ if (entries == null || entries.isEmpty()) {
+ return this;
+ }
+ Map<String, Object> merged = new LinkedHashMap<>(values);
+ for (Map.Entry<?, ?> entry : entries.entrySet()) {
+ String key = normalizeKey(entry.getKey());
+ Object value = entry.getValue();
+ if (value == null) {
+ merged.remove(key);
+ } else {
+ merged.put(key, value);
+ }
+ }
+ return Snapshot.of(merged);
+ }
+
+ @Override
+ public String toString() {
+ return "AsyncContext.Snapshot" + values;
+ }
+ }
+}
diff --git a/src/main/java/groovy/concurrent/AsyncScope.java
b/src/main/java/groovy/concurrent/AsyncScope.java
index 3e9ba837ec..f0c22a58e7 100644
--- a/src/main/java/groovy/concurrent/AsyncScope.java
+++ b/src/main/java/groovy/concurrent/AsyncScope.java
@@ -30,6 +30,7 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
/**
* Structured concurrency scope that ensures all child tasks complete
@@ -80,6 +81,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
public class AsyncScope implements AutoCloseable {
+ private static final ThreadLocal<AsyncScope> CURRENT = new ThreadLocal<>();
+
private final List<CompletableFuture<?>> children = new
CopyOnWriteArrayList<>();
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Executor executor;
@@ -113,10 +116,64 @@ public class AsyncScope implements AutoCloseable {
this(AsyncSupport.getExecutor(), true);
}
+ /**
+ * Returns the structured async scope currently bound to this thread, or
+ * {@code null} if execution is not inside {@link #withScope(Closure)} or
+ * a child launched from such a scope.
+ *
+ * @return the current scope, or {@code null}
+ */
+ public static AsyncScope current() {
+ return CURRENT.get();
+ }
+
+ /**
+ * Executes the supplier with the given scope installed as the current
+ * structured scope, restoring the previous binding afterwards.
+ *
+ * @param scope the scope to install; may be {@code null}
+ * @param supplier the action to execute
+ * @param <T> the result type
+ * @return the supplier's result
+ */
+ public static <T> T withCurrent(AsyncScope scope, Supplier<T> supplier) {
+ Objects.requireNonNull(supplier, "supplier must not be null");
+ AsyncScope previous = CURRENT.get();
+ if (scope == null) {
+ CURRENT.remove();
+ } else {
+ CURRENT.set(scope);
+ }
+ try {
+ return supplier.get();
+ } finally {
+ if (previous == null) {
+ CURRENT.remove();
+ } else {
+ CURRENT.set(previous);
+ }
+ }
+ }
+
+ /**
+ * Void overload of {@link #withCurrent(AsyncScope, Supplier)}.
+ *
+ * @param scope the scope to install; may be {@code null}
+ * @param action the action to execute
+ */
+ public static void withCurrent(AsyncScope scope, Runnable action) {
+ // Fail fast with a descriptive message, mirroring the Supplier overload,
+ // instead of a bare NullPointerException after the scope was rebound.
+ Objects.requireNonNull(action, "action must not be null");
+ withCurrent(scope, () -> {
+ action.run();
+ return null;
+ });
+ }
+
/**
* Launches a child task within this scope. The task's lifetime is
* bound to the scope: when the scope is closed, all incomplete child
- * tasks are cancelled.
+ * tasks are cancelled. The child inherits a snapshot of the current
+ * {@link AsyncContext}, but any child-side context mutations remain
+ * isolated from the parent and siblings.
*
* @param body the async body to execute
* @param <T> the result type
@@ -125,12 +182,14 @@ public class AsyncScope implements AutoCloseable {
*/
@SuppressWarnings("unchecked")
public <T> Awaitable<T> async(Closure<T> body) {
+ Objects.requireNonNull(body, "body must not be null");
if (closed.get()) {
throw new IllegalStateException("AsyncScope is closed — cannot
launch new tasks");
}
+ AsyncContext.Snapshot contextSnapshot = AsyncContext.capture();
CompletableFuture<T> cf = CompletableFuture.supplyAsync(() -> {
try {
- return body.call();
+ return withCurrent(this, () ->
AsyncContext.withSnapshot(contextSnapshot, () -> body.call()));
} catch (CompletionException ce) {
throw ce;
} catch (Throwable t) {
@@ -251,8 +310,9 @@ public class AsyncScope implements AutoCloseable {
*/
@SuppressWarnings("unchecked")
public static <T> T withScope(Executor executor, Closure<T> body) {
+ Objects.requireNonNull(body, "body must not be null");
try (AsyncScope scope = new AsyncScope(executor)) {
- return body.call(scope);
+ return withCurrent(scope, () -> body.call(scope));
}
}
}
diff --git a/src/main/java/groovy/concurrent/AsyncStream.java
b/src/main/java/groovy/concurrent/AsyncStream.java
index 318ddf6355..d2ac2603ad 100644
--- a/src/main/java/groovy/concurrent/AsyncStream.java
+++ b/src/main/java/groovy/concurrent/AsyncStream.java
@@ -35,6 +35,11 @@ package groovy.concurrent;
* to create a generator-style stream</li>
* <li>Adapting JDK {@link java.util.concurrent.Flow.Publisher} instances
* (supported out of the box by the built-in adapter)</li>
+ * <li>Viewing a {@link Channel} as a stream via {@link Channel#asStream()} —
+ * each {@code moveNext()} call maps to a {@code receive()}, and
+ * {@link ChannelClosedException} is translated to end-of-stream.
+ * This conversion is registered automatically, so {@code for await
(item in channel)}
+ * works out of the box.</li>
* <li>Adapting third-party reactive types (Reactor {@code Flux}, RxJava
* {@code Observable}) via {@link AwaitableAdapter}</li>
* </ul>
diff --git a/src/main/java/groovy/concurrent/Awaitable.java
b/src/main/java/groovy/concurrent/Awaitable.java
index d7bf8fb689..0da8a093da 100644
--- a/src/main/java/groovy/concurrent/Awaitable.java
+++ b/src/main/java/groovy/concurrent/Awaitable.java
@@ -18,6 +18,7 @@
*/
package groovy.concurrent;
+import groovy.lang.Closure;
import org.apache.groovy.runtime.async.AsyncSupport;
import org.apache.groovy.runtime.async.GroovyPromise;
@@ -70,6 +71,8 @@ import java.util.function.Function;
* {@code Task.FromResult()} / {@code Promise.resolve()}</li>
* <li>{@link #failed(Throwable) Awaitable.failed(error)} — like
* {@code Task.FromException()} / {@code Promise.reject()}</li>
+ * <li>{@link #go(Closure) Awaitable.go { ... }} — lightweight Go-style task
spawn</li>
+ * <li>{@link #channel()} / {@link #channel(int)} — Go-style buffered or
unbuffered channels</li>
* </ul>
* <p>
* <b>Instance continuations</b> provide ergonomic composition without exposing
@@ -183,7 +186,12 @@ public interface Awaitable<T> {
* @since 6.0.0
*/
default Awaitable<Void> thenAccept(Consumer<? super T> action) {
- return GroovyPromise.of(toCompletableFuture().thenAccept(action));
+ AsyncContext.Snapshot contextSnapshot = AsyncContext.capture();
+ return GroovyPromise.of(toCompletableFuture().thenAccept(value ->
+ AsyncContext.withSnapshot(contextSnapshot, () -> {
+ action.accept(value);
+ return null;
+ })));
}
/**
@@ -212,8 +220,12 @@ public interface Awaitable<T> {
* @since 6.0.0
*/
default Awaitable<T> whenComplete(BiConsumer<? super T, ? super Throwable>
action) {
+ AsyncContext.Snapshot contextSnapshot = AsyncContext.capture();
return GroovyPromise.of(toCompletableFuture().whenComplete((value,
error) ->
- action.accept(value, error == null ? null :
AsyncSupport.deepUnwrap(error))));
+ AsyncContext.withSnapshot(contextSnapshot, () -> {
+ action.accept(value, error == null ? null :
AsyncSupport.deepUnwrap(error));
+ return null;
+ })));
}
/**
@@ -231,8 +243,10 @@ public interface Awaitable<T> {
* @since 6.0.0
*/
default <U> Awaitable<U> handle(BiFunction<? super T, Throwable, ? extends
U> fn) {
+ AsyncContext.Snapshot contextSnapshot = AsyncContext.capture();
return GroovyPromise.of(toCompletableFuture().handle((value, error) ->
- fn.apply(value, error == null ? null :
AsyncSupport.deepUnwrap(error))));
+ AsyncContext.withSnapshot(contextSnapshot,
+ () -> fn.apply(value, error == null ? null :
AsyncSupport.deepUnwrap(error)))));
}
/**
@@ -352,6 +366,44 @@ public interface Awaitable<T> {
return new GroovyPromise<>(CompletableFuture.failedFuture(error));
}
+ /**
+ * Spawns a lightweight Go-style task.
+ * <p>
+ * Inside an {@link AsyncScope}, the new task is attached to the current
+ * structured scope. Outside a scope it behaves like a detached async task.
+ *
+ * @param closure the task body
+ * @param <T> the result type
+ * @return an awaitable representing the spawned task
+ * @since 6.0.0
+ */
+ static <T> Awaitable<T> go(Closure<T> closure) {
+ return AsyncSupport.go(closure);
+ }
+
+ /**
+ * Creates an unbuffered Go-style channel.
+ *
+ * @param <T> the payload type
+ * @return a new unbuffered channel
+ * @since 6.0.0
+ */
+ static <T> Channel<T> channel() {
+ return AsyncSupport.channel();
+ }
+
+ /**
+ * Creates a Go-style channel with the given buffer capacity.
+ *
+ * @param capacity the buffer capacity
+ * @param <T> the payload type
+ * @return a new channel
+ * @since 6.0.0
+ */
+ static <T> Channel<T> channel(int capacity) {
+ return AsyncSupport.channel(capacity);
+ }
+
// ---- Combinators (like C#'s Task.WhenAll/WhenAny, JS's Promise.all/any)
----
/**
diff --git a/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java
b/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java
index 33765f1c04..e3921763d9 100644
--- a/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java
+++ b/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java
@@ -181,6 +181,9 @@ public class AwaitableAdapterRegistry {
/**
* Converts the given source to an {@link AsyncStream}.
* If the source is already an {@code AsyncStream}, it is returned as-is.
+ * {@link Channel} instances are handled as a built-in special case via
+ * {@link Channel#asStream()}, enabling transparent {@code for await}
+ * iteration over channels.
* <p>
* Uses a per-class {@link ClassValue} cache to avoid repeated linear
* scans of the adapter list on the hot path.
@@ -202,6 +205,7 @@ public class AwaitableAdapterRegistry {
throw new IllegalArgumentException("Cannot convert null to
AsyncStream");
}
if (source instanceof AsyncStream) return (AsyncStream<T>) source;
+ if (source instanceof Channel) return ((Channel<T>) source).asStream();
Class<?> type = source.getClass();
AwaitableAdapter adapter = streamCache.get(type);
if (adapter != null) {
diff --git a/src/main/java/groovy/concurrent/Channel.java
b/src/main/java/groovy/concurrent/Channel.java
new file mode 100644
index 0000000000..44c94a9c4a
--- /dev/null
+++ b/src/main/java/groovy/concurrent/Channel.java
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package groovy.concurrent;
+
+import org.apache.groovy.runtime.async.GroovyPromise;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Go-inspired asynchronous channel with optional buffering.
+ * <p>
+ * A channel coordinates producers and consumers without exposing explicit
+ * locks. With a capacity of {@code 0}, sends rendezvous directly with
+ * receivers (unbuffered semantics). With a positive capacity, values are
+ * buffered until the channel fills, after which senders wait until a
+ * receiver makes room.
+ * <p>
+ * Channel operations return {@link Awaitable} values so they compose
+ * naturally with Groovy's {@code await} and {@link Awaitable#any(Object...)}
+ * APIs.
+ * <p>
+ * Channels also integrate with Groovy's {@code for await} syntax via
+ * the {@link #asStream()} method, which provides a read-only
+ * {@link AsyncStream} view that yields received values until the channel
+ * is closed:
+ * <pre>
+ * for await (item in channel) {
+ * process(item)
+ * }
+ * </pre>
+ * <p>
+ * {@code null} payloads are rejected to keep a clear distinction between
+ * "no value yet" and an actual payload. If the channel is closed, pending
+ * senders fail immediately and future receivers drain any buffered items
+ * before failing with {@link ChannelClosedException}.
+ *
+ * @param <T> the payload type
+ * @since 6.0.0
+ */
+public final class Channel<T> {
+
+ private final ReentrantLock lock = new ReentrantLock(true);
+ private final Deque<T> buffer = new ArrayDeque<>();
+ private final Deque<PendingSender<T>> waitingSenders = new ArrayDeque<>();
+ private final Deque<CompletableFuture<T>> waitingReceivers = new
ArrayDeque<>();
+ private final int capacity;
+
+ private boolean closed;
+
+ /**
+ * Creates an unbuffered channel.
+ */
+ public Channel() {
+ this(0);
+ }
+
+ /**
+ * Creates a channel with the given buffer capacity.
+ *
+ * @param capacity the buffer capacity; must not be negative
+ */
+ public Channel(int capacity) {
+ // Zero is allowed and selects unbuffered (rendezvous) behaviour.
+ if (capacity < 0) {
+ throw new IllegalArgumentException("channel capacity must not be negative: " + capacity);
+ }
+ this.capacity = capacity;
+ }
+
+ /**
+ * Returns this channel's configured buffer capacity.
+ *
+ * @return the capacity
+ */
+ public int getCapacity() {
+ return capacity;
+ }
+
+ /**
+ * Returns the number of currently buffered elements.
+ *
+ * @return the buffered element count
+ */
+ public int getBufferedSize() {
+ lock.lock();
+ try {
+ return buffer.size();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Returns {@code true} if the channel has been closed.
+ *
+ * @return {@code true} if closed
+ */
+ public boolean isClosed() {
+ lock.lock();
+ try {
+ return closed;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Sends a value through the channel.
+ * <p>
+ * The returned {@link Awaitable} completes when the value has either been
+ * handed off to a receiver or enqueued in the channel buffer.
+ *
+ * @param value the value to send; must not be {@code null}
+ * @return an awaitable that completes when the send succeeds
+ */
+ public Awaitable<Void> send(T value) {
+ Objects.requireNonNull(value, "channel does not support null values");
+ CompletableFuture<Void> completion = new CompletableFuture<>();
+ PendingSender<T> pendingSender = new PendingSender<>(value, completion);
+
+ lock.lock();
+ try {
+ if (closed) {
+ completion.completeExceptionally(closedForSend());
+ } else if (deliverToWaitingReceiver(value)) {
+ // Direct hand-off to a receiver that was already waiting.
+ completion.complete(null);
+ } else if (buffer.size() < capacity) {
+ buffer.addLast(value);
+ completion.complete(null);
+ } else {
+ // No receiver and no buffer room: park until receive() or close().
+ waitingSenders.addLast(pendingSender);
+ }
+ } finally {
+ lock.unlock();
+ }
+
+ if (!completion.isDone()) {
+ // Still parked: if the caller cancels or the send fails, drop the
+ // queue entry so the value is not handed to a later receiver.
+ completion.whenComplete((ignored, error) -> {
+ if (error != null || completion.isCancelled()) {
+ removePendingSender(pendingSender);
+ }
+ });
+ }
+
+ return GroovyPromise.of(completion);
+ }
+
+ /**
+ * Receives the next value from the channel.
+ * <p>
+ * If the channel is buffered, buffered values are consumed first. If the
+ * channel is closed and drained, the returned awaitable fails with
+ * {@link ChannelClosedException}.
+ *
+ * @return an awaitable that yields the next value
+ */
+ public Awaitable<T> receive() {
+ CompletableFuture<T> completion = new CompletableFuture<>();
+
+ lock.lock();
+ try {
+ T buffered = pollBuffer();
+ if (buffered != null) {
+ completion.complete(buffered);
+ } else {
+ // Cancellation happens outside the lock, so a sender can be cancelled
+ // between pollPendingSender()'s isDone() check and the hand-off.
+ // Only accept a sender whose completion we actually win; otherwise a
+ // cancelled send would still deliver its value. This mirrors the
+ // complete()-result check in deliverToWaitingReceiver().
+ PendingSender<T> sender = pollPendingSender();
+ while (sender != null && !sender.completion.complete(null)) {
+ sender = pollPendingSender();
+ }
+ if (sender != null) {
+ completion.complete(sender.value);
+ } else if (closed) {
+ completion.completeExceptionally(closedForReceive());
+ } else {
+ waitingReceivers.addLast(completion);
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+
+ if (!completion.isDone()) {
+ // Still parked: remove the queue entry if the caller cancels or the
+ // receive fails, so no later send is wasted on it.
+ completion.whenComplete((ignored, error) -> {
+ if (error != null || completion.isCancelled()) {
+ removePendingReceiver(completion);
+ }
+ });
+ }
+
+ return GroovyPromise.of(completion);
+ }
+
+ /**
+ * Closes the channel.
+ * <p>
+ * Closing is idempotent. Buffered values remain receivable. Any pending
+ * senders fail immediately, and receivers waiting after the buffer is
+ * drained fail with {@link ChannelClosedException}.
+ *
+ * @return {@code true} if this invocation closed the channel,
+ * {@code false} if it had already been closed
+ */
+ public boolean close() {
+ lock.lock();
+ try {
+ if (closed) {
+ // Already closed by an earlier call: nothing left to do.
+ return false;
+ }
+ closed = true;
+
+ // Give buffered values to receivers that are already waiting before
+ // the remaining receivers are failed below.
+ drainBufferToReceivers();
+
+ while (!waitingReceivers.isEmpty()) {
+ waitingReceivers.removeFirst().completeExceptionally(closedForReceive());
+ }
+
+ while (!waitingSenders.isEmpty()) {
+ waitingSenders.removeFirst().completion.completeExceptionally(closedForSend());
+ }
+
+ return true;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Returns a read-only {@link AsyncStream} view of this channel.
+ * <p>
+ * The returned stream yields values received from the channel via
+ * {@link #receive()}. When the channel is closed and all buffered
+ * values are drained, the stream signals completion
+ * ({@code moveNext()} returns {@code false}).
+ * <p>
+ * This enables natural {@code for await} iteration:
+ * <pre>
+ * for await (item in channel) {
+ * process(item)
+ * }
+ * </pre>
+ * <p>
+ * <b>Ownership:</b> the stream view does not own the channel — calling
+ * {@code close()} on the stream is a no-op. The producer is responsible
+ * for closing the channel when done sending. This matches Go's semantics
+ * where the receiver does not close the channel.
+ *
+ * @return an async stream view of this channel
+ */
+ public AsyncStream<T> asStream() {
+ return new ChannelAsyncStream<>(this);
+ }
+
+ @Override
+ public String toString() {
+ lock.lock();
+ try {
+ return "Channel{capacity=" + capacity
+ + ", buffered=" + buffer.size()
+ + ", waitingSenders=" + waitingSenders.size()
+ + ", waitingReceivers=" + waitingReceivers.size()
+ + ", closed=" + closed
+ + '}';
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private boolean deliverToWaitingReceiver(T value) {
+ while (!waitingReceivers.isEmpty()) {
+ CompletableFuture<T> receiver = waitingReceivers.removeFirst();
+ if (receiver.complete(value)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private void drainBufferToReceivers() {
+ while (!waitingReceivers.isEmpty() && !buffer.isEmpty()) {
+ CompletableFuture<T> receiver = waitingReceivers.removeFirst();
+ if (receiver.complete(buffer.peekFirst())) {
+ buffer.removeFirst();
+ }
+ }
+ }
+
+ private T pollBuffer() {
+ if (buffer.isEmpty()) {
+ return null;
+ }
+ T value = buffer.removeFirst();
+ refillBufferFromWaitingSenders();
+ return value;
+ }
+
+ // Moves values from parked senders into free buffer slots. Called from
+ // pollBuffer(), i.e. with the channel lock already held.
+ private void refillBufferFromWaitingSenders() {
+ while (buffer.size() < capacity) {
+ PendingSender<T> sender = pollPendingSender();
+ if (sender == null) {
+ return;
+ }
+ // Enqueue first so the value is already buffered when the sender's
+ // completion is signalled.
+ buffer.addLast(sender.value);
+ if (!sender.completion.complete(null)) {
+ // The sender was cancelled after pollPendingSender()'s isDone()
+ // check. A failed complete() changes nothing, so the element just
+ // appended is still last; retract it instead of delivering a value
+ // whose send was cancelled.
+ buffer.removeLast();
+ }
+ }
+ }
+
+ private PendingSender<T> pollPendingSender() {
+ while (!waitingSenders.isEmpty()) {
+ PendingSender<T> sender = waitingSenders.removeFirst();
+ if (!sender.completion.isDone()) {
+ return sender;
+ }
+ }
+ return null;
+ }
+
+ private void removePendingSender(PendingSender<T> sender) {
+ lock.lock();
+ try {
+ waitingSenders.remove(sender);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private void removePendingReceiver(CompletableFuture<T> receiver) {
+ lock.lock();
+ try {
+ waitingReceivers.remove(receiver);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private static ChannelClosedException closedForSend() {
+ return new ChannelClosedException("channel is closed for send");
+ }
+
+ private static ChannelClosedException closedForReceive() {
+ return new ChannelClosedException("channel is closed");
+ }
+
+ private static final class PendingSender<T> {
+ private final T value;
+ private final CompletableFuture<Void> completion;
+
+ private PendingSender(T value, CompletableFuture<Void> completion) {
+ this.value = value;
+ this.completion = completion;
+ }
+ }
+
+ /**
+ * Read-only {@link AsyncStream} view over a channel.
+ * <p>
+ * Each {@link #moveNext()} call issues a {@link Channel#receive()};
+ * {@link ChannelClosedException} is translated to end-of-stream
+ * ({@code false}). The stream does not own the channel, so its
+ * {@link #close()} is a no-op.
+ */
+ private static final class ChannelAsyncStream<T> implements AsyncStream<T>
{
+ private final Channel<T> channel;
+ private volatile T current;
+
+ ChannelAsyncStream(Channel<T> channel) {
+ this.channel = channel;
+ }
+
+ @Override
+ public Awaitable<Boolean> moveNext() {
+ return channel.receive().handle((value, error) -> {
+ if (error != null) {
+ if (error instanceof ChannelClosedException) {
+ return false;
+ }
+ if (error instanceof RuntimeException re) throw re;
+ if (error instanceof Error e) throw e;
+ throw new RuntimeException(error);
+ }
+ current = value;
+ return true;
+ });
+ }
+
+ @Override
+ public T getCurrent() {
+ return current;
+ }
+
+ // close() is intentionally not overridden: the stream view does not own the channel
+ }
+}
diff --git a/src/main/java/groovy/concurrent/ChannelClosedException.java
b/src/main/java/groovy/concurrent/ChannelClosedException.java
new file mode 100644
index 0000000000..e71fd02138
--- /dev/null
+++ b/src/main/java/groovy/concurrent/ChannelClosedException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package groovy.concurrent;
+
+/**
+ * Indicates that a {@link Channel} operation was attempted after the channel
+ * had been closed.
+ * <p>
+ * This is an unchecked exception: it extends {@link IllegalStateException}.
+ * Both constructors simply delegate to the corresponding superclass
+ * constructor.
+ *
+ * @since 6.0.0
+ */
+public class ChannelClosedException extends IllegalStateException {
+
+ public ChannelClosedException(String message) { // message-only form
+ super(message);
+ }
+
+ public ChannelClosedException(String message, Throwable cause) { // form that keeps the underlying cause
+ super(message, cause);
+ }
+}
diff --git a/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
b/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
index 7996c110e3..73b5a9e15f 100644
--- a/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
+++ b/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
@@ -18,9 +18,12 @@
*/
package org.apache.groovy.runtime.async;
+import groovy.concurrent.AsyncContext;
+import groovy.concurrent.AsyncScope;
import groovy.concurrent.AsyncStream;
import groovy.concurrent.AwaitResult;
import groovy.concurrent.Awaitable;
+import groovy.concurrent.Channel;
import groovy.lang.Closure;
import java.lang.invoke.MethodHandle;
@@ -34,6 +37,7 @@ import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.Locale;
+import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -62,7 +66,8 @@ import java.util.concurrent.TimeoutException;
* Key responsibilities:
* <ul>
* <li><b>Async execution</b> — {@code executeAsync()}, {@code
executeAsyncVoid()},
- * and {@code wrapAsync()} run closures on the configured executor</li>
+ * {@code wrapAsync()}, and {@code go()} run closures on the configured
+ * executor while preserving the current {@link AsyncContext}</li>
* <li><b>Await</b> — all {@code await()} overloads use
* {@link groovy.concurrent.Awaitable#from(Object) Awaitable.from()} so
* that third-party async types
@@ -77,7 +82,9 @@ import java.util.concurrent.TimeoutException;
* cancellation upstream</li>
* <li><b>Defer</b> — {@code createDeferScope()}, {@code defer()}, and
* {@code executeDeferScope()} implement Go-style deferred cleanup with
- * LIFO execution and exception suppression</li>
+ * LIFO execution, async cleanup awareness, and exception
suppression</li>
+ * <li><b>Channels</b> — {@code channel()} creates Go-inspired channels that
+ * compose with {@link Awaitable#any(Object...)} / {@code select()}</li>
* <li><b>Delay</b> — {@code delay()} provides non-blocking delays using a
* shared {@link java.util.concurrent.ScheduledExecutorService}</li>
* <li><b>Timeouts</b> — {@code orTimeout()} and {@code completeOnTimeout()}
apply
@@ -314,6 +321,9 @@ public class AsyncSupport {
* Executes the given closure asynchronously on the specified executor,
* returning an {@link Awaitable} that completes with the closure's return
* value. Used by {@link groovy.transform.Async @Async} methods.
+ * <p>
+ * Captures the current {@link AsyncContext} at submission time and
+ * restores that snapshot when the closure runs.
*
* @param closure the closure to execute
* @param executor the executor on which to run the closure
@@ -322,32 +332,43 @@ public class AsyncSupport {
*/
@SuppressWarnings("unchecked")
public static <T> Awaitable<T> executeAsync(Closure<T> closure, Executor
executor) {
- return GroovyPromise.of(CompletableFuture.supplyAsync(() -> {
- try {
- return closure.call();
- } catch (Throwable t) {
- throw wrapForFuture(t);
- }
- }, executor));
+ Objects.requireNonNull(closure, "closure must not be null");
+ AsyncContext.Snapshot contextSnapshot = AsyncContext.capture();
+ Executor targetExecutor = executor != null ? executor :
defaultExecutor;
+ return GroovyPromise.of(CompletableFuture.supplyAsync(() ->
+ AsyncContext.withSnapshot(contextSnapshot, () -> {
+ try {
+ return closure.call();
+ } catch (Throwable t) {
+ throw wrapForFuture(t);
+ }
+ }), targetExecutor));
}
/**
* Void variant of {@link #executeAsync(Closure, Executor)} for
* {@link groovy.transform.Async @Async} methods whose return type is
* {@code void}.
+ * <p>
+ * Captures and restores the current {@link AsyncContext} in the same way
+ * as {@link #executeAsync(Closure, Executor)}.
*
* @param closure the closure to execute
* @param executor the executor on which to run the closure
* @return an awaitable that completes when the closure finishes
*/
public static Awaitable<Void> executeAsyncVoid(Closure<?> closure,
Executor executor) {
- return GroovyPromise.of(CompletableFuture.runAsync(() -> {
- try {
- closure.call();
- } catch (Throwable t) {
- throw wrapForFuture(t);
- }
- }, executor));
+ Objects.requireNonNull(closure, "closure must not be null");
+ AsyncContext.Snapshot contextSnapshot = AsyncContext.capture();
+ Executor targetExecutor = executor != null ? executor :
defaultExecutor;
+ return GroovyPromise.of(CompletableFuture.runAsync(() ->
+ AsyncContext.withSnapshot(contextSnapshot, () -> {
+ try {
+ closure.call();
+ } catch (Throwable t) {
+ throw wrapForFuture(t);
+ }
+ }), targetExecutor));
}
/**
@@ -366,11 +387,32 @@ public class AsyncSupport {
return executeAsync(closure, defaultExecutor);
}
+ /**
+ * Go-style task spawn primitive.
+ * <p>
+ * When called inside {@link AsyncScope#withScope(groovy.lang.Closure)},
+ * the new task is registered as a structured child of the current scope.
+ * Outside a scope it behaves like a lightweight detached async task.
+ * <p>
+ * The scope is looked up with {@link AsyncScope#current()}. If one is
+ * present the closure is handed to that scope's {@code async} method;
+ * otherwise it is submitted through
+ * {@link #executeAsync(Closure, Executor)} on the default executor.
+ *
+ * @param closure the task body
+ * @param <T> the result type
+ * @return an awaitable representing the spawned task
+ */
+ public static <T> Awaitable<T> go(Closure<T> closure) {
+ AsyncScope currentScope = AsyncScope.current(); // null means no scope is active for the caller
+ if (currentScope != null) {
+ return currentScope.async(closure); // structured path: the scope owns the child task
+ }
+ return executeAsync(closure, defaultExecutor); // detached path: plain async execution
+ }
+
/**
* Wraps a closure so that each invocation executes the body asynchronously
* and returns an {@link Awaitable}. This is the runtime entry point for
* the {@code async { ... }} expression syntax. The returned closure must
- * be explicitly called to start the async computation:
+ * be explicitly called to start the async computation. Each invocation
+ * captures the caller's {@link AsyncContext} and restores it when the
+ * wrapped closure executes:
* <pre>
* def task = async { expensiveWork() }
* def result = await(task()) // explicit call required
@@ -382,16 +424,19 @@ public class AsyncSupport {
*/
@SuppressWarnings("unchecked")
public static <T> Closure<Awaitable<T>> wrapAsync(Closure<T> closure) {
+ Objects.requireNonNull(closure, "closure must not be null");
return new Closure<Awaitable<T>>(closure.getOwner(),
closure.getThisObject()) {
@SuppressWarnings("unused")
public Awaitable<T> doCall(Object... args) {
- return GroovyPromise.of(CompletableFuture.supplyAsync(() -> {
- try {
- return closure.call(args);
- } catch (Throwable t) {
- throw wrapForFuture(t);
- }
- }, defaultExecutor));
+ AsyncContext.Snapshot contextSnapshot = AsyncContext.capture();
+ return GroovyPromise.of(CompletableFuture.supplyAsync(() ->
+ AsyncContext.withSnapshot(contextSnapshot, () -> {
+ try {
+ return closure.call(args);
+ } catch (Throwable t) {
+ throw wrapForFuture(t);
+ }
+ }), defaultExecutor));
}
};
}
@@ -401,7 +446,9 @@ public class AsyncSupport {
* so that each invocation returns an {@link AsyncStream} producing the
* yielded elements. This is the runtime entry point for all async
* generator closures ({@code async { yield return ... }}). The returned
- * closure must be explicitly called to start the generation:
+ * closure must be explicitly called to start the generation. As with
+ * ordinary async closures, each invocation captures the caller's
+ * {@link AsyncContext} snapshot:
* <pre>
* def gen = async { yield return 1; yield return 2 }
* for await (item in gen()) { println item }
@@ -413,11 +460,13 @@ public class AsyncSupport {
*/
@SuppressWarnings("unchecked")
public static <T> Closure<AsyncStream<T>> wrapAsyncGenerator(Closure<?>
closure) {
+ Objects.requireNonNull(closure, "closure must not be null");
return new Closure<AsyncStream<T>>(closure.getOwner(),
closure.getThisObject()) {
@SuppressWarnings("unused")
public AsyncStream<T> doCall(Object... args) {
AsyncStreamGenerator<T> gen = new AsyncStreamGenerator<>();
- CompletableFuture.runAsync(() -> {
+ AsyncContext.Snapshot contextSnapshot = AsyncContext.capture();
+ CompletableFuture.runAsync(() ->
AsyncContext.withSnapshot(contextSnapshot, () -> {
gen.attachProducer(Thread.currentThread());
try {
Object[] allArgs = new Object[args.length + 1];
@@ -430,7 +479,7 @@ public class AsyncSupport {
} finally {
gen.detachProducer(Thread.currentThread());
}
- }, defaultExecutor);
+ }), defaultExecutor);
return gen;
}
};
@@ -859,6 +908,9 @@ public class AsyncSupport {
* This method is used internally by {@link groovy.transform.Async @Async}
* generator methods. For the {@code async { yield return ... }} closure
* expression syntax, see {@link #wrapAsyncGenerator(Closure)}.
+ * <p>
+ * The generator body executes with the {@link AsyncContext} snapshot that
+ * was active when generation started.
*
* @param body the generator closure (receives an {@link
AsyncStreamGenerator}
* as its single argument)
@@ -867,8 +919,10 @@ public class AsyncSupport {
*/
@SuppressWarnings("unchecked")
public static <T> AsyncStream<T> generateAsyncStream(Closure<?> body) {
+ Objects.requireNonNull(body, "body must not be null");
AsyncStreamGenerator<T> gen = new AsyncStreamGenerator<>();
- CompletableFuture.runAsync(() -> {
+ AsyncContext.Snapshot contextSnapshot = AsyncContext.capture();
+ CompletableFuture.runAsync(() ->
AsyncContext.withSnapshot(contextSnapshot, () -> {
gen.attachProducer(Thread.currentThread());
try {
body.call(gen);
@@ -878,10 +932,31 @@ public class AsyncSupport {
} finally {
gen.detachProducer(Thread.currentThread());
}
- }, defaultExecutor);
+ }), defaultExecutor);
return gen;
}
+ /**
+ * Creates an unbuffered Go-style channel.
+ * <p>
+ * Convenience factory that delegates to the no-argument {@link Channel}
+ * constructor; every call returns a new, independent channel.
+ *
+ * @param <T> the payload type
+ * @return a new unbuffered channel
+ */
+ public static <T> Channel<T> channel() {
+ return new Channel<>();
+ }
+
+ /**
+ * Creates a Go-style channel with the given buffer capacity.
+ * <p>
+ * Convenience factory that passes {@code capacity} straight to the
+ * {@link Channel} constructor; no validation is performed here, so any
+ * range check is whatever that constructor enforces.
+ *
+ * @param capacity the channel buffer capacity
+ * @param <T> the payload type
+ * @return a new channel
+ */
+ public static <T> Channel<T> channel(int capacity) {
+ return new Channel<>(capacity);
+ }
+
// ---- executor configuration -----------------------------------------
/**
@@ -934,7 +1009,10 @@ public class AsyncSupport {
* (typically in a {@code finally} block generated by the compiler).
* <p>
* This method is the runtime entry point for the {@code defer { ... }}
- * statement, inspired by Go's {@code defer} keyword.
+ * statement, inspired by Go's {@code defer} keyword. The deferred action
+ * is bound to the current {@link AsyncContext} snapshot so that cleanup
+ * observes the same request/trace metadata that was active when the defer
+ * statement was executed.
*
* @param scope the defer scope (created by {@link #createDeferScope()});
* must not be {@code null}
@@ -951,7 +1029,13 @@ public class AsyncSupport {
if (action == null) {
throw new IllegalArgumentException("defer action must not be
null");
}
- scope.push(action);
+ AsyncContext.Snapshot contextSnapshot = AsyncContext.capture();
+ scope.push(new Closure<Object>(action.getOwner(),
action.getThisObject()) {
+ @SuppressWarnings("unused")
+ public Object doCall() {
+ return AsyncContext.withSnapshot(contextSnapshot, () ->
action.call());
+ }
+ });
}
/**
@@ -960,6 +1044,10 @@ public class AsyncSupport {
* subsequent actions still execute. If multiple actions throw, the
* first exception is rethrown and subsequent ones are added as
* {@linkplain Throwable#addSuppressed(Throwable) suppressed exceptions}.
+ * If a deferred action returns an {@link Awaitable},
+ * {@link CompletionStage}, {@link CompletableFuture}, or {@link Future},
+ * the async cleanup is awaited before moving on to the next deferred
+ * action.
* <p>
* Called by compiler-generated code in the {@code finally} block of
* methods that contain {@code defer} statements.
@@ -973,7 +1061,8 @@ public class AsyncSupport {
Throwable firstError = null;
while (!scope.isEmpty()) {
try {
- scope.pop().call();
+ Object result = scope.pop().call();
+ awaitDeferredResult(result);
} catch (Throwable t) {
if (firstError == null) {
firstError = t;
@@ -1041,4 +1130,13 @@ public class AsyncSupport {
if (t instanceof CompletionException ce) return ce;
return new CompletionException(t);
}
+
+ private static void awaitDeferredResult(Object result) { // awaits the value returned by a deferred action when it is an async handle
+ if (result instanceof Awaitable
+ || result instanceof CompletableFuture
+ || result instanceof CompletionStage
+ || result instanceof Future) {
+ await(result); // delegates to this class's await overload; its outcome (including any failure) reaches the caller
+ }
+ } // any other result, including null, is ignored
}
diff --git a/src/main/java/org/apache/groovy/runtime/async/GroovyPromise.java
b/src/main/java/org/apache/groovy/runtime/async/GroovyPromise.java
index c2ce0d6145..b4bc7f5271 100644
--- a/src/main/java/org/apache/groovy/runtime/async/GroovyPromise.java
+++ b/src/main/java/org/apache/groovy/runtime/async/GroovyPromise.java
@@ -18,6 +18,7 @@
*/
package org.apache.groovy.runtime.async;
+import groovy.concurrent.AsyncContext;
import groovy.concurrent.Awaitable;
import java.util.Objects;
@@ -170,11 +171,15 @@ public class GroovyPromise<T> implements Awaitable<T> {
* {@inheritDoc}
* <p>
* Returns a new {@code GroovyPromise} whose result is obtained by applying
- * the given function to this promise's result.
+ * the given function to this promise's result. The current
+ * {@link AsyncContext} snapshot is captured when the continuation is
+ * registered and restored when it executes.
*/
@Override
public <U> Awaitable<U> then(Function<? super T, ? extends U> fn) {
- return new GroovyPromise<>(future.thenApply(fn));
+ AsyncContext.Snapshot contextSnapshot = AsyncContext.capture();
+ return new GroovyPromise<>(future.thenApply(value ->
+ AsyncContext.withSnapshot(contextSnapshot, () ->
fn.apply(value))));
}
/**
@@ -182,10 +187,15 @@ public class GroovyPromise<T> implements Awaitable<T> {
* <p>
* Returns a new {@code GroovyPromise} that is the result of composing this
* promise with the async function, enabling flat-mapping of awaitables.
+ * The current {@link AsyncContext} snapshot is captured when the
+ * continuation is registered and restored when it executes.
*/
@Override
public <U> Awaitable<U> thenCompose(Function<? super T, ? extends
Awaitable<U>> fn) {
- return new GroovyPromise<>(future.thenCompose(t ->
fn.apply(t).toCompletableFuture()));
+ AsyncContext.Snapshot contextSnapshot = AsyncContext.capture();
+ return new GroovyPromise<>(future.thenCompose(value ->
+ AsyncContext.withSnapshot(contextSnapshot,
+ () -> fn.apply(value).toCompletableFuture())));
}
/**
@@ -194,14 +204,17 @@ public class GroovyPromise<T> implements Awaitable<T> {
* Returns a new {@code GroovyPromise} that handles exceptions thrown by
this promise.
* The throwable passed to the handler is deeply unwrapped to strip JDK
* wrapper layers ({@code CompletionException}, {@code
ExecutionException}).
+ * The handler runs with the {@link AsyncContext} snapshot that was active
+ * when the recovery continuation was registered.
*/
@Override
public Awaitable<T> exceptionally(Function<Throwable, ? extends T> fn) {
- return new GroovyPromise<>(future.exceptionally(t -> {
- // Unwrap all wrapper layers so handler sees the original exception
- Throwable cause = AsyncSupport.deepUnwrap(t);
- return fn.apply(cause);
- }));
+ AsyncContext.Snapshot contextSnapshot = AsyncContext.capture();
+ return new GroovyPromise<>(future.exceptionally(t ->
+ AsyncContext.withSnapshot(contextSnapshot, () -> {
+ Throwable cause = AsyncSupport.deepUnwrap(t);
+ return fn.apply(cause);
+ })));
}
/**
diff --git
a/src/main/java/org/codehaus/groovy/transform/AsyncASTTransformation.java
b/src/main/java/org/codehaus/groovy/transform/AsyncASTTransformation.java
index 5edd5f9e15..50aab9776a 100644
--- a/src/main/java/org/codehaus/groovy/transform/AsyncASTTransformation.java
+++ b/src/main/java/org/codehaus/groovy/transform/AsyncASTTransformation.java
@@ -53,9 +53,13 @@ import static
org.codehaus.groovy.ast.tools.GenericsUtils.makeClassSafeWithGener
* the runtime's {@code await()} via {@link AsyncTransformHelper}</li>
* <li>The method body is executed asynchronously — or as an async generator
* if it contains {@code yield return} — via factory methods on
- * {@link AsyncTransformHelper}</li>
+ * {@link AsyncTransformHelper}. The runtime preserves
+ * {@link groovy.concurrent.AsyncContext} snapshots across those async
+ * boundaries.</li>
* <li>Methods containing {@code defer} statements are wrapped in a
- * try-finally block that executes deferred actions in LIFO order</li>
+ * try-finally block that executes deferred actions in LIFO order, with
+ * each deferred closure bound to the async context active at
registration
+ * time</li>
* <li>The return type becomes {@code Awaitable<T>}
* (or {@code AsyncStream<T>} for generators)</li>
* </ol>
diff --git
a/src/main/java/org/codehaus/groovy/transform/AsyncTransformHelper.java
b/src/main/java/org/codehaus/groovy/transform/AsyncTransformHelper.java
index 6c07699678..8f0d137c16 100644
--- a/src/main/java/org/codehaus/groovy/transform/AsyncTransformHelper.java
+++ b/src/main/java/org/codehaus/groovy/transform/AsyncTransformHelper.java
@@ -71,7 +71,10 @@ import static
org.codehaus.groovy.ast.tools.GeneralUtils.varX;
* <b>Rewriting utilities</b>
* ({@link #injectGenParamIntoYieldReturnCalls(Statement, Parameter)},
* {@link #wrapWithDeferScope(Statement)}) perform targeted AST mutations
- * that transform high-level syntax into the lower-level runtime calls.
+ * that transform high-level syntax into the lower-level runtime calls. The
+ * generated runtime calls now also inherit async-local context automatically,
+ * so compiler-generated code retains request/trace metadata across thread hops
+ * without special-case AST handling.
*
* @see AsyncASTTransformation
* @since 6.0.0
diff --git a/src/spec/doc/core-async-await.adoc
b/src/spec/doc/core-async-await.adoc
index 2d5d1f0771..adca9476f3 100644
--- a/src/spec/doc/core-async-await.adoc
+++ b/src/spec/doc/core-async-await.adoc
@@ -39,6 +39,8 @@ Key capabilities include:
* **`for await`** — iterate over asynchronous data sources
* **`yield return`** — produce asynchronous streams (generators)
* **`defer`** — schedule Go-style cleanup actions that run when the method
completes
+* **`AsyncContext`** — propagate request, trace, and tenant metadata across
async boundaries
+* **`Awaitable.go`, `Channel`, and `Awaitable.any`** — Go-inspired task
spawning and channel-based communication primitives
* **Framework integration** — built-in adapters for `CompletableFuture` and
`Future`;
`Flow.Publisher` support via an auto-discovered runtime adapter;
extensible to RxJava, Reactor, and Spring via the adapter registry
@@ -344,9 +346,10 @@
include::../test/AsyncAwaitSpecTest.groovy[tags=async_lambda,indent=0]
----
[[for-await]]
+[[for_await_section]]
== `for await` — Async Iteration
-The `for await` loop iterates over asynchronous data sources (`AsyncStream`,
`Flow.Publisher`, `Iterable`).
+The `for await` loop iterates over asynchronous data sources (`AsyncStream`,
`Flow.Publisher`, `Channel`, `Iterable`).
Each element is resolved asynchronously before the loop body executes. This is
analogous to JavaScript's
`for await...of` and C#'s `await foreach`.
@@ -476,6 +479,19 @@
include::../test/AsyncAwaitSpecTest.groovy[tags=defer_exception,indent=0]
include::../test/AsyncAwaitSpecTest.groovy[tags=defer_resource_cleanup,indent=0]
----
+=== Async Cleanup in Deferred Blocks
+
+Deferred blocks may also return an `Awaitable`, `CompletionStage`, or `Future`.
+Groovy waits for that asynchronous cleanup to finish before the enclosing async
+method completes, which makes `defer` practical for non-blocking shutdown paths
+such as flushing telemetry, closing async streams, or returning buffered work
+to a channel:
+
+[source,groovy]
+----
+include::../test/AsyncAwaitSpecTest.groovy[tags=defer_async_cleanup,indent=0]
+----
+
[TIP]
====
When an exception occurs in a deferred block and the enclosing method has also
thrown,
@@ -666,7 +682,7 @@
include::../test/AsyncAwaitSpecTest.groovy[tags=from_conversion,indent=0]
`Awaitable.from()` accepts any type supported by a registered adapter:
`CompletableFuture`,
`CompletionStage`, `Future`, `Flow.Publisher`, or any third-party type with a
custom adapter.
If the source is already an `Awaitable`, it is returned as-is.
`AsyncStream.from()` works
-similarly for multi-value sources such as `Iterable`, `Iterator`, or
`Flow.Publisher`.
+similarly for multi-value sources such as `Iterable`, `Iterator`, `Channel`,
or `Flow.Publisher`.
=== Built-in Adapters
@@ -676,6 +692,7 @@ asynchronous types from third-party frameworks. The
built-in adapter handles:
* `groovy.concurrent.Awaitable` (native Groovy promise — passthrough)
* `java.util.concurrent.CompletableFuture` and `CompletionStage`
* `java.util.concurrent.Future` (blocking, delegated to a configurable
executor)
+* `groovy.concurrent.Channel` (automatically converted to `AsyncStream` for
`for await`)
Additionally, `java.util.concurrent.Flow.Publisher` support (single-value
`await` and
multi-value `for await`) is provided by an internal runtime adapter
@@ -1095,12 +1112,24 @@ JavaScript, C#, Kotlin, and Swift, for developers
familiar with those languages.
| `Task { body }`
| **Async iteration**
-| `for await (x in src) { }`
+| `for await (x in src) { }` +
+_(supports `AsyncStream`, `Flow.Publisher`, `Channel`, `Iterable`)_
| `for await (x of src) { }`
| `await foreach (x in src) { }`
| _(manual via `Flow`, `Channel`, or `Flow.collect`)_
| `for try await x in seq`
+| **Channels**
+| `Awaitable.channel(n)` +
+`await ch.send(x)` / `await ch.receive()` +
+`for await (x in ch) { }`
+| _(none; use streams or message ports)_
+| `System.Threading.Channels` +
+`Channel<T>` / `ChannelReader.ReadAllAsync()`
+| `Channel<T>` / `channel.send()` +
+`for (x in channel) { }`
+| `AsyncStream` / `AsyncChannel` (3rd party)
+
| **Async generator**
| `yield return expr`
| `yield expr` (in `async function*`)
@@ -1289,6 +1318,195 @@ Always close a scope. An unclosed scope leaves child
tasks in an indeterminate
and may leak threads.
====
+[[async-context]]
+== Async Execution Context with `AsyncContext`
+
+Asynchronous code frequently needs more than just a return value: it must also
+carry request IDs, tenant IDs, security information, locale overrides, or
+trace metadata through thread hops. `AsyncContext` provides a small, explicit,
+async-local map for exactly that purpose.
+
+Groovy captures the current `AsyncContext` when a task or continuation is
+scheduled and restores that snapshot when it runs. This gives you the most
+useful production invariants:
+
+* child tasks inherit the parent context as it existed at spawn time;
+* child-side mutations do not leak back into the parent;
+* continuations (`.then()`, `.exceptionally()`, `.handle()`, etc.) see the
+ context that was active when they were registered.
+
+[source,groovy]
+----
+include::../test/AsyncAwaitSpecTest.groovy[tags=async_context_basic,indent=0]
+----
+
+=== Snapshots and Manual Restoration
+
+`AsyncContext.capture()` returns an immutable `Snapshot` of the current
context.
+You can restore that snapshot later—even from a different thread—using
+`AsyncContext.withSnapshot(snapshot) { ... }`. Snapshots are lightweight: they
+are simply frozen copies of the context map at capture time.
+
+[source,groovy]
+----
+include::../test/AsyncAwaitSpecTest.groovy[tags=async_context_snapshot,indent=0]
+----
+
+[TIP]
+====
+Use `AsyncContext` for small pieces of execution metadata — correlation IDs,
+tenant IDs, principals, feature flags. Keep large mutable business objects in
+ordinary method parameters or return values.
+====
+
+[NOTE]
+====
+`AsyncContext` is backed by a `ThreadLocal`. In pooled-thread environments the
+context is automatically captured and restored by the async runtime, but if you
+create your own threads you must capture a snapshot beforehand and restore it
+manually via `AsyncContext.withSnapshot(snapshot) { ... }`.
+====
+
+[[go-tasks-and-channels]]
+== Go-Style Tasks and Channels
+
+Groovy's core async model is based on `Awaitable`, but it also provides
+Go-inspired coordination primitives for workflows that benefit from explicit
+message passing.
+
+=== Spawning Tasks with `go`
+
+`Awaitable.go { ... }` starts a lightweight task. Outside an `AsyncScope` it
+behaves like a detached async job. Inside an `AsyncScope`, it automatically
+joins the current scope so the task is awaited and cancelled with its siblings.
+
+=== Channels
+
+`Awaitable.channel()` creates an unbuffered rendezvous channel;
`Awaitable.channel(n)`
+creates a buffered channel with capacity `n`. Both `send()` and `receive()`
+return `Awaitable` values, which means they compose naturally with `await` and
+with `Awaitable.any(...)` for first-completion-wins logic.
+
+Key channel properties:
+
+* **Unbuffered (capacity 0)**: `send()` suspends until a matching `receive()`
+ arrives, providing direct handoff rendezvous semantics.
+* **Buffered (capacity > 0)**: `send()` completes immediately if buffer space
is
+ available; otherwise it suspends until a consumer drains an element.
+* **Close semantics**: closing a channel is idempotent. Buffered values remain
+ receivable after close. Once drained, further receives fail with
+ `ChannelClosedException`. Pending senders fail immediately on close.
+* **Null rejection**: channels reject `null` payloads to maintain a clean
+ distinction between "no value yet" and actual data.
+
+[source,groovy]
+----
+include::../test/AsyncAwaitSpecTest.groovy[tags=go_channel_select,indent=0]
+----
+
+=== Racing Channel Operations
+
+Use `Awaitable.any(...)` to race channel receives against timeouts or other
+channels — the first completed source wins:
+
+[source,groovy]
+----
+def value = await Awaitable.any(
+ ch.receive(), // channel receive
+ Awaitable.delay(1000).then { 'timeout' } // timeout fallback
+)
+----
+
+This is the Groovy equivalent of Go's `select` statement. Because
`Awaitable.any`
+accepts any awaitable source, it works seamlessly with channels, timers, and
+arbitrary async tasks.
+
+=== Iterating Channels with `for await`
+
+Because `Channel` integrates with the <<for_await_section,`for await`>> syntax,
+you can iterate received values in a natural loop — the Groovy equivalent of
Go's
+`for range ch`:
+
+[source,groovy]
+----
+include::../test/AsyncAwaitSpecTest.groovy[tags=for_await_channel,indent=0]
+----
+
+Internally, `for await` obtains a read-only `AsyncStream` view from the channel
+(via `Channel.asStream()`). Each iteration calls `receive()`; when the
channel is
+closed and all buffered values are drained, the stream signals completion and
the
+loop exits.
+
+[IMPORTANT]
+====
+The stream view does **not** own the channel. Exiting the loop — whether
+normally, via `break`, or on exception — does _not_ close the channel. The
+producer is responsible for closing the channel when it has finished sending.
+This matches Go's convention that only the _sender_ closes a channel.
+====
+
+=== Fan-In Pipelines
+
+Channels are particularly effective for fan-in and fan-out pipelines where
+multiple producers feed a shared channel and one or more consumers drain it:
+
+[source,groovy]
+----
+include::../test/AsyncAwaitSpecTest.groovy[tags=channel_fan_in,indent=0]
+----
+
+This pattern is useful for work distribution, aggregating results from
+parallel workers, and building actor-like coordination without explicit locks.
+
+=== Channels vs `Flow.Publisher`
+
+Channels and `Flow.Publisher` both carry asynchronous data, and both work with
+<<for_await_section,`for await`>>. However, they serve **different purposes**
and
+should _not_ be confused or substituted for each other:
+
+[cols="1,2,2"]
+|===
+| Aspect | Channel | `Flow.Publisher`
+
+| **Model**
+| Bidirectional send/receive (CSP)
+| Unidirectional push (Reactive Streams)
+
+| **Backpressure**
+| Implicit — sender waits when the buffer is full
+| Explicit — subscriber signals demand via `request(n)`
+
+| **Direction**
+| Symmetric — both sides actively write and read
+| Asymmetric — publisher pushes items, subscriber signals demand
+
+| **Close semantics**
+| Idempotent; buffered values survive close and drain first
+| Terminal `onComplete`/`onError` signals after all items
+
+| **Multi-consumer**
+| Competing — each `receive()` is exclusive (only one consumer gets the value)
+| Copied — each subscriber gets its own stream of items
+
+| **Best for**
+| Task-to-task coordination, fan-in/fan-out pipelines
+| Consuming reactive streams, HTTP bodies, database cursors
+|===
+
+Despite these differences, **both are consumed through the same `for await`
syntax**
+via the unified `AsyncStream` abstraction:
+
+[source,groovy]
+----
+include::../test/AsyncAwaitSpecTest.groovy[tags=channel_vs_flow_for_await,indent=0]
+----
+
+TIP: Use **channels** when you control both the producer and consumer and need
+explicit coordination (e.g., work queues, pipelines, actor-style messaging).
+Use **`Flow.Publisher`** when you receive a push-based data stream from an
+external source (e.g., a reactive HTTP client, a message broker, or a database
+cursor).
+
[[performance-notes]]
== Performance Characteristics
diff --git a/src/spec/test/AsyncAwaitSpecTest.groovy
b/src/spec/test/AsyncAwaitSpecTest.groovy
index 565fc35e91..a842318ef7 100644
--- a/src/spec/test/AsyncAwaitSpecTest.groovy
+++ b/src/spec/test/AsyncAwaitSpecTest.groovy
@@ -1529,4 +1529,248 @@ scope.close() // waits for all children, idempotent
// end::async_scope_manual[]
'''
}
+
+ @Test
+ void testAsyncContextPropagation() {
+ assertScript '''
+// tag::async_context_basic[]
+import groovy.concurrent.AsyncContext
+import groovy.concurrent.Awaitable
+
+AsyncContext.with([traceId: 'req-7', tenant: 'acme']) {
+ def task = Awaitable.go {
+ assert AsyncContext.current()['traceId'] == 'req-7'
+ AsyncContext.current()['traceId'] = 'child-trace'
+ [trace: AsyncContext.current()['traceId'], tenant: AsyncContext.current()['tenant']]
+ }
+
+ def child = await task
+ assert child == [trace: 'child-trace', tenant: 'acme']
+ assert AsyncContext.current()['traceId'] == 'req-7'
+}
+// end::async_context_basic[]
+ '''
+ }
+
+ @Test
+ void testGoChannelSelect() {
+ assertScript '''
+// tag::go_channel_select[]
+import groovy.concurrent.AsyncScope
+import groovy.concurrent.Awaitable
+
+def result = AsyncScope.withScope { scope ->
+ def ch = Awaitable.channel(1)
+
+ def producer = Awaitable.go {
+ await ch.send('payload')
+ 'sent'
+ }
+
+ def consumer = scope.async {
+ await ch.receive()
+ }
+
+ def value = await Awaitable.any(
+ consumer,
+ Awaitable.delay(250).then { 'timeout' }
+ )
+
+ [value: value, producer: await(producer), childCount: scope.childCount]
+}
+
+assert result == [value: 'payload', producer: 'sent', childCount: 2]
+// end::go_channel_select[]
+ '''
+ }
+
+ @Test
+ void testDeferAsyncCleanup() {
+ assertScript '''
+// tag::defer_async_cleanup[]
+import groovy.concurrent.Awaitable
+
+class ResourceHolder {
+ final List<String> log = []
+
+ Awaitable<Void> closeAsync(String name) {
+ Awaitable.go {
+ await Awaitable.delay(10)
+ log << "close:${name}"
+ null
+ }
+ }
+
+ async useResource() {
+ defer { closeAsync('outer') }
+ defer { closeAsync('inner') }
+ log << 'body'
+ return log
+ }
+}
+
+def holder = new ResourceHolder()
+assert await(holder.useResource()) == ['body', 'close:inner', 'close:outer']
+// end::defer_async_cleanup[]
+ '''
+ }
+
+ @Test
+ void testChannelFanInPipeline() {
+ assertScript '''
+// tag::channel_fan_in[]
+import groovy.concurrent.AsyncScope
+import groovy.concurrent.Awaitable
+
+def results = AsyncScope.withScope { scope ->
+ def ch = Awaitable.channel(8)
+
+ // Producers: multiple tasks write concurrently to one shared channel
+ 3.times { producerId ->
+ scope.async {
+ for (int i = 0; i < 4; i++) {
+ await ch.send("p${producerId}-item${i}")
+ }
+ null
+ }
+ }
+
+ // Fan-in: single consumer drains the channel
+ def consumer = scope.async {
+ def items = []
+ 12.times { items << await(ch.receive()) }
+ items.sort()
+ }
+
+ await consumer
+}
+
+assert results.size() == 12
+assert results.count { it.startsWith('p0-') } == 4
+assert results.count { it.startsWith('p1-') } == 4
+assert results.count { it.startsWith('p2-') } == 4
+// end::channel_fan_in[]
+ '''
+ }
+
+ @Test
+ void testAsyncContextSnapshot() {
+ assertScript '''
+// tag::async_context_snapshot[]
+import groovy.concurrent.AsyncContext
+import groovy.concurrent.Awaitable
+
+// Capture a snapshot to hand off to a different async scope
+AsyncContext.current()['requestId'] = 'req-99'
+def snapshot = AsyncContext.capture()
+
+// Mutate the parent context — the snapshot is unaffected
+AsyncContext.current()['requestId'] = 'req-100'
+
+def childResult = await Awaitable.go {
+ // Child sees the parent's context as it was when go() was called, not the earlier snapshot
+ AsyncContext.current()['requestId']
+}
+
+assert childResult == 'req-100' // go captures at call time
+
+// Manually restore snapshot in a different scope
+def restored = AsyncContext.withSnapshot(snapshot, { ->
+ AsyncContext.current()['requestId']
+} as java.util.function.Supplier)
+
+assert restored == 'req-99'
+// end::async_context_snapshot[]
+ '''
+ }
+
+ @Test
+ void testForAwaitOverChannel() {
+ assertScript '''
+// tag::for_await_channel[]
+import groovy.concurrent.AsyncScope
+import groovy.concurrent.Awaitable
+
+def items = []
+def ch = Awaitable.channel(4)
+
+AsyncScope.withScope { scope ->
+ // Producer sends values and closes the channel when done
+ scope.async {
+ for (word in ['hello', 'async', 'world']) {
+ await ch.send(word)
+ }
+ ch.close()
+ null
+ }
+
+ // Consumer iterates the channel with for-await
+ for await (item in ch) {
+ items << item
+ }
+}
+
+assert items == ['hello', 'async', 'world']
+// end::for_await_channel[]
+ '''
+ }
+
+ @Test
+ void testChannelVsFlowPublisherForAwait() {
+ assertScript '''
+// tag::channel_vs_flow_for_await[]
+import groovy.concurrent.AsyncScope
+import groovy.concurrent.Awaitable
+import java.util.concurrent.Flow
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.TimeUnit
+
+// --- Channel: Go-style coordination primitive ---
+// Producers and consumers communicate through explicit send/receive.
+// Best for task-to-task coordination and fan-in/fan-out pipelines.
+def channelItems = []
+def ch = Awaitable.channel(3)
+
+AsyncScope.withScope { scope ->
+ scope.async {
+ for (n in [10, 20, 30]) { await ch.send(n) }
+ ch.close()
+ null
+ }
+ for await (item in ch) {
+ channelItems << item
+ }
+}
+assert channelItems == [10, 20, 30]
+
+// --- Flow.Publisher: reactive data stream ---
+// A push-based source adapted automatically for for-await consumption.
+// Best for consuming reactive streams, HTTP responses, and database cursors.
+def publisherItems = []
+
+def publisher = new Flow.Publisher<Integer>() {
+ void subscribe(Flow.Subscriber<? super Integer> subscriber) {
+ subscriber.onSubscribe(new Flow.Subscription() {
+ int count = 0
+ void request(long n) {
+ for (int i = 0; i < n && count < 3; i++) {
+ count++
+ subscriber.onNext(count * 100)
+ }
+ if (count >= 3) subscriber.onComplete()
+ }
+ void cancel() {}
+ })
+ }
+}
+for await (item in publisher) {
+ publisherItems << item
+}
+assert publisherItems == [100, 200, 300]
+
+// Both Channel and Flow.Publisher work with 'for await' through
+// the unified AsyncStream abstraction — same syntax, different use cases.
+// end::channel_vs_flow_for_await[]
+ '''
+ }
}
diff --git a/src/test/groovy/org/codehaus/groovy/transform/AsyncContextChannelTest.groovy b/src/test/groovy/org/codehaus/groovy/transform/AsyncContextChannelTest.groovy
new file mode 100644
index 0000000000..d6c9515236
--- /dev/null
+++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncContextChannelTest.groovy
@@ -0,0 +1,976 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.codehaus.groovy.transform
+
+import org.junit.jupiter.api.Test
+
+import static groovy.test.GroovyAssert.assertScript
+import static groovy.test.GroovyAssert.shouldFail
+
+/**
+ * Integration tests for async context propagation, channels, and structured
+ * concurrency using Groovy's async/await syntax. All tests use
+ * {@link groovy.test.GroovyAssert#assertScript assertScript} so the code
+ * under test exercises the full Groovy compiler pipeline including the
+ * {@code async}/{@code await} AST transformation.
+ */
+final class AsyncContextChannelTest {
+
+ // ---- AsyncContext propagation ----
+
+ @Test
+ void testAsyncContextPropagatesAndIsIsolatedAcrossTasks() {
+ assertScript '''
+ import groovy.concurrent.AsyncContext
+ import groovy.concurrent.Awaitable
+
+ AsyncContext.current()['traceId'] = 'parent-trace'
+
+ def child = Awaitable.go {
+ assert AsyncContext.current()['traceId'] == 'parent-trace'
+ AsyncContext.current()['traceId'] = 'child-trace'
+ AsyncContext.current()['traceId']
+ }
+
+ assert await(child) == 'child-trace'
+ assert AsyncContext.current()['traceId'] == 'parent-trace'
+ AsyncContext.current().clear()
+ '''
+ }
+
+ @Test
+ void testPromiseContinuationsCaptureAsyncContext() {
+ assertScript '''
+ import groovy.concurrent.AsyncContext
+ import groovy.concurrent.Awaitable
+
+ AsyncContext.current()['requestId'] = 'request-1'
+
+ def source = Awaitable.go {
+ await Awaitable.delay(25)
+ 40
+ }
+
+ def continuation = source.then { value ->
+ assert AsyncContext.current()['requestId'] == 'request-1'
+ value + 2
+ }
+
+ AsyncContext.current()['requestId'] = 'request-2'
+
+ assert await(continuation) == 42
+ assert AsyncContext.current()['requestId'] == 'request-2'
+ AsyncContext.current().clear()
+ '''
+ }
+
+ @Test
+ void testAsyncContextPreservedAcrossContinuationChains() {
+ assertScript '''
+ import groovy.concurrent.AsyncContext
+ import groovy.concurrent.Awaitable
+
+ AsyncContext.current()['traceId'] = 'trace-chain'
+
+ def result = Awaitable.go { 10 }
+ .then { it * 2 }
+ .then { value ->
+ assert AsyncContext.current()['traceId'] == 'trace-chain'
+ value + 1
+ }
+ .thenCompose { value ->
+ assert AsyncContext.current()['traceId'] == 'trace-chain'
+ Awaitable.of(value * 3)
+ }
+ .handle { value, error ->
+ assert AsyncContext.current()['traceId'] == 'trace-chain'
+ value
+ }
+
+ assert await(result) == 63
+ AsyncContext.current().clear()
+ '''
+ }
+
+ @Test
+ void testAsyncContextInExceptionallyHandler() {
+ assertScript '''
+ import groovy.concurrent.AsyncContext
+ import groovy.concurrent.Awaitable
+
+ AsyncContext.current()['reqId'] = 'err-test'
+
+ def result = Awaitable.go { throw new RuntimeException('fail') }
+ .exceptionally { error ->
+ assert AsyncContext.current()['reqId'] == 'err-test'
+ "recovered: ${error.message}"
+ }
+
+ assert await(result) == 'recovered: fail'
+ AsyncContext.current().clear()
+ '''
+ }
+
+ @Test
+ void testAsyncContextInWhenCompleteHandler() {
+ assertScript '''
+ import groovy.concurrent.AsyncContext
+ import groovy.concurrent.Awaitable
+ import java.util.concurrent.CompletableFuture
+ import java.util.concurrent.TimeUnit
+
+ AsyncContext.current()['traceId'] = 'wc-test'
+ def observed = new CompletableFuture<String>()
+
+ def task = Awaitable.go { 42 }
+ .whenComplete { value, error ->
+ observed.complete(AsyncContext.current()['traceId'] as String)
+ }
+
+ await task
+ assert observed.get(2, TimeUnit.SECONDS) == 'wc-test'
+ AsyncContext.current().clear()
+ '''
+ }
+
+ @Test
+ void testAsyncContextIsolationUnderConcurrentMutations() {
+ assertScript '''
+ import groovy.concurrent.AsyncContext
+ import groovy.concurrent.Awaitable
+
+ int taskCount = 100
+ AsyncContext.current()['shared'] = 'root'
+
+ def tasks = (0..<taskCount).collect { n ->
+ Awaitable.go {
+ assert AsyncContext.current()['shared'] == 'root'
+ AsyncContext.current()['taskId'] = "task-${n}".toString()
+ await Awaitable.delay(5)
+ assert AsyncContext.current()['taskId'] == "task-${n}".toString()
+ assert AsyncContext.current()['shared'] == 'root'
+ AsyncContext.current()['taskId']
+ }
+ }
+
+ def results = await Awaitable.all(*tasks)
+ assert results.toSet().size() == taskCount
+ assert AsyncContext.current()['shared'] == 'root'
+ assert !AsyncContext.current().containsKey('taskId')
+ AsyncContext.current().clear()
+ '''
+ }
+
+ @Test
+ void testAsyncContextSnapshotIsImmutableAfterCapture() {
+ assertScript '''
+ import groovy.concurrent.AsyncContext
+ import groovy.concurrent.Awaitable
+
+ AsyncContext.current()['key'] = 'before'
+ def snapshot = AsyncContext.capture()
+
+ AsyncContext.current()['key'] = 'after'
+ AsyncContext.current()['extra'] = 'new'
+
+ assert snapshot.get('key') == 'before'
+ assert !snapshot.containsKey('extra')
+
+ def result = Awaitable.go { AsyncContext.current()['key'] }
+ assert await(result) == 'after'
+ AsyncContext.current().clear()
+ '''
+ }
+
+ @Test
+ void testAsyncContextWithOverlayAndRestore() {
+ assertScript '''
+ import groovy.concurrent.AsyncContext
+
+ AsyncContext.current()['env'] = 'production'
+ AsyncContext.current()['user'] = 'admin'
+
+ def result = AsyncContext.with([env: 'staging', feature: 'beta']) {
+ assert AsyncContext.current()['env'] == 'staging'
+ assert AsyncContext.current()['user'] == 'admin'
+ assert AsyncContext.current()['feature'] == 'beta'
+ 'inside'
+ }
+
+ assert result == 'inside'
+ assert AsyncContext.current()['env'] == 'production'
+ assert AsyncContext.current()['user'] == 'admin'
+ assert !AsyncContext.current().containsKey('feature')
+ AsyncContext.current().clear()
+ '''
+ }
+
+ @Test
+ void testAsyncContextWithNullValueRemovesKey() {
+ assertScript '''
+ import groovy.concurrent.AsyncContext
+
+ AsyncContext.current()['key'] = 'value'
+ assert AsyncContext.current().containsKey('key')
+ AsyncContext.current()['key'] = null
+ assert !AsyncContext.current().containsKey('key')
+ '''
+ }
+
+ @Test
+ void testAsyncContextSnapshotWith() {
+ assertScript '''
+ import groovy.concurrent.AsyncContext
+
+ AsyncContext.current()['a'] = '1'
+ AsyncContext.current()['b'] = '2'
+ def snapshot = AsyncContext.capture()
+
+ def merged = snapshot.with([b: '3', c: '4'])
+ assert merged.get('a') == '1'
+ assert merged.get('b') == '3'
+ assert merged.get('c') == '4'
+
+ assert snapshot.get('b') == '2'
+ assert !snapshot.containsKey('c')
+ AsyncContext.current().clear()
+ '''
+ }
+
+ @Test
+ void testAsyncContextSnapshotWithNullRemovesKey() {
+ assertScript '''
+ import groovy.concurrent.AsyncContext
+
+ AsyncContext.current()['a'] = '1'
+ AsyncContext.current()['b'] = '2'
+ def snapshot = AsyncContext.capture()
+
+ def merged = snapshot.with([b: null])
+ assert merged.get('a') == '1'
+ assert !merged.containsKey('b')
+ assert merged.size() == 1
+ AsyncContext.current().clear()
+ '''
+ }
+
+ @Test
+ void testAsyncContextScopeRestoresOnException() {
+ assertScript '''
+ import groovy.concurrent.AsyncContext
+
+ AsyncContext.current()['key'] = 'original'
+
+ try {
+ AsyncContext.with([key: 'overlay']) {
+ assert AsyncContext.current()['key'] == 'overlay'
+ throw new RuntimeException('boom')
+ }
+ } catch (RuntimeException ignored) {}
+
+ assert AsyncContext.current()['key'] == 'original'
+ AsyncContext.current().clear()
+ '''
+ }
+
+ // ---- Structured concurrency (AsyncScope) ----
+
+ @Test
+ void testGoJoinsCurrentScope() {
+ assertScript '''
+ import groovy.concurrent.AsyncScope
+ import groovy.concurrent.Awaitable
+
+ def result = AsyncScope.withScope { scope ->
+ def task = Awaitable.go { 42 }
+ [value: await(task), childCount: scope.childCount]
+ }
+
+ assert result.value == 42
+ assert result.childCount == 1
+ '''
+ }
+
+ @Test
+ void testGoOutsideScopeBehavesAsDetachedTask() {
+ assertScript '''
+ import groovy.concurrent.AsyncScope
+ import groovy.concurrent.Awaitable
+
+ assert AsyncScope.current() == null
+ def result = await Awaitable.go { 'detached' }
+ assert result == 'detached'
+ '''
+ }
+
+ @Test
+ void testGoInsideScopeRegistersAsChild() {
+ assertScript '''
+ import groovy.concurrent.AsyncScope
+ import groovy.concurrent.Awaitable
+
+ def result = AsyncScope.withScope { scope ->
+ Awaitable.go { 'child-1' }
+ Awaitable.go { 'child-2' }
+ scope.childCount
+ }
+ assert result == 2
+ '''
+ }
+
+ @Test
+ void testAsyncContextPropagatedIntoScopeChildren() {
+ assertScript '''
+ import groovy.concurrent.AsyncContext
+ import groovy.concurrent.AsyncScope
+
+ AsyncContext.current()['scopeCtx'] = 'scoped-value'
+
+ def result = AsyncScope.withScope { scope ->
+ def child = scope.async { AsyncContext.current()['scopeCtx'] }
+ await child
+ }
+
+ assert result == 'scoped-value'
+ AsyncContext.current().clear()
+ '''
+ }
+
+ // ---- Channel tests ----
+
+ @Test
+ void testUnbufferedChannelBackPressuresSenderUntilReceiverArrives() {
+ assertScript '''
+ import groovy.concurrent.Awaitable
+ import java.util.concurrent.CountDownLatch
+ import java.util.concurrent.TimeUnit
+
+ def channel = Awaitable.channel()
+ def senderStarted = new CountDownLatch(1)
+
+ def sendTask = Awaitable.go {
+ senderStarted.countDown()
+ await channel.send('payload')
+ 'sent'
+ }
+
+ assert senderStarted.await(2, TimeUnit.SECONDS)
+ Thread.sleep(50)
+ assert !sendTask.done
+
+ assert await(channel.receive()) == 'payload'
+ assert await(sendTask) == 'sent'
+ '''
+ }
+
+ @Test
+ void testBufferedChannelSendCompletesImmediately() {
+ assertScript '''
+ import groovy.concurrent.Channel
+
+ def channel = new Channel<String>(2)
+
+ def s1 = channel.send('a')
+ def s2 = channel.send('b')
+
+ assert s1.done
+ assert s2.done
+ assert channel.bufferedSize == 2
+
+ assert await(channel.receive()) == 'a'
+ assert await(channel.receive()) == 'b'
+ '''
+ }
+
+ @Test
+ void testUnbufferedChannelRendezvous() {
+ assertScript '''
+ import groovy.concurrent.Awaitable
+ import java.util.concurrent.CountDownLatch
+ import java.util.concurrent.TimeUnit
+
+ def channel = Awaitable.channel()
+ def senderReady = new CountDownLatch(1)
+ def receiverReady = new CountDownLatch(1)
+
+ def sender = Awaitable.go {
+ senderReady.countDown()
+ receiverReady.await(2, TimeUnit.SECONDS)
+ await channel.send('rendezvous')
+ 'sent'
+ }
+
+ def receiver = Awaitable.go {
+ senderReady.await(2, TimeUnit.SECONDS)
+ receiverReady.countDown()
+ await channel.receive()
+ }
+
+ assert await(receiver) == 'rendezvous'
+ assert await(sender) == 'sent'
+ '''
+ }
+
+ @Test
+ void testChannelCloseFailsReceiversAfterDrain() {
+ shouldFail '''
+ import groovy.concurrent.Channel
+ import groovy.concurrent.ChannelClosedException
+
+ def channel = new Channel<String>(1)
+ await channel.send('first')
+ assert channel.close()
+ assert await(channel.receive()) == 'first'
+ await channel.receive() // should throw ChannelClosedException
+ '''
+ }
+
+ @Test
+ void testChannelRejectsNullPayload() {
+ shouldFail '''
+ import groovy.concurrent.Channel
+ new Channel<String>(1).send(null)
+ '''
+ }
+
+ @Test
+ void testChannelRejectsNegativeCapacity() {
+ shouldFail '''
+ import groovy.concurrent.Channel
+ new Channel<String>(-1)
+ '''
+ }
+
+ @Test
+ void testChannelCloseIsIdempotent() {
+ assertScript '''
+ import groovy.concurrent.Channel
+
+ def channel = new Channel<String>(1)
+ assert channel.close()
+ assert !channel.close()
+ '''
+ }
+
+ @Test
+ void testChannelSendOnClosedChannel() {
+ shouldFail '''
+ import groovy.concurrent.Channel
+ import groovy.concurrent.ChannelClosedException
+
+ def channel = new Channel<String>(1)
+ channel.close()
+ await channel.send('late') // should throw ChannelClosedException
+ '''
+ }
+
+ @Test
+ void testChannelClosePreservesBufferedValues() {
+ assertScript '''
+ import groovy.concurrent.Channel
+ import static groovy.test.GroovyAssert.shouldFail
+
+ def channel = new Channel<Integer>(5)
+
+ await channel.send(1)
+ await channel.send(2)
+ await channel.send(3)
+ channel.close()
+
+ assert await(channel.receive()) == 1
+ assert await(channel.receive()) == 2
+ assert await(channel.receive()) == 3
+
+ shouldFail(groovy.concurrent.ChannelClosedException) {
+ await channel.receive()
+ }
+ '''
+ }
+
+ @Test
+ void testChannelCloseDrainsBufferToWaitingReceivers() {
+ assertScript '''
+ import groovy.concurrent.Channel
+
+ def channel = new Channel<String>(2)
+ await channel.send('a')
+ await channel.send('b')
+
+ def r1 = channel.receive()
+ def r2 = channel.receive()
+
+ channel.close()
+
+ assert await(r1) == 'a'
+ assert await(r2) == 'b'
+ '''
+ }
+
+ @Test
+ void testChannelToString() {
+ assertScript '''
+ import groovy.concurrent.Channel
+
+ def channel = new Channel<String>(5)
+ await channel.send('a')
+ def str = channel.toString()
+ assert str.contains('capacity=5')
+ assert str.contains('buffered=1')
+ assert str.contains('closed=false')
+ '''
+ }
+
+ @Test
+ void testChannelPendingSendersCancelledOnClose() {
+ shouldFail '''
+ import groovy.concurrent.Awaitable
+ import java.util.concurrent.CountDownLatch
+ import java.util.concurrent.TimeUnit
+
+ def channel = Awaitable.channel() // unbuffered
+ def senderStarted = new CountDownLatch(1)
+
+ def sendTask = Awaitable.go {
+ senderStarted.countDown()
+ await channel.send('will-fail')
+ }
+
+ assert senderStarted.await(2, TimeUnit.SECONDS)
+ Thread.sleep(50)
+ channel.close()
+
+ await sendTask // should throw ChannelClosedException
+ '''
+ }
+
+ @Test
+ void testChannelPendingReceiversFailOnClose() {
+ shouldFail '''
+ import groovy.concurrent.Awaitable
+ import java.util.concurrent.CountDownLatch
+ import java.util.concurrent.TimeUnit
+
+ def channel = Awaitable.channel()
+ def receiverStarted = new CountDownLatch(1)
+
+ def receiveTask = Awaitable.go {
+ receiverStarted.countDown()
+ await channel.receive()
+ }
+
+ assert receiverStarted.await(2, TimeUnit.SECONDS)
+ Thread.sleep(50)
+ channel.close()
+
+ await receiveTask // should throw ChannelClosedException
+ '''
+ }
+
+ @Test
+ void testAnyRacesChannelReceiveAgainstTimeout() {
+ assertScript '''
+ import groovy.concurrent.Awaitable
+
+ def channel = Awaitable.channel(1)
+
+ // Timeout should win since channel has no data
+ def result = await Awaitable.any(
+ channel.receive(),
+ Awaitable.delay(50).then { 'timeout' }
+ )
+
+ assert result == 'timeout'
+ '''
+ }
+
+ @Test
+ void testAnyWithMultipleChannels() {
+ assertScript '''
+ import groovy.concurrent.Awaitable
+ import groovy.concurrent.Channel
+
+ def ch1 = new Channel<String>(1)
+ def ch2 = new Channel<String>(1)
+
+ await ch2.send('from-ch2')
+
+ def result = await Awaitable.any(
+ ch1.receive(),
+ ch2.receive()
+ )
+
+ assert result == 'from-ch2'
+ '''
+ }
+
+ // ---- High-concurrency tests ----
+
+ @Test
+ void testBufferedChannelHighConcurrencyFanIn() {
+ assertScript '''
+ import groovy.concurrent.AsyncScope
+ import groovy.concurrent.Awaitable
+
+ int producerCount = 200
+ def channel = Awaitable.channel(32)
+
+ def sum = AsyncScope.withScope { scope ->
+ def senders = (0..<producerCount).collect { n ->
+ Awaitable.go {
+ await channel.send(n)
+ null
+ }
+ }
+ def consumer = scope.async {
+ int total = 0
+ for (int i = 0; i < producerCount; i++) {
+ total += await(channel.receive())
+ }
+ total
+ }
+
+ await Awaitable.all(*senders)
+ await consumer
+ }
+
+ assert sum == (producerCount * (producerCount - 1)) / 2
+ '''
+ }
+
+ @Test
+ void testHighConcurrencyMultiProducerMultiConsumer() {
+ assertScript '''
+ import groovy.concurrent.AsyncScope
+ import groovy.concurrent.Awaitable
+ import java.util.concurrent.CopyOnWriteArrayList
+
+ int producerCount = 50
+ int messagesPerProducer = 20
+ int totalMessages = producerCount * messagesPerProducer
+ def channel = Awaitable.channel(64)
+ def received = new CopyOnWriteArrayList<Integer>()
+
+ AsyncScope.withScope { scope ->
+ (0..<producerCount).each { p ->
+ scope.async {
+ for (int i = 0; i < messagesPerProducer; i++) {
+ await channel.send(p * messagesPerProducer + i)
+ }
+ null
+ }
+ }
+
+ int consumerCount = 10
+ int messagesPerConsumer = totalMessages / consumerCount
+ (0..<consumerCount).each { c ->
+ scope.async {
+ for (int i = 0; i < messagesPerConsumer; i++) {
+ received.add(await channel.receive())
+ }
+ null
+ }
+ }
+ }
+
+ assert received.size() == totalMessages
+ assert received.toSet().size() == totalMessages
+ '''
+ }
+
+ @Test
+ void testHighConcurrencyUnbufferedChannelStressTest() {
+ assertScript '''
+ import groovy.concurrent.AsyncScope
+ import groovy.concurrent.Awaitable
+ import java.util.concurrent.atomic.AtomicInteger
+
+ int pairCount = 100
+ def channel = Awaitable.channel()
+ def sum = new AtomicInteger(0)
+
+ AsyncScope.withScope { scope ->
+ (0..<pairCount).each { n ->
+ scope.async {
+ await channel.send(n)
+ null
+ }
+ scope.async {
+ sum.addAndGet(await(channel.receive()) as int)
+ null
+ }
+ }
+ }
+
+ assert sum.get() == (pairCount * (pairCount - 1)) / 2
+ '''
+ }
+
+ @Test
+ void testScopeFailFastCancellsSiblings() {
+ shouldFail '''
+ import groovy.concurrent.AsyncScope
+ import groovy.concurrent.Awaitable
+
+ AsyncScope.withScope { scope ->
+ scope.async {
+ await Awaitable.delay(10)
+ throw new RuntimeException('boom')
+ }
+ scope.async {
+ await Awaitable.delay(5000)
+ 'should be cancelled'
+ }
+ }
+ '''
+ }
+
+ @Test
+ void testScopeClosedRejectsNewTasks() {
+ shouldFail '''
+ import groovy.concurrent.AsyncScope
+
+ def scope = new AsyncScope()
+ scope.close()
+ scope.async { 'too late' }
+ '''
+ }
+
+ @Test
+ void testAsyncContextCapturedAtGoCallTime() {
+ assertScript '''
+ import groovy.concurrent.AsyncContext
+ import groovy.concurrent.Awaitable
+
+ AsyncContext.current()['phase'] = 'A'
+ def task1 = Awaitable.go {
+ await Awaitable.delay(30)
+ AsyncContext.current()['phase']
+ }
+
+ AsyncContext.current()['phase'] = 'B'
+ def task2 = Awaitable.go {
+ await Awaitable.delay(10)
+ AsyncContext.current()['phase']
+ }
+
+ assert await(task1) == 'A'
+ assert await(task2) == 'B'
+ AsyncContext.current().clear()
+ '''
+ }
+
+ // ---- for-await over channels ----
+
+ @Test
+ void testForAwaitOverBufferedChannel() {
+ assertScript '''
+ import groovy.concurrent.AsyncScope
+ import groovy.concurrent.Awaitable
+
+ def items = []
+ def ch = Awaitable.channel(4)
+
+ AsyncScope.withScope { scope ->
+ scope.async {
+ await ch.send(1)
+ await ch.send(2)
+ await ch.send(3)
+ ch.close()
+ null
+ }
+
+ for await (item in ch) {
+ items << item
+ }
+ }
+
+ assert items == [1, 2, 3]
+ '''
+ }
+
+ @Test
+ void testForAwaitOverUnbufferedChannel() {
+ assertScript '''
+ import groovy.concurrent.AsyncScope
+ import groovy.concurrent.Awaitable
+
+ def items = []
+ def ch = Awaitable.channel()
+
+ AsyncScope.withScope { scope ->
+ scope.async {
+ for (int i = 0; i < 5; i++) {
+ await ch.send(i)
+ }
+ ch.close()
+ null
+ }
+
+ for await (item in ch) {
+ items << item
+ }
+ }
+
+ assert items == [0, 1, 2, 3, 4]
+ '''
+ }
+
+ @Test
+ void testForAwaitExitsOnChannelClose() {
+ assertScript '''
+ import groovy.concurrent.Awaitable
+
+ def ch = Awaitable.channel(2)
+ await ch.send('a')
+ await ch.send('b')
+ ch.close()
+
+ def items = []
+ for await (item in ch) {
+ items << item
+ }
+ assert items == ['a', 'b']
+ '''
+ }
+
+ @Test
+ void testForAwaitBreakDoesNotCloseChannel() {
+ assertScript '''
+ import groovy.concurrent.AsyncScope
+ import groovy.concurrent.Awaitable
+
+ def ch = Awaitable.channel(10)
+ for (int i = 0; i < 5; i++) { await ch.send(i) }
+
+ def firstTwo = []
+ for await (item in ch) {
+ firstTwo << item
+ if (firstTwo.size() == 2) break
+ }
+
+ // Channel should still be open — asStream().close() is a no-op
+ assert !ch.closed
+ assert firstTwo == [0, 1]
+
+ // Remaining items are still receivable
+ assert await(ch.receive()) == 2
+ '''
+ }
+
+ @Test
+ void testForAwaitOverEmptyClosedChannel() {
+ assertScript '''
+ import groovy.concurrent.Channel
+
+ def ch = new Channel<String>(1)
+ ch.close()
+
+ def items = []
+ for await (item in ch) {
+ items << item
+ }
+ assert items.isEmpty()
+ '''
+ }
+
+ @Test
+ void testForAwaitConcurrentFanInPipeline() {
+ assertScript '''
+ import groovy.concurrent.AsyncScope
+ import groovy.concurrent.Awaitable
+
+ int producerCount = 5
+ int itemsPerProducer = 10
+ def ch = Awaitable.channel(16)
+
+ def received = []
+ AsyncScope.withScope { scope ->
+ def producerTasks = (0..<producerCount).collect { p ->
+ scope.async {
+ for (int i = 0; i < itemsPerProducer; i++) {
+ await ch.send("p${p}-${i}".toString())
+ }
+ null
+ }
+ }
+
+ // Close channel after all producers complete (deterministic, no timing dependency)
+ scope.async {
+ await Awaitable.all(producerTasks as Object[])
+ ch.close()
+ null
+ }
+
+ for await (item in ch) {
+ received << item
+ }
+ }
+
+ assert received.size() == producerCount * itemsPerProducer
+ assert received.toSet().size() == producerCount * itemsPerProducer
+ '''
+ }
+
+ @Test
+ void testForAwaitPreservesAsyncContext() {
+ assertScript '''
+ import groovy.concurrent.AsyncContext
+ import groovy.concurrent.AsyncScope
+ import groovy.concurrent.Awaitable
+
+ AsyncContext.current()['traceId'] = 'for-await-trace'
+ def ch = Awaitable.channel(3)
+
+ AsyncScope.withScope { scope ->
+ scope.async {
+ await ch.send('a')
+ await ch.send('b')
+ ch.close()
+ null
+ }
+
+ for await (item in ch) {
+ assert AsyncContext.current()['traceId'] == 'for-await-trace'
+ }
+ }
+
+ AsyncContext.current().clear()
+ '''
+ }
+
+ @Test
+ void testChannelAsStreamExplicit() {
+ assertScript '''
+ import groovy.concurrent.Channel
+
+ def ch = new Channel<Integer>(3)
+ await ch.send(10)
+ await ch.send(20)
+ await ch.send(30)
+ ch.close()
+
+ def stream = ch.asStream()
+ def items = []
+ while (await stream.moveNext()) {
+ items << stream.current
+ }
+ assert items == [10, 20, 30]
+ '''
+ }
+}
diff --git a/src/test/groovy/org/codehaus/groovy/transform/AsyncRuntimeEnhancementTest.groovy b/src/test/groovy/org/codehaus/groovy/transform/AsyncRuntimeEnhancementTest.groovy
new file mode 100644
index 0000000000..552eb60d61
--- /dev/null
+++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncRuntimeEnhancementTest.groovy
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.codehaus.groovy.transform
+
+import org.junit.jupiter.api.Test
+
+import static groovy.test.GroovyAssert.assertScript
+
+/**
+ * Integration coverage for the upgraded async runtime through real Groovy
+ * syntax and metaprogramming features.
+ */
+final class AsyncRuntimeEnhancementTest {
+
+ /**
+ * Checks exception flow through a three-level chain of {@code async}
+ * methods ({@code top} awaits {@code middle}, which awaits {@code leaf}).
+ * {@code leaf} throws an {@code IOException} after an awaited delay;
+ * {@code top} catches it by type and rethrows an
+ * {@code IllegalStateException} whose message embeds the {@code traceId}
+ * read from {@code AsyncContext.current()}.
+ * <p>
+ * The script asserts that the awaiting caller sees the
+ * {@code IllegalStateException}, that its message carries the
+ * {@code traceId} supplied via {@code AsyncContext.with}, and that the
+ * original {@code IOException} is preserved as the cause.
+ */
+ @Test
+ void testDeepAsyncExceptionChainsRetainCatchSemantics() {
+ assertScript '''
+ import groovy.concurrent.AsyncContext
+ import groovy.concurrent.Awaitable
+ import static groovy.test.GroovyAssert.shouldFail
+
+ class Service {
+ async leaf() {
+ await Awaitable.delay(10)
+ throw new IOException('leaf exploded')
+ }
+
+ async middle() {
+ await leaf()
+ }
+
+ async top() {
+ try {
+ await middle()
+ } catch (IOException ioe) {
+ throw new
IllegalStateException("trace=${AsyncContext.current()['traceId']}", ioe)
+ }
+ }
+ }
+
+ AsyncContext.with([traceId: 'req-42']) {
+ def failure = shouldFail(IllegalStateException) {
+ await new Service().top()
+ }
+
+ assert failure.message == 'trace=req-42'
+ assert failure.cause instanceof IOException
+ assert failure.cause.message == 'leaf exploded'
+ }
+ '''
+ }
+
+ /**
+ * Checks {@code defer} blocks whose closures return an {@code Awaitable}.
+ * {@code work()} registers two such blocks ('outer' first, then 'inner'),
+ * appends 'body' to the shared log and returns that same log list.
+ * <p>
+ * The script asserts that the awaited result equals
+ * {@code ['body', 'close:inner', 'close:outer']}: both delayed cleanup
+ * entries are present once the await returns, and the later-registered
+ * block's entry comes first.
+ */
+ @Test
+ void testDeferWaitsForReturnedAsyncCleanup() {
+ assertScript '''
+ import groovy.concurrent.Awaitable
+
+ class Resource {
+ final List<String> log = []
+
+ Awaitable<Void> closeAsync(String name) {
+ Awaitable.go {
+ await Awaitable.delay(15)
+ log << "close:${name}"
+ null
+ }
+ }
+ }
+
+ class Service {
+ final Resource resource = new Resource()
+
+ async work() {
+ defer { resource.closeAsync('outer') }
+ defer { resource.closeAsync('inner') }
+ resource.log << 'body'
+ return resource.log
+ }
+ }
+
+ def service = new Service()
+ assert await(service.work()) == ['body', 'close:inner',
'close:outer']
+ '''
+ }
+
+ /**
+ * Combines {@code Awaitable.go}, a channel, closures and a runtime
+ * metaclass extension in one script. A {@code shout} method is added to
+ * {@code String} via its metaclass; a producer started with
+ * {@code Awaitable.go} sends {@code 'groovy'.shout()} through a channel, and
+ * a {@code scope.async} consumer receives it and formats it with a closure
+ * that captures a local variable. The consumer is raced via
+ * {@code Awaitable.any} against a 500 ms delay mapped to {@code 'timeout'}.
+ * <p>
+ * The script asserts the formatted message, the producer's return value and
+ * {@code scope.childCount == 2}. The {@code String} metaclass is removed in
+ * a {@code finally} block so the extension does not outlive the script.
+ * <p>
+ * NOTE(review): only one task is started through {@code scope.async}; the
+ * expected child count of 2 presumably means {@code Awaitable.go} also
+ * registers with the enclosing scope - confirm against the runtime.
+ */
+ @Test
+ void testGoChannelsAndMetaprogrammingRemainGroovyFriendly() {
+ assertScript '''
+ import groovy.concurrent.AsyncScope
+ import groovy.concurrent.Awaitable
+
+ String.metaClass.shout = { -> delegate.toUpperCase() + '!' }
+
+ try {
+ def result = AsyncScope.withScope { scope ->
+ def channel = Awaitable.channel(1)
+
+ def producer = Awaitable.go {
+ await channel.send('groovy'.shout())
+ 'produced'
+ }
+
+ def consumer = scope.async {
+ def prefix = 'hello'
+ def formatter = { value -> "${prefix} ${value}" }
+ formatter(await channel.receive())
+ }
+
+ def selected = await Awaitable.any(
+ consumer,
+ Awaitable.delay(500).then { 'timeout' }
+ )
+
+ [message: selected, producer: await(producer), childCount:
scope.childCount]
+ }
+
+ assert result.message == 'hello GROOVY!'
+ assert result.producer == 'produced'
+ assert result.childCount == 2
+ } finally {
+ GroovySystem.metaClassRegistry.removeMetaClass(String)
+ }
+ '''
+ }
+
+ /**
+ * Checks that a {@code traceId} established with {@code AsyncContext.with}
+ * is visible inside {@code async} methods called from that block.
+ * {@code fetchWithContext} reads the id before and after an awaited delay;
+ * {@code generateItems} reads it on each of three iterations and emits the
+ * values with {@code yield return}.
+ * <p>
+ * The caller awaits the first method directly and collects the second with
+ * {@code for await}, asserting that every produced string embeds
+ * {@code 'gen-trace'}.
+ */
+ @Test
+ void testAsyncContextPropagatesToAsyncMethodsAndGenerators() {
+ assertScript '''
+ import groovy.concurrent.AsyncContext
+ import groovy.concurrent.Awaitable
+
+ class ContextAwareService {
+ async fetchWithContext() {
+ assert AsyncContext.current()['traceId'] == 'gen-trace'
+ await Awaitable.delay(10)
+ "value-${AsyncContext.current()['traceId']}"
+ }
+
+ async generateItems() {
+ for (int i = 0; i < 3; i++) {
+ assert AsyncContext.current()['traceId'] == 'gen-trace'
+ yield return
"item-$i-${AsyncContext.current()['traceId']}"
+ }
+ }
+ }
+
+ AsyncContext.with([traceId: 'gen-trace']) {
+ def svc = new ContextAwareService()
+ assert await(svc.fetchWithContext()) == 'value-gen-trace'
+
+ def items = []
+ for await (item in svc.generateItems()) {
+ items << item
+ }
+ assert items == ['item-0-gen-trace', 'item-1-gen-trace',
'item-2-gen-trace']
+ }
+ '''
+ }
+
+ /**
+ * Checks which {@code AsyncContext} state a {@code defer} block observes.
+ * {@code work()} sets {@code phase} to {@code 'init'}, registers a
+ * {@code defer} block that logs the current {@code phase}, then changes
+ * {@code phase} to {@code 'body'} and logs it again from the method body.
+ * <p>
+ * The script expects the log to contain {@code 'body-phase:body'} and
+ * {@code 'cleanup-phase:init'}: the deferred block is expected to see the
+ * value that was current when it was registered, not the later one.
+ */
+ @Test
+ void testDeferPreservesAsyncContextAtRegistrationTime() {
+ assertScript '''
+ import groovy.concurrent.AsyncContext
+ import groovy.concurrent.Awaitable
+
+ class DeferContextService {
+ final List<String> log = []
+
+ async work() {
+ AsyncContext.current()['phase'] = 'init'
+ defer {
+ log <<
"cleanup-phase:${AsyncContext.current()['phase']}".toString()
+ null
+ }
+ AsyncContext.current()['phase'] = 'body'
+ log <<
"body-phase:${AsyncContext.current()['phase']}".toString()
+ return log
+ }
+ }
+
+ def svc = new DeferContextService()
+ def result = await svc.work()
+ assert result.contains('body-phase:body')
+ assert result.contains('cleanup-phase:init')
+ '''
+ }
+
+ /**
+ * Runs a three-stage pipeline inside a static {@code async} method: one
+ * producer sends {@code 0..<itemCount} into {@code stage1}, two transformer
+ * tasks each receive from {@code stage1} and send {@code value * 10} into
+ * {@code stage2}, and one consumer collects {@code itemCount} values from
+ * {@code stage2} into a {@code CopyOnWriteArrayList}. All tasks are started
+ * with {@code scope.async} inside one {@code AsyncScope.withScope} block,
+ * and the method returns the sorted results.
+ * <p>
+ * The script runs the pipeline with 20 items and expects exactly the
+ * multiples of ten from 0 to 190 in ascending order.
+ * <p>
+ * NOTE(review): each transformer loops while {@code i < itemCount / 2}, so
+ * the two workers together consume exactly {@code itemCount} items only
+ * when {@code itemCount} is even (as it is here).
+ */
+ @Test
+ void testHighConcurrencyChannelPipelineWithAsyncMethods() {
+ assertScript '''
+ import groovy.concurrent.AsyncScope
+ import groovy.concurrent.Awaitable
+
+ class Pipeline {
+ static async processPipeline(int itemCount) {
+ def stage1 = Awaitable.channel(16)
+ def stage2 = Awaitable.channel(16)
+ def results = new
java.util.concurrent.CopyOnWriteArrayList()
+
+ AsyncScope.withScope { scope ->
+ // Producer
+ scope.async {
+ for (int i = 0; i < itemCount; i++) {
+ await stage1.send(i)
+ }
+ null
+ }
+
+ // Transformer (2 workers)
+ 2.times {
+ scope.async {
+ for (int i = 0; i < itemCount / 2; i++) {
+ int value = await stage1.receive()
+ await stage2.send(value * 10)
+ }
+ null
+ }
+ }
+
+ // Consumer
+ scope.async {
+ for (int i = 0; i < itemCount; i++) {
+ results.add(await stage2.receive())
+ }
+ null
+ }
+ }
+
+ results.sort()
+ }
+ }
+
+ def results = await Pipeline.processPipeline(20)
+ assert results.size() == 20
+ assert results == (0..<20).collect { it * 10 }
+ '''
+ }
+}