This is an automated email from the ASF dual-hosted git repository.

sunlan pushed a commit to branch GROOVY-9381_3
in repository https://gitbox.apache.org/repos/asf/groovy.git


The following commit(s) were added to refs/heads/GROOVY-9381_3 by this push:
     new c5fbe109cf Minor tweaks
c5fbe109cf is described below

commit c5fbe109cf51076b04d095b203d9023944f70dad
Author: Daniel Sun <[email protected]>
AuthorDate: Sat Mar 21 18:01:27 2026 +0900

    Minor tweaks
---
 src/main/java/groovy/concurrent/AsyncContext.java  | 374 ++++++++
 src/main/java/groovy/concurrent/AsyncScope.java    |  66 +-
 src/main/java/groovy/concurrent/AsyncStream.java   |   5 +
 src/main/java/groovy/concurrent/Awaitable.java     |  58 +-
 .../concurrent/AwaitableAdapterRegistry.java       |   4 +
 src/main/java/groovy/concurrent/Channel.java       | 409 +++++++++
 .../groovy/concurrent/ChannelClosedException.java  |  36 +
 .../apache/groovy/runtime/async/AsyncSupport.java  | 162 +++-
 .../apache/groovy/runtime/async/GroovyPromise.java |  29 +-
 .../groovy/transform/AsyncASTTransformation.java   |   8 +-
 .../groovy/transform/AsyncTransformHelper.java     |   5 +-
 src/spec/doc/core-async-await.adoc                 | 224 ++++-
 src/spec/test/AsyncAwaitSpecTest.groovy            | 244 ++++++
 .../transform/AsyncContextChannelTest.groovy       | 976 +++++++++++++++++++++
 .../transform/AsyncRuntimeEnhancementTest.groovy   | 254 ++++++
 15 files changed, 2802 insertions(+), 52 deletions(-)

diff --git a/src/main/java/groovy/concurrent/AsyncContext.java b/src/main/java/groovy/concurrent/AsyncContext.java
new file mode 100644
index 0000000000..5f9fae9f66
--- /dev/null
+++ b/src/main/java/groovy/concurrent/AsyncContext.java
@@ -0,0 +1,374 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package groovy.concurrent;
+
+import groovy.lang.Closure;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Supplier;
+
+/**
+ * Lightweight async-local context map for Groovy async execution.
+ * <p>
+ * {@code AsyncContext} is Groovy's answer to C#'s {@code AsyncLocal},
+ * Python's {@code contextvars}, and Kotlin's coroutine context for the
+ * most common production use-cases: request correlation IDs, tenant IDs,
+ * security principals, trace metadata, and other small pieces of
+ * execution-scoped state that must survive thread hops.
+ * <p>
+ * The current context is stored per thread, but async runtimes capture a
+ * {@linkplain Snapshot snapshot} when a task or continuation is scheduled
+ * and restore that snapshot when it executes.  Child tasks therefore see
+ * the parent's values at spawn time, while subsequent child-side
+ * mutations remain isolated from the parent and siblings.
+ * <p>
+ * Keys are normalized to non-null {@link String} values.  Supplying
+ * {@code null} as a value removes the key.
+ *
+ * @since 6.0.0
+ */
+public final class AsyncContext {
+
+    private static final ThreadLocal<AsyncContext> CURRENT =
+            ThreadLocal.withInitial(AsyncContext::new);
+
+    private final Map<String, Object> values;
+
+    /**
+     * Creates an empty async context.
+     */
+    public AsyncContext() {
+        this.values = new LinkedHashMap<>();
+    }
+
+    /**
+     * Creates a context seeded from the supplied entries.
+     *
+     * @param initialValues initial context entries; may be {@code null}
+     */
+    public AsyncContext(Map<?, ?> initialValues) {
+        this();
+        putAll(initialValues);
+    }
+
+    // Internal constructor that directly adopts a pre-copied map
+    private AsyncContext(LinkedHashMap<String, Object> values, boolean ignored) {
+        this.values = values;
+    }
+
+    /**
+     * Returns the live async context associated with the current thread.
+     *
+     * @return the current async context, never {@code null}
+     */
+    public static AsyncContext current() {
+        return CURRENT.get();
+    }
+
+    /**
+     * Captures an immutable snapshot of the current async context.
+     *
+     * @return the captured snapshot
+     */
+    public static Snapshot capture() {
+        return Snapshot.of(current().copyValues());
+    }
+
+    /**
+     * Executes the supplier with the given snapshot installed as the current
+     * async context, restoring the previous context afterwards.
+     *
+     * @param snapshot the snapshot to install
+     * @param supplier the action to execute
+     * @param <T>      the result type
+     * @return the supplier's result
+     */
+    public static <T> T withSnapshot(Snapshot snapshot, Supplier<T> supplier) {
+        Objects.requireNonNull(snapshot, "snapshot must not be null");
+        Objects.requireNonNull(supplier, "supplier must not be null");
+        AsyncContext previous = CURRENT.get();
+        CURRENT.set(new AsyncContext(new LinkedHashMap<>(snapshot.values), true));
+        try {
+            return supplier.get();
+        } finally {
+            CURRENT.set(previous);
+        }
+    }
+
+    /**
+     * Executes the runnable with the given snapshot installed as the current
+     * async context, restoring the previous context afterwards.
+     *
+     * @param snapshot the snapshot to install
+     * @param action   the action to execute
+     */
+    public static void withSnapshot(Snapshot snapshot, Runnable action) {
+        withSnapshot(snapshot, () -> {
+            action.run();
+            return null;
+        });
+    }
+
+    /**
+     * Temporarily overlays the current async context with the supplied
+     * entries for the duration of the closure.
+     * <p>
+     * Any modifications performed inside the closure are scoped to that
+     * dynamic extent and are discarded when the closure returns or throws.
+     *
+     * @param entries the entries to overlay; may be {@code null}
+     * @param action  the closure to execute
+     * @param <T>     the result type
+     * @return the closure's result
+     */
+    public static <T> T with(Map<?, ?> entries, Closure<T> action) {
+        Objects.requireNonNull(action, "action must not be null");
+        Snapshot merged = capture().with(entries);
+        return withSnapshot(merged, () -> action.call());
+    }
+
+    /**
+     * Looks up a value in the current context by key.
+     *
+     * @param key the context key
+     * @return the stored value, or {@code null} if absent
+     */
+    public Object get(String key) {
+        return values.get(normalizeKey(key));
+    }
+
+    /**
+     * Groovy operator support for {@code context['traceId']}.
+     *
+     * @param key the context key
+     * @return the stored value, or {@code null} if absent
+     */
+    public Object getAt(String key) {
+        return get(key);
+    }
+
+    /**
+     * Stores a value in the current context.
+     * <p>
+     * A {@code null} value removes the key.
+     *
+     * @param key   the context key
+     * @param value the value to store, or {@code null} to remove the key
+     * @return the previous value associated with the key, or {@code null}
+     */
+    public Object put(String key, Object value) {
+        String normalized = normalizeKey(key);
+        if (value == null) {
+            return values.remove(normalized);
+        }
+        return values.put(normalized, value);
+    }
+
+    /**
+     * Groovy operator support for {@code context['traceId'] = 'abc-123'}.
+     *
+     * @param key   the context key
+     * @param value the value to store, or {@code null} to remove the key
+     */
+    public void putAt(String key, Object value) {
+        put(key, value);
+    }
+
+    /**
+     * Removes a key from the context.
+     *
+     * @param key the context key
+     * @return the removed value, or {@code null} if absent
+     */
+    public Object remove(String key) {
+        return values.remove(normalizeKey(key));
+    }
+
+    /**
+     * Adds all entries from the supplied map to this context.
+     * <p>
+     * A {@code null} map is ignored.  {@code null} values remove keys.
+     *
+     * @param entries the entries to add
+     */
+    public void putAll(Map<?, ?> entries) {
+        if (entries == null || entries.isEmpty()) {
+            return;
+        }
+        for (Map.Entry<?, ?> entry : entries.entrySet()) {
+            put(normalizeKey(entry.getKey()), entry.getValue());
+        }
+    }
+
+    /**
+     * Removes all entries from this context.
+     */
+    public void clear() {
+        values.clear();
+    }
+
+    /**
+     * Returns {@code true} if the given key is present.
+     *
+     * @param key the context key
+     * @return {@code true} if present
+     */
+    public boolean containsKey(String key) {
+        return values.containsKey(normalizeKey(key));
+    }
+
+    /**
+     * Returns the number of entries in this context.
+     *
+     * @return the entry count
+     */
+    public int size() {
+        return values.size();
+    }
+
+    /**
+     * Returns {@code true} if this context is empty.
+     *
+     * @return {@code true} if empty
+     */
+    public boolean isEmpty() {
+        return values.isEmpty();
+    }
+
+    /**
+     * Returns an immutable copy of this context's entries.
+     *
+     * @return an immutable snapshot of the context
+     */
+    public Map<String, Object> snapshot() {
+        return Collections.unmodifiableMap(copyValues());
+    }
+
+    @Override
+    public String toString() {
+        return "AsyncContext" + values;
+    }
+
+    private Map<String, Object> copyValues() {
+        return new LinkedHashMap<>(values);
+    }
+
+    private static String normalizeKey(Object key) {
+        return Objects.requireNonNull(key, "context key must not be null").toString();
+    }
+
+    /**
+     * Immutable async-context snapshot captured at task or continuation
+     * registration time.
+     *
+     * @since 6.0.0
+     */
+    public static final class Snapshot {
+
+        private final Map<String, Object> values;
+
+        private Snapshot(Map<String, Object> values) {
+            this.values = values;
+        }
+
+        // Creates a snapshot that directly wraps a pre-copied map
+        static Snapshot of(Map<String, Object> preCopied) {
+            return new Snapshot(Collections.unmodifiableMap(preCopied));
+        }
+
+        /**
+         * Returns {@code true} if this snapshot contains the given key.
+         *
+         * @param key the context key
+         * @return {@code true} if present
+         */
+        public boolean containsKey(String key) {
+            return values.containsKey(normalizeKey(key));
+        }
+
+        /**
+         * Returns the captured value for the given key.
+         *
+         * @param key the context key
+         * @return the captured value, or {@code null} if absent
+         */
+        public Object get(String key) {
+            return values.get(normalizeKey(key));
+        }
+
+        /**
+         * Returns {@code true} if this snapshot contains no entries.
+         *
+         * @return {@code true} if empty
+         */
+        public boolean isEmpty() {
+            return values.isEmpty();
+        }
+
+        /**
+         * Returns the number of entries in this snapshot.
+         *
+         * @return the entry count
+         */
+        public int size() {
+            return values.size();
+        }
+
+        /**
+         * Returns the captured entries as an immutable map.
+         *
+         * @return the captured entries
+         */
+        public Map<String, Object> asMap() {
+            return values;
+        }
+
+        /**
+         * Creates a new snapshot by overlaying the supplied entries on top of
+         * this snapshot.
+         *
+         * @param entries the entries to merge; may be {@code null}
+         * @return the merged snapshot
+         */
+        public Snapshot with(Map<?, ?> entries) {
+            if (entries == null || entries.isEmpty()) {
+                return this;
+            }
+            Map<String, Object> merged = new LinkedHashMap<>(values);
+            for (Map.Entry<?, ?> entry : entries.entrySet()) {
+                String key = normalizeKey(entry.getKey());
+                Object value = entry.getValue();
+                if (value == null) {
+                    merged.remove(key);
+                } else {
+                    merged.put(key, value);
+                }
+            }
+            return Snapshot.of(merged);
+        }
+
+        @Override
+        public String toString() {
+            return "AsyncContext.Snapshot" + values;
+        }
+    }
+}
diff --git a/src/main/java/groovy/concurrent/AsyncScope.java b/src/main/java/groovy/concurrent/AsyncScope.java
index 3e9ba837ec..f0c22a58e7 100644
--- a/src/main/java/groovy/concurrent/AsyncScope.java
+++ b/src/main/java/groovy/concurrent/AsyncScope.java
@@ -30,6 +30,7 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
 
 /**
  * Structured concurrency scope that ensures all child tasks complete
@@ -80,6 +81,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
  */
 public class AsyncScope implements AutoCloseable {
 
+    private static final ThreadLocal<AsyncScope> CURRENT = new ThreadLocal<>();
+
     private final List<CompletableFuture<?>> children = new CopyOnWriteArrayList<>();
     private final AtomicBoolean closed = new AtomicBoolean(false);
     private final Executor executor;
@@ -113,10 +116,64 @@ public class AsyncScope implements AutoCloseable {
         this(AsyncSupport.getExecutor(), true);
     }
 
+    /**
+     * Returns the structured async scope currently bound to this thread, or
+     * {@code null} if execution is not inside {@link #withScope(Closure)} or
+     * a child launched from such a scope.
+     *
+     * @return the current scope, or {@code null}
+     */
+    public static AsyncScope current() {
+        return CURRENT.get();
+    }
+
+    /**
+     * Executes the supplier with the given scope installed as the current
+     * structured scope, restoring the previous binding afterwards.
+     *
+     * @param scope    the scope to install; may be {@code null}
+     * @param supplier the action to execute
+     * @param <T>      the result type
+     * @return the supplier's result
+     */
+    public static <T> T withCurrent(AsyncScope scope, Supplier<T> supplier) {
+        Objects.requireNonNull(supplier, "supplier must not be null");
+        AsyncScope previous = CURRENT.get();
+        if (scope == null) {
+            CURRENT.remove();
+        } else {
+            CURRENT.set(scope);
+        }
+        try {
+            return supplier.get();
+        } finally {
+            if (previous == null) {
+                CURRENT.remove();
+            } else {
+                CURRENT.set(previous);
+            }
+        }
+    }
+
+    /**
+     * Void overload of {@link #withCurrent(AsyncScope, Supplier)}.
+     *
+     * @param scope  the scope to install; may be {@code null}
+     * @param action the action to execute
+     */
+    public static void withCurrent(AsyncScope scope, Runnable action) {
+        withCurrent(scope, () -> {
+            action.run();
+            return null;
+        });
+    }
+
     /**
      * Launches a child task within this scope.  The task's lifetime is
      * bound to the scope: when the scope is closed, all incomplete child
-     * tasks are cancelled.
+     * tasks are cancelled.  The child inherits a snapshot of the current
+     * {@link AsyncContext}, but any child-side context mutations remain
+     * isolated from the parent and siblings.
      *
      * @param body the async body to execute
      * @param <T>  the result type
@@ -125,12 +182,14 @@ public class AsyncScope implements AutoCloseable {
      */
     @SuppressWarnings("unchecked")
     public <T> Awaitable<T> async(Closure<T> body) {
+        Objects.requireNonNull(body, "body must not be null");
         if (closed.get()) {
             throw new IllegalStateException("AsyncScope is closed — cannot launch new tasks");
         }
+        AsyncContext.Snapshot contextSnapshot = AsyncContext.capture();
         CompletableFuture<T> cf = CompletableFuture.supplyAsync(() -> {
             try {
-                return body.call();
+                return withCurrent(this, () -> AsyncContext.withSnapshot(contextSnapshot, () -> body.call()));
             } catch (CompletionException ce) {
                 throw ce;
             } catch (Throwable t) {
@@ -251,8 +310,9 @@ public class AsyncScope implements AutoCloseable {
      */
     @SuppressWarnings("unchecked")
     public static <T> T withScope(Executor executor, Closure<T> body) {
+        Objects.requireNonNull(body, "body must not be null");
         try (AsyncScope scope = new AsyncScope(executor)) {
-            return body.call(scope);
+            return withCurrent(scope, () -> body.call(scope));
         }
     }
 }
diff --git a/src/main/java/groovy/concurrent/AsyncStream.java b/src/main/java/groovy/concurrent/AsyncStream.java
index 318ddf6355..d2ac2603ad 100644
--- a/src/main/java/groovy/concurrent/AsyncStream.java
+++ b/src/main/java/groovy/concurrent/AsyncStream.java
@@ -35,6 +35,11 @@ package groovy.concurrent;
  *       to create a generator-style stream</li>
  *   <li>Adapting JDK {@link java.util.concurrent.Flow.Publisher} instances
  *       (supported out of the box by the built-in adapter)</li>
+ *   <li>Viewing a {@link Channel} as a stream via {@link Channel#asStream()} —
+ *       each {@code moveNext()} call maps to a {@code receive()}, and
+ *       {@link ChannelClosedException} is translated to end-of-stream.
+ *       This conversion is registered automatically, so {@code for await (item in channel)}
+ *       works out of the box.</li>
  *   <li>Adapting third-party reactive types (Reactor {@code Flux}, RxJava
  *       {@code Observable}) via {@link AwaitableAdapter}</li>
  * </ul>
diff --git a/src/main/java/groovy/concurrent/Awaitable.java b/src/main/java/groovy/concurrent/Awaitable.java
index d7bf8fb689..0da8a093da 100644
--- a/src/main/java/groovy/concurrent/Awaitable.java
+++ b/src/main/java/groovy/concurrent/Awaitable.java
@@ -18,6 +18,7 @@
  */
 package groovy.concurrent;
 
+import groovy.lang.Closure;
 import org.apache.groovy.runtime.async.AsyncSupport;
 import org.apache.groovy.runtime.async.GroovyPromise;
 
@@ -70,6 +71,8 @@ import java.util.function.Function;
  *       {@code Task.FromResult()} / {@code Promise.resolve()}</li>
  *   <li>{@link #failed(Throwable) Awaitable.failed(error)} — like
  *       {@code Task.FromException()} / {@code Promise.reject()}</li>
+ *   <li>{@link #go(Closure) Awaitable.go { ... }} — lightweight Go-style task spawn</li>
+ *   <li>{@link #channel()} / {@link #channel(int)} — Go-style buffered or unbuffered channels</li>
  * </ul>
  * <p>
  * <b>Instance continuations</b> provide ergonomic composition without exposing
@@ -183,7 +186,12 @@ public interface Awaitable<T> {
      * @since 6.0.0
      */
     default Awaitable<Void> thenAccept(Consumer<? super T> action) {
-        return GroovyPromise.of(toCompletableFuture().thenAccept(action));
+        AsyncContext.Snapshot contextSnapshot = AsyncContext.capture();
+        return GroovyPromise.of(toCompletableFuture().thenAccept(value ->
+                AsyncContext.withSnapshot(contextSnapshot, () -> {
+                    action.accept(value);
+                    return null;
+                })));
     }
 
     /**
@@ -212,8 +220,12 @@ public interface Awaitable<T> {
      * @since 6.0.0
      */
     default Awaitable<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) {
+        AsyncContext.Snapshot contextSnapshot = AsyncContext.capture();
         return GroovyPromise.of(toCompletableFuture().whenComplete((value, error) ->
-                action.accept(value, error == null ? null : AsyncSupport.deepUnwrap(error))));
+                AsyncContext.withSnapshot(contextSnapshot, () -> {
+                    action.accept(value, error == null ? null : AsyncSupport.deepUnwrap(error));
+                    return null;
+                })));
     }
 
     /**
@@ -231,8 +243,10 @@ public interface Awaitable<T> {
      * @since 6.0.0
      */
     default <U> Awaitable<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) {
+        AsyncContext.Snapshot contextSnapshot = AsyncContext.capture();
         return GroovyPromise.of(toCompletableFuture().handle((value, error) ->
-                fn.apply(value, error == null ? null : AsyncSupport.deepUnwrap(error))));
+                AsyncContext.withSnapshot(contextSnapshot,
+                        () -> fn.apply(value, error == null ? null : AsyncSupport.deepUnwrap(error)))));
     }
 
     /**
@@ -352,6 +366,44 @@ public interface Awaitable<T> {
         return new GroovyPromise<>(CompletableFuture.failedFuture(error));
     }
 
+    /**
+     * Spawns a lightweight Go-style task.
+     * <p>
+     * Inside an {@link AsyncScope}, the new task is attached to the current
+     * structured scope. Outside a scope it behaves like a detached async task.
+     *
+     * @param closure the task body
+     * @param <T>     the result type
+     * @return an awaitable representing the spawned task
+     * @since 6.0.0
+     */
+    static <T> Awaitable<T> go(Closure<T> closure) {
+        return AsyncSupport.go(closure);
+    }
+
+    /**
+     * Creates an unbuffered Go-style channel.
+     *
+     * @param <T> the payload type
+     * @return a new unbuffered channel
+     * @since 6.0.0
+     */
+    static <T> Channel<T> channel() {
+        return AsyncSupport.channel();
+    }
+
+    /**
+     * Creates a Go-style channel with the given buffer capacity.
+     *
+     * @param capacity the buffer capacity
+     * @param <T>      the payload type
+     * @return a new channel
+     * @since 6.0.0
+     */
+    static <T> Channel<T> channel(int capacity) {
+        return AsyncSupport.channel(capacity);
+    }
+
     // ---- Combinators (like C#'s Task.WhenAll/WhenAny, JS's Promise.all/any) ----
 
     /**
diff --git a/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java b/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java
index 33765f1c04..e3921763d9 100644
--- a/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java
+++ b/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java
@@ -181,6 +181,9 @@ public class AwaitableAdapterRegistry {
     /**
      * Converts the given source to an {@link AsyncStream}.
      * If the source is already an {@code AsyncStream}, it is returned as-is.
+     * {@link Channel} instances are handled as a built-in special case via
+     * {@link Channel#asStream()}, enabling transparent {@code for await}
+     * iteration over channels.
      * <p>
      * Uses a per-class {@link ClassValue} cache to avoid repeated linear
      * scans of the adapter list on the hot path.
@@ -202,6 +205,7 @@ public class AwaitableAdapterRegistry {
             throw new IllegalArgumentException("Cannot convert null to AsyncStream");
         }
         if (source instanceof AsyncStream) return (AsyncStream<T>) source;
+        if (source instanceof Channel) return ((Channel<T>) source).asStream();
         Class<?> type = source.getClass();
         AwaitableAdapter adapter = streamCache.get(type);
         if (adapter != null) {
diff --git a/src/main/java/groovy/concurrent/Channel.java b/src/main/java/groovy/concurrent/Channel.java
new file mode 100644
index 0000000000..44c94a9c4a
--- /dev/null
+++ b/src/main/java/groovy/concurrent/Channel.java
@@ -0,0 +1,409 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package groovy.concurrent;
+
+import org.apache.groovy.runtime.async.GroovyPromise;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Go-inspired asynchronous channel with optional buffering.
+ * <p>
+ * A channel coordinates producers and consumers without exposing explicit
+ * locks.  With a capacity of {@code 0}, sends rendezvous directly with
+ * receivers (unbuffered semantics).  With a positive capacity, values are
+ * buffered until the channel fills, after which senders wait until a
+ * receiver makes room.
+ * <p>
+ * Channel operations return {@link Awaitable} values so they compose
+ * naturally with Groovy's {@code await} and {@link Awaitable#any(Object...)}
+ * APIs.
+ * <p>
+ * Channels also integrate with Groovy's {@code for await} syntax via
+ * the {@link #asStream()} method, which provides a read-only
+ * {@link AsyncStream} view that yields received values until the channel
+ * is closed:
+ * <pre>
+ * for await (item in channel) {
+ *     process(item)
+ * }
+ * </pre>
+ * <p>
+ * {@code null} payloads are rejected to keep a clear distinction between
+ * "no value yet" and an actual payload.  If the channel is closed, pending
+ * senders fail immediately and future receivers drain any buffered items
+ * before failing with {@link ChannelClosedException}.
+ *
+ * @param <T> the payload type
+ * @since 6.0.0
+ */
+public final class Channel<T> {
+
+    private final ReentrantLock lock = new ReentrantLock(true);
+    private final Deque<T> buffer = new ArrayDeque<>();
+    private final Deque<PendingSender<T>> waitingSenders = new ArrayDeque<>();
+    private final Deque<CompletableFuture<T>> waitingReceivers = new ArrayDeque<>();
+    private final int capacity;
+
+    private boolean closed;
+
+    /**
+     * Creates an unbuffered channel.
+     */
+    public Channel() {
+        this(0);
+    }
+
+    /**
+     * Creates a channel with the given buffer capacity.
+     *
+     * @param capacity the buffer capacity; must not be negative
+     */
+    public Channel(int capacity) {
+        if (capacity < 0) {
+            throw new IllegalArgumentException("channel capacity must not be negative: " + capacity);
+        }
+        this.capacity = capacity;
+    }
+
+    /**
+     * Returns this channel's configured buffer capacity.
+     *
+     * @return the capacity
+     */
+    public int getCapacity() {
+        return capacity;
+    }
+
+    /**
+     * Returns the number of currently buffered elements.
+     *
+     * @return the buffered element count
+     */
+    public int getBufferedSize() {
+        lock.lock();
+        try {
+            return buffer.size();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Returns {@code true} if the channel has been closed.
+     *
+     * @return {@code true} if closed
+     */
+    public boolean isClosed() {
+        lock.lock();
+        try {
+            return closed;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Sends a value through the channel.
+     * <p>
+     * The returned {@link Awaitable} completes when the value has either been
+     * handed off to a receiver or enqueued in the channel buffer.
+     *
+     * @param value the value to send; must not be {@code null}
+     * @return an awaitable that completes when the send succeeds
+     */
+    public Awaitable<Void> send(T value) {
+        Objects.requireNonNull(value, "channel does not support null values");
+        CompletableFuture<Void> completion = new CompletableFuture<>();
+        PendingSender<T> pendingSender = new PendingSender<>(value, completion);
+
+        lock.lock();
+        try {
+            if (closed) {
+                completion.completeExceptionally(closedForSend());
+            } else if (deliverToWaitingReceiver(value)) {
+                completion.complete(null);
+            } else if (buffer.size() < capacity) {
+                buffer.addLast(value);
+                completion.complete(null);
+            } else {
+                waitingSenders.addLast(pendingSender);
+            }
+        } finally {
+            lock.unlock();
+        }
+
+        if (!completion.isDone()) {
+            completion.whenComplete((ignored, error) -> {
+                if (error != null || completion.isCancelled()) {
+                    removePendingSender(pendingSender);
+                }
+            });
+        }
+
+        return GroovyPromise.of(completion);
+    }
+
+    /**
+     * Receives the next value from the channel.
+     * <p>
+     * If the channel is buffered, buffered values are consumed first.  If the
+     * channel is closed and drained, the returned awaitable fails with
+     * {@link ChannelClosedException}.
+     *
+     * @return an awaitable that yields the next value
+     */
+    public Awaitable<T> receive() {
+        CompletableFuture<T> completion = new CompletableFuture<>();
+
+        lock.lock();
+        try {
+            T buffered = pollBuffer();
+            if (buffered != null) {
+                completion.complete(buffered);
+            } else {
+                PendingSender<T> sender = pollPendingSender();
+                if (sender != null) {
+                    sender.completion.complete(null);
+                    completion.complete(sender.value);
+                } else if (closed) {
+                    completion.completeExceptionally(closedForReceive());
+                } else {
+                    waitingReceivers.addLast(completion);
+                }
+            }
+        } finally {
+            lock.unlock();
+        }
+
+        if (!completion.isDone()) {
+            completion.whenComplete((ignored, error) -> {
+                if (error != null || completion.isCancelled()) {
+                    removePendingReceiver(completion);
+                }
+            });
+        }
+
+        return GroovyPromise.of(completion);
+    }
+
+    /**
+     * Closes the channel.
+     * <p>
+     * Closing is idempotent.  Buffered values remain receivable.  Any pending
+     * senders fail immediately, and receivers waiting after the buffer is
+     * drained fail with {@link ChannelClosedException}.
+     *
+     * @return {@code true} if this invocation closed the channel,
+     *         {@code false} if it had already been closed
+     */
+    public boolean close() {
+        lock.lock();
+        try {
+            if (closed) {
+                return false;
+            }
+            closed = true;
+
+            drainBufferToReceivers();
+
+            while (!waitingReceivers.isEmpty()) {
+                
waitingReceivers.removeFirst().completeExceptionally(closedForReceive());
+            }
+
+            while (!waitingSenders.isEmpty()) {
+                
waitingSenders.removeFirst().completion.completeExceptionally(closedForSend());
+            }
+
+            return true;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Exposes this channel as a read-only {@link AsyncStream}.
+     * <p>
+     * Each step of the returned stream performs a {@link #receive()} on this
+     * channel.  Once the channel is closed and its buffered values have been
+     * consumed, the stream reports completion
+     * ({@code moveNext()} returns {@code false}).
+     * <p>
+     * This is what makes {@code for await} iteration over a channel work:
+     * <pre>
+     * for await (item in channel) {
+     *     process(item)
+     * }
+     * </pre>
+     * <p>
+     * <b>Ownership:</b> the view does not own the channel — calling
+     * {@code close()} on the stream is a no-op.  Closing the channel is the
+     * producer's job once it has finished sending, mirroring Go, where the
+     * receiver does not close the channel.
+     *
+     * @return an async stream view of this channel
+     */
+    public AsyncStream<T> asStream() {
+        AsyncStream<T> view = new ChannelAsyncStream<>(this);
+        return view;
+    }
+
+    @Override
+    public String toString() {
+        lock.lock();
+        try {
+            return "Channel{capacity=" + capacity
+                    + ", buffered=" + buffer.size()
+                    + ", waitingSenders=" + waitingSenders.size()
+                    + ", waitingReceivers=" + waitingReceivers.size()
+                    + ", closed=" + closed
+                    + '}';
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Hands {@code value} to the first waiting receiver that accepts it.
+     * Receivers whose future is already completed are removed from the
+     * queue and skipped.
+     *
+     * @param value the value to deliver
+     * @return {@code true} if a receiver took the value, {@code false} if
+     *         the waiting queue ran out first
+     */
+    private boolean deliverToWaitingReceiver(T value) {
+        boolean delivered = false;
+        while (!delivered && !waitingReceivers.isEmpty()) {
+            CompletableFuture<T> candidate = waitingReceivers.removeFirst();
+            delivered = candidate.complete(value);
+        }
+        return delivered;
+    }
+
+    /**
+     * Pairs buffered values with waiting receivers until either queue is
+     * empty.  A buffered value is removed only when a receiver actually
+     * accepts it; a receiver that was already completed is dropped and the
+     * value stays at the head of the buffer for the next one.
+     */
+    private void drainBufferToReceivers() {
+        while (!(waitingReceivers.isEmpty() || buffer.isEmpty())) {
+            CompletableFuture<T> candidate = waitingReceivers.removeFirst();
+            boolean accepted = candidate.complete(buffer.peekFirst());
+            if (accepted) {
+                buffer.removeFirst();
+            }
+        }
+    }
+
+    /**
+     * Removes and returns the oldest buffered value, then tops the buffer
+     * up from pending senders.
+     *
+     * @return the oldest buffered value, or {@code null} if the buffer is
+     *         empty
+     */
+    private T pollBuffer() {
+        T head = null;
+        if (!buffer.isEmpty()) {
+            head = buffer.removeFirst();
+            refillBufferFromWaitingSenders();
+        }
+        return head;
+    }
+
+    /**
+     * Moves values from pending senders into the buffer while there is
+     * spare capacity, completing each sender as its value is accepted.
+     * Stops as soon as the buffer is full or no live sender remains.
+     */
+    private void refillBufferFromWaitingSenders() {
+        PendingSender<T> next;
+        while (buffer.size() < capacity && (next = pollPendingSender()) != null) {
+            buffer.addLast(next.value);
+            next.completion.complete(null);
+        }
+    }
+
+    /**
+     * Removes and returns the oldest pending sender whose completion future
+     * is not yet done.  Senders that are already done are removed and
+     * discarded along the way.
+     *
+     * @return the next live pending sender, or {@code null} if there is none
+     */
+    private PendingSender<T> pollPendingSender() {
+        PendingSender<T> live = null;
+        while (live == null && !waitingSenders.isEmpty()) {
+            PendingSender<T> candidate = waitingSenders.removeFirst();
+            if (!candidate.completion.isDone()) {
+                live = candidate;
+            }
+        }
+        return live;
+    }
+
+    /**
+     * Removes the given entry from the pending-sender queue while holding
+     * the channel lock.  Does nothing if the entry is no longer queued.
+     *
+     * @param stale the pending sender to remove
+     */
+    private void removePendingSender(PendingSender<T> stale) {
+        lock.lock();
+        try {
+            waitingSenders.remove(stale);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Removes the given future from the waiting-receiver queue while
+     * holding the channel lock.  Does nothing if it is no longer queued.
+     *
+     * @param stale the receiver future to remove
+     */
+    private void removePendingReceiver(CompletableFuture<T> stale) {
+        lock.lock();
+        try {
+            waitingReceivers.remove(stale);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /** Builds the exception used to fail a send on a closed channel. */
+    private static ChannelClosedException closedForSend() {
+        String message = "channel is closed for send";
+        return new ChannelClosedException(message);
+    }
+
+    /** Builds the exception used to fail a receive on a closed channel. */
+    private static ChannelClosedException closedForReceive() {
+        String message = "channel is closed";
+        return new ChannelClosedException(message);
+    }
+
+    /**
+     * A queued send: the value waiting to be accepted, paired with the
+     * future that is completed once the channel has taken (or rejected) it.
+     */
+    private static final class PendingSender<T> {
+        /** The value the sender wants to put into the channel. */
+        private final T value;
+        /** Completed when the send is accepted or fails. */
+        private final CompletableFuture<Void> completion;
+
+        private PendingSender(T value, CompletableFuture<Void> completion) {
+            this.completion = completion;
+            this.value = value;
+        }
+    }
+
+    /**
+     * Read-only {@link AsyncStream} view over a channel.
+     * <p>
+     * Each {@link #moveNext()} call issues a {@link Channel#receive()};
+     * {@link ChannelClosedException} is translated to end-of-stream
+     * ({@code false}).  The stream does not own the channel, so its
+     * {@link #close()} is a no-op.
+     */
+    private static final class ChannelAsyncStream<T> implements AsyncStream<T> 
{
+        private final Channel<T> channel;
+        private volatile T current;
+
+        ChannelAsyncStream(Channel<T> channel) {
+            this.channel = channel;
+        }
+
+        @Override
+        public Awaitable<Boolean> moveNext() {
+            return channel.receive().handle((value, error) -> {
+                if (error != null) {
+                    if (error instanceof ChannelClosedException) {
+                        return false;
+                    }
+                    if (error instanceof RuntimeException re) throw re;
+                    if (error instanceof Error e) throw e;
+                    throw new RuntimeException(error);
+                }
+                current = value;
+                return true;
+            });
+        }
+
+        @Override
+        public T getCurrent() {
+            return current;
+        }
+
+        // No-op: the stream view does not own the channel
+    }
+}
diff --git a/src/main/java/groovy/concurrent/ChannelClosedException.java 
b/src/main/java/groovy/concurrent/ChannelClosedException.java
new file mode 100644
index 0000000000..e71fd02138
--- /dev/null
+++ b/src/main/java/groovy/concurrent/ChannelClosedException.java
@@ -0,0 +1,36 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package groovy.concurrent;
+
+/**
+ * Signals that a {@link Channel} operation could not proceed because the
+ * channel has been closed.
+ *
+ * @since 6.0.0
+ */
+public class ChannelClosedException extends IllegalStateException {
+
+    /**
+     * Creates the exception with a detail message.
+     *
+     * @param message the detail message
+     */
+    public ChannelClosedException(String message) {
+        super(message);
+    }
+
+    /**
+     * Creates the exception with a detail message and an underlying cause.
+     *
+     * @param message the detail message
+     * @param cause   the underlying cause
+     */
+    public ChannelClosedException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
diff --git a/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java 
b/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
index 7996c110e3..73b5a9e15f 100644
--- a/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
+++ b/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
@@ -18,9 +18,12 @@
  */
 package org.apache.groovy.runtime.async;
 
+import groovy.concurrent.AsyncContext;
+import groovy.concurrent.AsyncScope;
 import groovy.concurrent.AsyncStream;
 import groovy.concurrent.AwaitResult;
 import groovy.concurrent.Awaitable;
+import groovy.concurrent.Channel;
 import groovy.lang.Closure;
 
 import java.lang.invoke.MethodHandle;
@@ -34,6 +37,7 @@ import java.util.Arrays;
 import java.util.Deque;
 import java.util.List;
 import java.util.Locale;
+import java.util.Objects;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -62,7 +66,8 @@ import java.util.concurrent.TimeoutException;
  * Key responsibilities:
  * <ul>
  *   <li><b>Async execution</b> — {@code executeAsync()}, {@code 
executeAsyncVoid()},
- *       and {@code wrapAsync()} run closures on the configured executor</li>
+ *       {@code wrapAsync()}, and {@code go()} run closures on the configured
+ *       executor while preserving the current {@link AsyncContext}</li>
  *   <li><b>Await</b> — all {@code await()} overloads use
  *       {@link groovy.concurrent.Awaitable#from(Object) Awaitable.from()} so
  *       that third-party async types
@@ -77,7 +82,9 @@ import java.util.concurrent.TimeoutException;
  *       cancellation upstream</li>
  *   <li><b>Defer</b> — {@code createDeferScope()}, {@code defer()}, and
  *       {@code executeDeferScope()} implement Go-style deferred cleanup with
- *       LIFO execution and exception suppression</li>
+ *       LIFO execution, async cleanup awareness, and exception 
suppression</li>
+ *   <li><b>Channels</b> — {@code channel()} creates Go-inspired channels that
+ *       compose with {@link Awaitable#any(Object...)} / {@code select()}</li>
  *   <li><b>Delay</b> — {@code delay()} provides non-blocking delays using a
  *       shared {@link java.util.concurrent.ScheduledExecutorService}</li>
  *   <li><b>Timeouts</b> — {@code orTimeout()} and {@code completeOnTimeout()} 
apply
@@ -314,6 +321,9 @@ public class AsyncSupport {
      * Executes the given closure asynchronously on the specified executor,
      * returning an {@link Awaitable} that completes with the closure's return
      * value.  Used by {@link groovy.transform.Async @Async} methods.
+     * <p>
+     * Captures the current {@link AsyncContext} at submission time and
+     * restores that snapshot when the closure runs.
      *
      * @param closure  the closure to execute
      * @param executor the executor on which to run the closure
@@ -322,32 +332,43 @@ public class AsyncSupport {
      */
     @SuppressWarnings("unchecked")
     public static <T> Awaitable<T> executeAsync(Closure<T> closure, Executor 
executor) {
-        return GroovyPromise.of(CompletableFuture.supplyAsync(() -> {
-            try {
-                return closure.call();
-            } catch (Throwable t) {
-                throw wrapForFuture(t);
-            }
-        }, executor));
+        Objects.requireNonNull(closure, "closure must not be null");
+        AsyncContext.Snapshot contextSnapshot = AsyncContext.capture();
+        Executor targetExecutor = executor != null ? executor : 
defaultExecutor;
+        return GroovyPromise.of(CompletableFuture.supplyAsync(() ->
+                AsyncContext.withSnapshot(contextSnapshot, () -> {
+                    try {
+                        return closure.call();
+                    } catch (Throwable t) {
+                        throw wrapForFuture(t);
+                    }
+                }), targetExecutor));
     }
 
     /**
      * Void variant of {@link #executeAsync(Closure, Executor)} for
      * {@link groovy.transform.Async @Async} methods whose return type is
      * {@code void}.
+     * <p>
+     * Captures and restores the current {@link AsyncContext} in the same way
+     * as {@link #executeAsync(Closure, Executor)}.
      *
      * @param closure  the closure to execute
      * @param executor the executor on which to run the closure
      * @return an awaitable that completes when the closure finishes
      */
     public static Awaitable<Void> executeAsyncVoid(Closure<?> closure, 
Executor executor) {
-        return GroovyPromise.of(CompletableFuture.runAsync(() -> {
-            try {
-                closure.call();
-            } catch (Throwable t) {
-                throw wrapForFuture(t);
-            }
-        }, executor));
+        Objects.requireNonNull(closure, "closure must not be null");
+        AsyncContext.Snapshot contextSnapshot = AsyncContext.capture();
+        Executor targetExecutor = executor != null ? executor : 
defaultExecutor;
+        return GroovyPromise.of(CompletableFuture.runAsync(() ->
+                AsyncContext.withSnapshot(contextSnapshot, () -> {
+                    try {
+                        closure.call();
+                    } catch (Throwable t) {
+                        throw wrapForFuture(t);
+                    }
+                }), targetExecutor));
     }
 
     /**
@@ -366,11 +387,32 @@ public class AsyncSupport {
         return executeAsync(closure, defaultExecutor);
     }
 
+    /**
+     * Go-style task spawn primitive.
+     * <p>
+     * When called inside {@link AsyncScope#withScope(groovy.lang.Closure)},
+     * the new task is registered as a structured child of the current scope.
+     * Outside a scope it behaves like a lightweight detached async task run
+     * on the default executor.
+     * <p>
+     * The closure is validated up front, so a {@code null} argument fails in
+     * the same way whether or not a scope is active, matching
+     * {@link #executeAsync(Closure, Executor)}.
+     *
+     * @param closure the task body; must not be {@code null}
+     * @param <T>     the result type
+     * @return an awaitable representing the spawned task
+     * @throws NullPointerException if {@code closure} is {@code null}
+     */
+    public static <T> Awaitable<T> go(Closure<T> closure) {
+        Objects.requireNonNull(closure, "closure must not be null");
+        AsyncScope currentScope = AsyncScope.current();
+        if (currentScope != null) {
+            // Structured path: the scope owns the child task.
+            return currentScope.async(closure);
+        }
+        return executeAsync(closure, defaultExecutor);
+    }
+
     /**
      * Wraps a closure so that each invocation executes the body asynchronously
      * and returns an {@link Awaitable}.  This is the runtime entry point for
      * the {@code async { ... }} expression syntax.  The returned closure must
-     * be explicitly called to start the async computation:
+     * be explicitly called to start the async computation.  Each invocation
+     * captures the caller's {@link AsyncContext} and restores it when the
+     * wrapped closure executes:
      * <pre>
      * def task = async { expensiveWork() }
      * def result = await(task())  // explicit call required
@@ -382,16 +424,19 @@ public class AsyncSupport {
      */
     @SuppressWarnings("unchecked")
     public static <T> Closure<Awaitable<T>> wrapAsync(Closure<T> closure) {
+        Objects.requireNonNull(closure, "closure must not be null");
         return new Closure<Awaitable<T>>(closure.getOwner(), 
closure.getThisObject()) {
             @SuppressWarnings("unused")
             public Awaitable<T> doCall(Object... args) {
-                return GroovyPromise.of(CompletableFuture.supplyAsync(() -> {
-                    try {
-                        return closure.call(args);
-                    } catch (Throwable t) {
-                        throw wrapForFuture(t);
-                    }
-                }, defaultExecutor));
+                AsyncContext.Snapshot contextSnapshot = AsyncContext.capture();
+                return GroovyPromise.of(CompletableFuture.supplyAsync(() ->
+                        AsyncContext.withSnapshot(contextSnapshot, () -> {
+                            try {
+                                return closure.call(args);
+                            } catch (Throwable t) {
+                                throw wrapForFuture(t);
+                            }
+                        }), defaultExecutor));
             }
         };
     }
@@ -401,7 +446,9 @@ public class AsyncSupport {
      * so that each invocation returns an {@link AsyncStream} producing the
      * yielded elements.  This is the runtime entry point for all async
      * generator closures ({@code async { yield return ... }}).  The returned
-     * closure must be explicitly called to start the generation:
+     * closure must be explicitly called to start the generation.  As with
+     * ordinary async closures, each invocation captures the caller's
+     * {@link AsyncContext} snapshot:
      * <pre>
      * def gen = async { yield return 1; yield return 2 }
      * for await (item in gen()) { println item }
@@ -413,11 +460,13 @@ public class AsyncSupport {
      */
     @SuppressWarnings("unchecked")
     public static <T> Closure<AsyncStream<T>> wrapAsyncGenerator(Closure<?> 
closure) {
+        Objects.requireNonNull(closure, "closure must not be null");
         return new Closure<AsyncStream<T>>(closure.getOwner(), 
closure.getThisObject()) {
             @SuppressWarnings("unused")
             public AsyncStream<T> doCall(Object... args) {
                 AsyncStreamGenerator<T> gen = new AsyncStreamGenerator<>();
-                CompletableFuture.runAsync(() -> {
+                AsyncContext.Snapshot contextSnapshot = AsyncContext.capture();
+                CompletableFuture.runAsync(() -> 
AsyncContext.withSnapshot(contextSnapshot, () -> {
                     gen.attachProducer(Thread.currentThread());
                     try {
                         Object[] allArgs = new Object[args.length + 1];
@@ -430,7 +479,7 @@ public class AsyncSupport {
                     } finally {
                         gen.detachProducer(Thread.currentThread());
                     }
-                }, defaultExecutor);
+                }), defaultExecutor);
                 return gen;
             }
         };
@@ -859,6 +908,9 @@ public class AsyncSupport {
      * This method is used internally by {@link groovy.transform.Async @Async}
      * generator methods.  For the {@code async { yield return ... }} closure
      * expression syntax, see {@link #wrapAsyncGenerator(Closure)}.
+     * <p>
+     * The generator body executes with the {@link AsyncContext} snapshot that
+     * was active when generation started.
      *
      * @param body the generator closure (receives an {@link 
AsyncStreamGenerator}
      *             as its single argument)
@@ -867,8 +919,10 @@ public class AsyncSupport {
      */
     @SuppressWarnings("unchecked")
     public static <T> AsyncStream<T> generateAsyncStream(Closure<?> body) {
+        Objects.requireNonNull(body, "body must not be null");
         AsyncStreamGenerator<T> gen = new AsyncStreamGenerator<>();
-        CompletableFuture.runAsync(() -> {
+        AsyncContext.Snapshot contextSnapshot = AsyncContext.capture();
+        CompletableFuture.runAsync(() -> 
AsyncContext.withSnapshot(contextSnapshot, () -> {
             gen.attachProducer(Thread.currentThread());
             try {
                 body.call(gen);
@@ -878,10 +932,31 @@ public class AsyncSupport {
             } finally {
                 gen.detachProducer(Thread.currentThread());
             }
-        }, defaultExecutor);
+        }), defaultExecutor);
         return gen;
     }
 
+    /**
+     * Creates an unbuffered Go-style channel using {@link Channel}'s
+     * no-argument constructor.
+     *
+     * @param <T> the payload type
+     * @return a new unbuffered channel
+     */
+    public static <T> Channel<T> channel() {
+        Channel<T> unbuffered = new Channel<>();
+        return unbuffered;
+    }
+
+    /**
+     * Creates a Go-style channel whose buffer holds up to the given number
+     * of values.
+     *
+     * @param capacity the channel buffer capacity
+     * @param <T>      the payload type
+     * @return a new channel
+     */
+    public static <T> Channel<T> channel(int capacity) {
+        Channel<T> buffered = new Channel<>(capacity);
+        return buffered;
+    }
+
     // ---- executor configuration -----------------------------------------
 
     /**
@@ -934,7 +1009,10 @@ public class AsyncSupport {
      * (typically in a {@code finally} block generated by the compiler).
      * <p>
      * This method is the runtime entry point for the {@code defer { ... }}
-     * statement, inspired by Go's {@code defer} keyword.
+     * statement, inspired by Go's {@code defer} keyword.  The deferred action
+     * is bound to the current {@link AsyncContext} snapshot so that cleanup
+     * observes the same request/trace metadata that was active when the defer
+     * statement was executed.
      *
      * @param scope  the defer scope (created by {@link #createDeferScope()});
      *               must not be {@code null}
@@ -951,7 +1029,13 @@ public class AsyncSupport {
         if (action == null) {
             throw new IllegalArgumentException("defer action must not be 
null");
         }
-        scope.push(action);
+        AsyncContext.Snapshot contextSnapshot = AsyncContext.capture();
+        scope.push(new Closure<Object>(action.getOwner(), 
action.getThisObject()) {
+            @SuppressWarnings("unused")
+            public Object doCall() {
+                return AsyncContext.withSnapshot(contextSnapshot, () -> 
action.call());
+            }
+        });
     }
 
     /**
@@ -960,6 +1044,10 @@ public class AsyncSupport {
      * subsequent actions still execute.  If multiple actions throw, the
      * first exception is rethrown and subsequent ones are added as
      * {@linkplain Throwable#addSuppressed(Throwable) suppressed exceptions}.
+     * If a deferred action returns an {@link Awaitable},
+     * {@link CompletionStage}, {@link CompletableFuture}, or {@link Future},
+     * the async cleanup is awaited before moving on to the next deferred
+     * action.
      * <p>
      * Called by compiler-generated code in the {@code finally} block of
      * methods that contain {@code defer} statements.
@@ -973,7 +1061,8 @@ public class AsyncSupport {
         Throwable firstError = null;
         while (!scope.isEmpty()) {
             try {
-                scope.pop().call();
+                Object result = scope.pop().call();
+                awaitDeferredResult(result);
             } catch (Throwable t) {
                 if (firstError == null) {
                     firstError = t;
@@ -1041,4 +1130,13 @@ public class AsyncSupport {
         if (t instanceof CompletionException ce) return ce;
         return new CompletionException(t);
     }
+
+    /**
+     * Waits for a deferred action's result when that result is itself
+     * asynchronous; any other result (including {@code null}) is ignored.
+     *
+     * @param result the value returned by a deferred action
+     */
+    private static void awaitDeferredResult(Object result) {
+        // CompletableFuture needs no separate test: it implements both
+        // CompletionStage and Future.
+        boolean asyncCleanup = result instanceof Awaitable
+                || result instanceof CompletionStage
+                || result instanceof Future;
+        if (asyncCleanup) {
+            await(result);
+        }
+    }
 }
diff --git a/src/main/java/org/apache/groovy/runtime/async/GroovyPromise.java 
b/src/main/java/org/apache/groovy/runtime/async/GroovyPromise.java
index c2ce0d6145..b4bc7f5271 100644
--- a/src/main/java/org/apache/groovy/runtime/async/GroovyPromise.java
+++ b/src/main/java/org/apache/groovy/runtime/async/GroovyPromise.java
@@ -18,6 +18,7 @@
  */
 package org.apache.groovy.runtime.async;
 
+import groovy.concurrent.AsyncContext;
 import groovy.concurrent.Awaitable;
 
 import java.util.Objects;
@@ -170,11 +171,15 @@ public class GroovyPromise<T> implements Awaitable<T> {
      * {@inheritDoc}
      * <p>
      * Returns a new {@code GroovyPromise} whose result is obtained by applying
-     * the given function to this promise's result.
+     * the given function to this promise's result.  The current
+     * {@link AsyncContext} snapshot is captured when the continuation is
+     * registered and restored when it executes.
      */
     @Override
     public <U> Awaitable<U> then(Function<? super T, ? extends U> fn) {
-        return new GroovyPromise<>(future.thenApply(fn));
+        AsyncContext.Snapshot contextSnapshot = AsyncContext.capture();
+        return new GroovyPromise<>(future.thenApply(value ->
+                AsyncContext.withSnapshot(contextSnapshot, () -> 
fn.apply(value))));
     }
 
     /**
@@ -182,10 +187,15 @@ public class GroovyPromise<T> implements Awaitable<T> {
      * <p>
      * Returns a new {@code GroovyPromise} that is the result of composing this
      * promise with the async function, enabling flat-mapping of awaitables.
+     * The current {@link AsyncContext} snapshot is captured when the
+     * continuation is registered and restored when it executes.
      */
     @Override
     public <U> Awaitable<U> thenCompose(Function<? super T, ? extends 
Awaitable<U>> fn) {
-        return new GroovyPromise<>(future.thenCompose(t -> 
fn.apply(t).toCompletableFuture()));
+        AsyncContext.Snapshot contextSnapshot = AsyncContext.capture();
+        return new GroovyPromise<>(future.thenCompose(value ->
+                AsyncContext.withSnapshot(contextSnapshot,
+                        () -> fn.apply(value).toCompletableFuture())));
     }
 
     /**
@@ -194,14 +204,17 @@ public class GroovyPromise<T> implements Awaitable<T> {
      * Returns a new {@code GroovyPromise} that handles exceptions thrown by 
this promise.
      * The throwable passed to the handler is deeply unwrapped to strip JDK
      * wrapper layers ({@code CompletionException}, {@code 
ExecutionException}).
+     * The handler runs with the {@link AsyncContext} snapshot that was active
+     * when the recovery continuation was registered.
      */
     @Override
     public Awaitable<T> exceptionally(Function<Throwable, ? extends T> fn) {
-        return new GroovyPromise<>(future.exceptionally(t -> {
-            // Unwrap all wrapper layers so handler sees the original exception
-            Throwable cause = AsyncSupport.deepUnwrap(t);
-            return fn.apply(cause);
-        }));
+        AsyncContext.Snapshot contextSnapshot = AsyncContext.capture();
+        return new GroovyPromise<>(future.exceptionally(t ->
+                AsyncContext.withSnapshot(contextSnapshot, () -> {
+                    Throwable cause = AsyncSupport.deepUnwrap(t);
+                    return fn.apply(cause);
+                })));
     }
 
     /**
diff --git 
a/src/main/java/org/codehaus/groovy/transform/AsyncASTTransformation.java 
b/src/main/java/org/codehaus/groovy/transform/AsyncASTTransformation.java
index 5edd5f9e15..50aab9776a 100644
--- a/src/main/java/org/codehaus/groovy/transform/AsyncASTTransformation.java
+++ b/src/main/java/org/codehaus/groovy/transform/AsyncASTTransformation.java
@@ -53,9 +53,13 @@ import static 
org.codehaus.groovy.ast.tools.GenericsUtils.makeClassSafeWithGener
  *       the runtime's {@code await()} via {@link AsyncTransformHelper}</li>
  *   <li>The method body is executed asynchronously — or as an async generator
  *       if it contains {@code yield return} — via factory methods on
- *       {@link AsyncTransformHelper}</li>
+ *       {@link AsyncTransformHelper}.  The runtime preserves
+ *       {@link groovy.concurrent.AsyncContext} snapshots across those async
+ *       boundaries.</li>
  *   <li>Methods containing {@code defer} statements are wrapped in a
- *       try-finally block that executes deferred actions in LIFO order</li>
+ *       try-finally block that executes deferred actions in LIFO order, with
+ *       each deferred closure bound to the async context active at 
registration
+ *       time</li>
  *   <li>The return type becomes {@code Awaitable<T>}
  *       (or {@code AsyncStream<T>} for generators)</li>
  * </ol>
diff --git 
a/src/main/java/org/codehaus/groovy/transform/AsyncTransformHelper.java 
b/src/main/java/org/codehaus/groovy/transform/AsyncTransformHelper.java
index 6c07699678..8f0d137c16 100644
--- a/src/main/java/org/codehaus/groovy/transform/AsyncTransformHelper.java
+++ b/src/main/java/org/codehaus/groovy/transform/AsyncTransformHelper.java
@@ -71,7 +71,10 @@ import static 
org.codehaus.groovy.ast.tools.GeneralUtils.varX;
  * <b>Rewriting utilities</b>
  * ({@link #injectGenParamIntoYieldReturnCalls(Statement, Parameter)},
  * {@link #wrapWithDeferScope(Statement)}) perform targeted AST mutations
- * that transform high-level syntax into the lower-level runtime calls.
+ * that transform high-level syntax into the lower-level runtime calls.  The
+ * generated runtime calls now also inherit async-local context automatically,
+ * so compiler-generated code retains request/trace metadata across thread hops
+ * without special-case AST handling.
  *
  * @see AsyncASTTransformation
  * @since 6.0.0
diff --git a/src/spec/doc/core-async-await.adoc 
b/src/spec/doc/core-async-await.adoc
index 2d5d1f0771..adca9476f3 100644
--- a/src/spec/doc/core-async-await.adoc
+++ b/src/spec/doc/core-async-await.adoc
@@ -39,6 +39,8 @@ Key capabilities include:
 * **`for await`** — iterate over asynchronous data sources
 * **`yield return`** — produce asynchronous streams (generators)
 * **`defer`** — schedule Go-style cleanup actions that run when the method 
completes
+* **`AsyncContext`** — propagate request, trace, and tenant metadata across 
async boundaries
+* **`Awaitable.go`, `Channel`, and `Awaitable.any`** — Go-inspired task 
spawning and channel-based communication primitives
 * **Framework integration** — built-in adapters for `CompletableFuture` and 
`Future`;
 `Flow.Publisher` support via an auto-discovered runtime adapter;
 extensible to RxJava, Reactor, and Spring via the adapter registry
@@ -344,9 +346,10 @@ 
include::../test/AsyncAwaitSpecTest.groovy[tags=async_lambda,indent=0]
 ----
 
 [[for-await]]
+[[for_await_section]]
 == `for await` — Async Iteration
 
-The `for await` loop iterates over asynchronous data sources (`AsyncStream`, 
`Flow.Publisher`, `Iterable`).
+The `for await` loop iterates over asynchronous data sources (`AsyncStream`, 
`Flow.Publisher`, `Channel`, `Iterable`).
 Each element is resolved asynchronously before the loop body executes. This is 
analogous to JavaScript's
 `for await...of` and C#'s `await foreach`.
 
@@ -476,6 +479,19 @@ 
include::../test/AsyncAwaitSpecTest.groovy[tags=defer_exception,indent=0]
 
include::../test/AsyncAwaitSpecTest.groovy[tags=defer_resource_cleanup,indent=0]
 ----
 
+=== Async Cleanup in Deferred Blocks
+
+Deferred blocks may also return an `Awaitable`, `CompletionStage`, or `Future`.
+Groovy waits for that asynchronous cleanup to finish before the enclosing async
+method completes, which makes `defer` practical for non-blocking shutdown paths
+such as flushing telemetry, closing async streams, or returning buffered work
+to a channel:
+
+[source,groovy]
+----
+include::../test/AsyncAwaitSpecTest.groovy[tags=defer_async_cleanup,indent=0]
+----
+
 [TIP]
 ====
 When an exception occurs in a deferred block and the enclosing method has also 
thrown,
@@ -666,7 +682,7 @@ 
include::../test/AsyncAwaitSpecTest.groovy[tags=from_conversion,indent=0]
 `Awaitable.from()` accepts any type supported by a registered adapter: 
`CompletableFuture`,
 `CompletionStage`, `Future`, `Flow.Publisher`, or any third-party type with a 
custom adapter.
 If the source is already an `Awaitable`, it is returned as-is. 
`AsyncStream.from()` works
-similarly for multi-value sources such as `Iterable`, `Iterator`, or 
`Flow.Publisher`.
+similarly for multi-value sources such as `Iterable`, `Iterator`, `Channel`, 
or `Flow.Publisher`.
 
 === Built-in Adapters
 
@@ -676,6 +692,7 @@ asynchronous types from third-party frameworks. The 
built-in adapter handles:
 * `groovy.concurrent.Awaitable` (native Groovy promise — passthrough)
 * `java.util.concurrent.CompletableFuture` and `CompletionStage`
 * `java.util.concurrent.Future` (blocking, delegated to a configurable 
executor)
+* `groovy.concurrent.Channel` (automatically converted to `AsyncStream` for 
`for await`)
 
 Additionally, `java.util.concurrent.Flow.Publisher` support (single-value 
`await` and
 multi-value `for await`) is provided by an internal runtime adapter
@@ -1095,12 +1112,24 @@ JavaScript, C#, Kotlin, and Swift, for developers 
familiar with those languages.
 | `Task { body }`
 
 | **Async iteration**
-| `for await (x in src) { }`
+| `for await (x in src) { }` +
+_(supports `AsyncStream`, `Flow.Publisher`, `Channel`, `Iterable`)_
 | `for await (x of src) { }`
 | `await foreach (x in src) { }`
 | _(manual via `Flow`, `Channel`, or `Flow.collect`)_
 | `for try await x in seq`
 
+| **Channels**
+| `Awaitable.channel(n)` +
+`await ch.send(x)` / `await ch.receive()` +
+`for await (x in ch) { }`
+| _(none; use streams or message ports)_
+| `System.Threading.Channels` +
+`Channel<T>` / `ChannelReader.ReadAllAsync()`
+| `Channel<T>` / `channel.send()` +
+`for (x in channel) { }`
+| `AsyncStream` / `AsyncChannel` (3rd party)
+
 | **Async generator**
 | `yield return expr`
 | `yield expr` (in `async function*`)
@@ -1289,6 +1318,195 @@ Always close a scope.  An unclosed scope leaves child 
tasks in an indeterminate
 and may leak threads.
 ====
 
+[[async-context]]
+== Async Execution Context with `AsyncContext`
+
+Asynchronous code frequently needs more than just a return value: it must also
+carry request IDs, tenant IDs, security information, locale overrides, or
+trace metadata through thread hops.  `AsyncContext` provides a small, explicit,
+async-local map for exactly that purpose.
+
+Groovy captures the current `AsyncContext` when a task or continuation is
+scheduled and restores that snapshot when it runs.  This gives you the most
+useful production invariants:
+
+* child tasks inherit the parent context as it existed at spawn time;
+* child-side mutations do not leak back into the parent;
+* continuations (`.then()`, `.exceptionally()`, `.handle()`, etc.) see the
+  context that was active when they were registered.
+
+[source,groovy]
+----
+include::../test/AsyncAwaitSpecTest.groovy[tags=async_context_basic,indent=0]
+----
+
+=== Snapshots and Manual Restoration
+
+`AsyncContext.capture()` returns an immutable `Snapshot` of the current 
context.
+You can restore that snapshot later—even from a different thread—using
+`AsyncContext.withSnapshot(snapshot) { ... }`.  Snapshots are lightweight: they
+are simply frozen copies of the context map at capture time.
+
+[source,groovy]
+----
+include::../test/AsyncAwaitSpecTest.groovy[tags=async_context_snapshot,indent=0]
+----
+
+[TIP]
+====
+Use `AsyncContext` for small pieces of execution metadata — correlation IDs,
+tenant IDs, principals, feature flags.  Keep large mutable business objects in
+ordinary method parameters or return values.
+====
+
+[NOTE]
+====
+`AsyncContext` is backed by a `ThreadLocal`.  In pooled-thread environments the
+context is automatically captured and restored by the async runtime, but if you
+create your own threads you must capture a snapshot beforehand and restore it
+manually via `AsyncContext.withSnapshot(snapshot) { ... }`.
+====
+
+[[go-tasks-and-channels]]
+== Go-Style Tasks and Channels
+
+Groovy's core async model is based on `Awaitable`, but it also provides
+Go-inspired coordination primitives for workflows that benefit from explicit
+message passing.
+
+=== Spawning Tasks with `go`
+
+`Awaitable.go { ... }` starts a lightweight task.  Outside an `AsyncScope` it
+behaves like a detached async job.  Inside an `AsyncScope`, it automatically
+joins the current scope so the task is awaited and cancelled with its siblings.
+
+=== Channels
+
+`Awaitable.channel()` creates an unbuffered rendezvous channel; 
`Awaitable.channel(n)`
+creates a buffered channel with capacity `n`.  Both `send()` and `receive()`
+return `Awaitable` values, which means they compose naturally with `await` and
+with `Awaitable.any(...)` for first-completion-wins logic.
+
+Key channel properties:
+
+* **Unbuffered (capacity 0)**: `send()` suspends until a matching `receive()`
+  arrives, providing direct-handoff (rendezvous) semantics.
+* **Buffered (capacity > 0)**: `send()` completes immediately if buffer space 
is
+  available; otherwise it suspends until a consumer drains an element.
+* **Close semantics**: closing a channel is idempotent.  Buffered values remain
+  receivable after close.  Once drained, further receives fail with
+  `ChannelClosedException`.  Pending senders fail immediately on close.
+* **Null rejection**: channels reject `null` payloads to maintain a clean
+  distinction between "no value yet" and actual data.
+
+[source,groovy]
+----
+include::../test/AsyncAwaitSpecTest.groovy[tags=go_channel_select,indent=0]
+----
+
+=== Racing Channel Operations
+
+Use `Awaitable.any(...)` to race channel receives against timeouts or other
+channels — the first completed source wins:
+
+[source,groovy]
+----
+def value = await Awaitable.any(
+    ch.receive(),                           // channel receive
+    Awaitable.delay(1000).then { 'timeout' } // timeout fallback
+)
+----
+
+This is the Groovy equivalent of Go's `select` statement.  Because 
`Awaitable.any`
+accepts any awaitable source, it works seamlessly with channels, timers, and
+arbitrary async tasks.
+
+=== Iterating Channels with `for await`
+
+Because `Channel` integrates with the <<for_await_section,`for await`>> syntax,
+you can iterate received values in a natural loop — the Groovy equivalent of 
Go's
+`for range ch`:
+
+[source,groovy]
+----
+include::../test/AsyncAwaitSpecTest.groovy[tags=for_await_channel,indent=0]
+----
+
+Internally, `for await` obtains a read-only `AsyncStream` view from the channel
+(via `Channel.asStream()`).  Each iteration calls `receive()`; when the 
channel is
+closed and all buffered values are drained, the stream signals completion and 
the
+loop exits.
+
+[IMPORTANT]
+====
+The stream view does **not** own the channel.  Exiting the loop — whether
+normally, via `break`, or on exception — does _not_ close the channel.  The
+producer is responsible for closing the channel when it has finished sending.
+This matches Go's convention that only the _sender_ closes a channel.
+====
+
+=== Fan-In Pipelines
+
+Channels are particularly effective for fan-in and fan-out pipelines where
+multiple producers feed a shared channel and one or more consumers drain it:
+
+[source,groovy]
+----
+include::../test/AsyncAwaitSpecTest.groovy[tags=channel_fan_in,indent=0]
+----
+
+This pattern is useful for work distribution, aggregating results from
+parallel workers, and building actor-like coordination without explicit locks.
+
+=== Channels vs `Flow.Publisher`
+
+Channels and `Flow.Publisher` both carry asynchronous data, and both work with
+<<for_await_section,`for await`>>.  However, they serve **different purposes** 
and
+should _not_ be confused or substituted for each other:
+
+[cols="1,2,2"]
+|===
+| Aspect | Channel | `Flow.Publisher`
+
+| **Model**
+| Bidirectional send/receive (CSP)
+| Unidirectional push (Reactive Streams)
+
+| **Backpressure**
+| Implicit — sender waits when the buffer is full
+| Explicit — subscriber signals demand via `request(n)`
+
+| **Direction**
+| Symmetric — both sides actively write and read
+| Asymmetric — publisher pushes items, subscriber signals demand
+
+| **Close semantics**
+| Idempotent; buffered values survive close and drain first
+| Terminal `onComplete`/`onError` signals after all items
+
+| **Multi-consumer**
+| Competing — each `receive()` is exclusive (only one consumer gets the value)
+| Copied — each subscriber gets its own stream of items
+
+| **Best for**
+| Task-to-task coordination, fan-in/fan-out pipelines
+| Consuming reactive streams, HTTP bodies, database cursors
+|===
+
+Despite these differences, **both are consumed through the same `for await` 
syntax**
+via the unified `AsyncStream` abstraction:
+
+[source,groovy]
+----
+include::../test/AsyncAwaitSpecTest.groovy[tags=channel_vs_flow_for_await,indent=0]
+----
+
+TIP: Use **channels** when you control both the producer and consumer and need
+explicit coordination (e.g., work queues, pipelines, actor-style messaging).
+Use **`Flow.Publisher`** when you receive a push-based data stream from an
+external source (e.g., a reactive HTTP client, a message broker, or a database
+cursor).
+
 [[performance-notes]]
 == Performance Characteristics
 
diff --git a/src/spec/test/AsyncAwaitSpecTest.groovy 
b/src/spec/test/AsyncAwaitSpecTest.groovy
index 565fc35e91..a842318ef7 100644
--- a/src/spec/test/AsyncAwaitSpecTest.groovy
+++ b/src/spec/test/AsyncAwaitSpecTest.groovy
@@ -1529,4 +1529,248 @@ scope.close() // waits for all children, idempotent
 // end::async_scope_manual[]
         '''
     }
+
+    @Test
+    void testAsyncContextPropagation() {
+        assertScript '''
+// tag::async_context_basic[]
+import groovy.concurrent.AsyncContext
+import groovy.concurrent.Awaitable
+
+AsyncContext.with([traceId: 'req-7', tenant: 'acme']) {
+    def task = Awaitable.go {
+        assert AsyncContext.current()['traceId'] == 'req-7'
+        AsyncContext.current()['traceId'] = 'child-trace'
+        [trace: AsyncContext.current()['traceId'], tenant: 
AsyncContext.current()['tenant']]
+    }
+
+    def child = await task
+    assert child == [trace: 'child-trace', tenant: 'acme']
+    assert AsyncContext.current()['traceId'] == 'req-7'
+}
+// end::async_context_basic[]
+        '''
+    }
+
+    @Test
+    void testGoChannelSelect() {
+        assertScript '''
+// tag::go_channel_select[]
+import groovy.concurrent.AsyncScope
+import groovy.concurrent.Awaitable
+
+def result = AsyncScope.withScope { scope ->
+    def ch = Awaitable.channel(1)
+
+    def producer = Awaitable.go {
+        await ch.send('payload')
+        'sent'
+    }
+
+    def consumer = scope.async {
+        await ch.receive()
+    }
+
+    def value = await Awaitable.any(
+        consumer,
+        Awaitable.delay(250).then { 'timeout' }
+    )
+
+    [value: value, producer: await(producer), childCount: scope.childCount]
+}
+
+assert result == [value: 'payload', producer: 'sent', childCount: 2]
+// end::go_channel_select[]
+        '''
+    }
+
+    @Test
+    void testDeferAsyncCleanup() {
+        assertScript '''
+// tag::defer_async_cleanup[]
+import groovy.concurrent.Awaitable
+
+class ResourceHolder {
+    final List<String> log = []
+
+    Awaitable<Void> closeAsync(String name) {
+        Awaitable.go {
+            await Awaitable.delay(10)
+            log << "close:${name}"
+            null
+        }
+    }
+
+    async useResource() {
+        defer { closeAsync('outer') }
+        defer { closeAsync('inner') }
+        log << 'body'
+        return log
+    }
+}
+
+def holder = new ResourceHolder()
+assert await(holder.useResource()) == ['body', 'close:inner', 'close:outer']
+// end::defer_async_cleanup[]
+        '''
+    }
+
+    @Test
+    void testChannelFanInPipeline() {
+        assertScript '''
+// tag::channel_fan_in[]
+import groovy.concurrent.AsyncScope
+import groovy.concurrent.Awaitable
+
+def results = AsyncScope.withScope { scope ->
+    def ch = Awaitable.channel(8)
+
+    // Fan-out: spawn multiple producers that all write to one shared channel
+    3.times { producerId ->
+        scope.async {
+            for (int i = 0; i < 4; i++) {
+                await ch.send("p${producerId}-item${i}")
+            }
+            null
+        }
+    }
+
+    // Fan-in: single consumer drains the channel
+    def consumer = scope.async {
+        def items = []
+        12.times { items << await(ch.receive()) }
+        items.sort()
+    }
+
+    await consumer
+}
+
+assert results.size() == 12
+assert results.count { it.startsWith('p0-') } == 4
+assert results.count { it.startsWith('p1-') } == 4
+assert results.count { it.startsWith('p2-') } == 4
+// end::channel_fan_in[]
+        '''
+    }
+
+    @Test
+    void testAsyncContextSnapshot() {
+        assertScript '''
+// tag::async_context_snapshot[]
+import groovy.concurrent.AsyncContext
+import groovy.concurrent.Awaitable
+
+// Capture a snapshot to hand off to a different async scope
+AsyncContext.current()['requestId'] = 'req-99'
+def snapshot = AsyncContext.capture()
+
+// Mutate the parent context — the snapshot is unaffected
+AsyncContext.current()['requestId'] = 'req-100'
+
+def childResult = await Awaitable.go {
+    // Child sees the parent's context as of spawn time, not the earlier snapshot
+    AsyncContext.current()['requestId']
+}
+
+assert childResult == 'req-100'  // go captures at call time
+
+// Manually restore snapshot in a different scope
+def restored = AsyncContext.withSnapshot(snapshot, { ->
+    AsyncContext.current()['requestId']
+} as java.util.function.Supplier)
+
+assert restored == 'req-99'
+// end::async_context_snapshot[]
+        '''
+    }
+
+    @Test
+    void testForAwaitOverChannel() {
+        assertScript '''
+// tag::for_await_channel[]
+import groovy.concurrent.AsyncScope
+import groovy.concurrent.Awaitable
+
+def items = []
+def ch = Awaitable.channel(4)
+
+AsyncScope.withScope { scope ->
+    // Producer sends values and closes the channel when done
+    scope.async {
+        for (word in ['hello', 'async', 'world']) {
+            await ch.send(word)
+        }
+        ch.close()
+        null
+    }
+
+    // Consumer iterates the channel with for-await
+    for await (item in ch) {
+        items << item
+    }
+}
+
+assert items == ['hello', 'async', 'world']
+// end::for_await_channel[]
+        '''
+    }
+
+    @Test
+    void testChannelVsFlowPublisherForAwait() {
+        assertScript '''
+// tag::channel_vs_flow_for_await[]
+import groovy.concurrent.AsyncScope
+import groovy.concurrent.Awaitable
+import java.util.concurrent.Flow
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.TimeUnit
+
+// --- Channel: Go-style coordination primitive ---
+// Producers and consumers communicate through explicit send/receive.
+// Best for task-to-task coordination and fan-in/fan-out pipelines.
+def channelItems = []
+def ch = Awaitable.channel(3)
+
+AsyncScope.withScope { scope ->
+    scope.async {
+        for (n in [10, 20, 30]) { await ch.send(n) }
+        ch.close()
+        null
+    }
+    for await (item in ch) {
+        channelItems << item
+    }
+}
+assert channelItems == [10, 20, 30]
+
+// --- Flow.Publisher: reactive data stream ---
+// A push-based source adapted automatically for for-await consumption.
+// Best for consuming reactive streams, HTTP responses, and database cursors.
+def publisherItems = []
+
+def publisher = new Flow.Publisher<Integer>() {
+    void subscribe(Flow.Subscriber<? super Integer> subscriber) {
+        subscriber.onSubscribe(new Flow.Subscription() {
+            int count = 0
+            void request(long n) {
+                for (int i = 0; i < n && count < 3; i++) {
+                    count++
+                    subscriber.onNext(count * 100)
+                }
+                if (count >= 3) subscriber.onComplete()
+            }
+            void cancel() {}
+        })
+    }
+}
+for await (item in publisher) {
+    publisherItems << item
+}
+assert publisherItems == [100, 200, 300]
+
+// Both Channel and Flow.Publisher work with 'for await' through
+// the unified AsyncStream abstraction — same syntax, different use cases.
+// end::channel_vs_flow_for_await[]
+        '''
+    }
 }
diff --git 
a/src/test/groovy/org/codehaus/groovy/transform/AsyncContextChannelTest.groovy 
b/src/test/groovy/org/codehaus/groovy/transform/AsyncContextChannelTest.groovy
new file mode 100644
index 0000000000..d6c9515236
--- /dev/null
+++ 
b/src/test/groovy/org/codehaus/groovy/transform/AsyncContextChannelTest.groovy
@@ -0,0 +1,976 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.codehaus.groovy.transform
+
+import org.junit.jupiter.api.Test
+
+import static groovy.test.GroovyAssert.assertScript
+import static groovy.test.GroovyAssert.shouldFail
+
+/**
+ * Integration tests for async context propagation, channels, and structured
+ * concurrency using Groovy's async/await syntax.  All tests use
+ * {@link groovy.test.GroovyAssert#assertScript assertScript} so the code
+ * under test exercises the full Groovy compiler pipeline including the
+ * {@code async}/{@code await} AST transformation.
+ */
+final class AsyncContextChannelTest {
+
+    // ---- AsyncContext propagation ----
+
+    @Test
+    void testAsyncContextPropagatesAndIsIsolatedAcrossTasks() {
+        assertScript '''
+            import groovy.concurrent.AsyncContext
+            import groovy.concurrent.Awaitable
+
+            AsyncContext.current()['traceId'] = 'parent-trace'
+
+            def child = Awaitable.go {
+                assert AsyncContext.current()['traceId'] == 'parent-trace'
+                AsyncContext.current()['traceId'] = 'child-trace'
+                AsyncContext.current()['traceId']
+            }
+
+            assert await(child) == 'child-trace'
+            assert AsyncContext.current()['traceId'] == 'parent-trace'
+            AsyncContext.current().clear()
+        '''
+    }
+
+    @Test
+    void testPromiseContinuationsCaptureAsyncContext() {
+        assertScript '''
+            import groovy.concurrent.AsyncContext
+            import groovy.concurrent.Awaitable
+
+            AsyncContext.current()['requestId'] = 'request-1'
+
+            def source = Awaitable.go {
+                await Awaitable.delay(25)
+                40
+            }
+
+            def continuation = source.then { value ->
+                assert AsyncContext.current()['requestId'] == 'request-1'
+                value + 2
+            }
+
+            AsyncContext.current()['requestId'] = 'request-2'
+
+            assert await(continuation) == 42
+            assert AsyncContext.current()['requestId'] == 'request-2'
+            AsyncContext.current().clear()
+        '''
+    }
+
+    @Test
+    void testAsyncContextPreservedAcrossContinuationChains() {
+        assertScript '''
+            import groovy.concurrent.AsyncContext
+            import groovy.concurrent.Awaitable
+
+            AsyncContext.current()['traceId'] = 'trace-chain'
+
+            def result = Awaitable.go { 10 }
+                .then { it * 2 }
+                .then { value ->
+                    assert AsyncContext.current()['traceId'] == 'trace-chain'
+                    value + 1
+                }
+                .thenCompose { value ->
+                    assert AsyncContext.current()['traceId'] == 'trace-chain'
+                    Awaitable.of(value * 3)
+                }
+                .handle { value, error ->
+                    assert AsyncContext.current()['traceId'] == 'trace-chain'
+                    value
+                }
+
+            assert await(result) == 63
+            AsyncContext.current().clear()
+        '''
+    }
+
+    @Test
+    void testAsyncContextInExceptionallyHandler() {
+        assertScript '''
+            import groovy.concurrent.AsyncContext
+            import groovy.concurrent.Awaitable
+
+            AsyncContext.current()['reqId'] = 'err-test'
+
+            def result = Awaitable.go { throw new RuntimeException('fail') }
+                .exceptionally { error ->
+                    assert AsyncContext.current()['reqId'] == 'err-test'
+                    "recovered: ${error.message}"
+                }
+
+            assert await(result) == 'recovered: fail'
+            AsyncContext.current().clear()
+        '''
+    }
+
+    @Test
+    void testAsyncContextInWhenCompleteHandler() {
+        assertScript '''
+            import groovy.concurrent.AsyncContext
+            import groovy.concurrent.Awaitable
+            import java.util.concurrent.CompletableFuture
+            import java.util.concurrent.TimeUnit
+
+            AsyncContext.current()['traceId'] = 'wc-test'
+            def observed = new CompletableFuture<String>()
+
+            def task = Awaitable.go { 42 }
+                .whenComplete { value, error ->
+                    observed.complete(AsyncContext.current()['traceId'] as 
String)
+                }
+
+            await task
+            assert observed.get(2, TimeUnit.SECONDS) == 'wc-test'
+            AsyncContext.current().clear()
+        '''
+    }
+
+    @Test
+    void testAsyncContextIsolationUnderConcurrentMutations() {
+        assertScript '''
+            import groovy.concurrent.AsyncContext
+            import groovy.concurrent.Awaitable
+
+            int taskCount = 100
+            AsyncContext.current()['shared'] = 'root'
+
+            def tasks = (0..<taskCount).collect { n ->
+                Awaitable.go {
+                    assert AsyncContext.current()['shared'] == 'root'
+                    AsyncContext.current()['taskId'] = "task-${n}".toString()
+                    await Awaitable.delay(5)
+                    assert AsyncContext.current()['taskId'] == 
"task-${n}".toString()
+                    assert AsyncContext.current()['shared'] == 'root'
+                    AsyncContext.current()['taskId']
+                }
+            }
+
+            def results = await Awaitable.all(*tasks)
+            assert results.toSet().size() == taskCount
+            assert AsyncContext.current()['shared'] == 'root'
+            assert !AsyncContext.current().containsKey('taskId')
+            AsyncContext.current().clear()
+        '''
+    }
+
+    @Test
+    void testAsyncContextSnapshotIsImmutableAfterCapture() {
+        assertScript '''
+            import groovy.concurrent.AsyncContext
+            import groovy.concurrent.Awaitable
+
+            AsyncContext.current()['key'] = 'before'
+            def snapshot = AsyncContext.capture()
+
+            AsyncContext.current()['key'] = 'after'
+            AsyncContext.current()['extra'] = 'new'
+
+            assert snapshot.get('key') == 'before'
+            assert !snapshot.containsKey('extra')
+
+            def result = Awaitable.go { AsyncContext.current()['key'] }
+            assert await(result) == 'after'
+            AsyncContext.current().clear()
+        '''
+    }
+
+    @Test
+    void testAsyncContextWithOverlayAndRestore() {
+        assertScript '''
+            import groovy.concurrent.AsyncContext
+
+            AsyncContext.current()['env'] = 'production'
+            AsyncContext.current()['user'] = 'admin'
+
+            def result = AsyncContext.with([env: 'staging', feature: 'beta']) {
+                assert AsyncContext.current()['env'] == 'staging'
+                assert AsyncContext.current()['user'] == 'admin'
+                assert AsyncContext.current()['feature'] == 'beta'
+                'inside'
+            }
+
+            assert result == 'inside'
+            assert AsyncContext.current()['env'] == 'production'
+            assert AsyncContext.current()['user'] == 'admin'
+            assert !AsyncContext.current().containsKey('feature')
+            AsyncContext.current().clear()
+        '''
+    }
+
+    @Test
+    void testAsyncContextWithNullValueRemovesKey() {
+        assertScript '''
+            import groovy.concurrent.AsyncContext
+
+            AsyncContext.current()['key'] = 'value'
+            assert AsyncContext.current().containsKey('key')
+            AsyncContext.current()['key'] = null
+            assert !AsyncContext.current().containsKey('key')
+        '''
+    }
+
+    @Test
+    void testAsyncContextSnapshotWith() {
+        assertScript '''
+            import groovy.concurrent.AsyncContext
+
+            AsyncContext.current()['a'] = '1'
+            AsyncContext.current()['b'] = '2'
+            def snapshot = AsyncContext.capture()
+
+            def merged = snapshot.with([b: '3', c: '4'])
+            assert merged.get('a') == '1'
+            assert merged.get('b') == '3'
+            assert merged.get('c') == '4'
+
+            assert snapshot.get('b') == '2'
+            assert !snapshot.containsKey('c')
+            AsyncContext.current().clear()
+        '''
+    }
+
+    @Test
+    void testAsyncContextSnapshotWithNullRemovesKey() {
+        assertScript '''
+            import groovy.concurrent.AsyncContext
+
+            AsyncContext.current()['a'] = '1'
+            AsyncContext.current()['b'] = '2'
+            def snapshot = AsyncContext.capture()
+
+            def merged = snapshot.with([b: null])
+            assert merged.get('a') == '1'
+            assert !merged.containsKey('b')
+            assert merged.size() == 1
+            AsyncContext.current().clear()
+        '''
+    }
+
+    @Test
+    void testAsyncContextScopeRestoresOnException() {
+        assertScript '''
+            import groovy.concurrent.AsyncContext
+
+            AsyncContext.current()['key'] = 'original'
+
+            try {
+                AsyncContext.with([key: 'overlay']) {
+                    assert AsyncContext.current()['key'] == 'overlay'
+                    throw new RuntimeException('boom')
+                }
+            } catch (RuntimeException ignored) {}
+
+            assert AsyncContext.current()['key'] == 'original'
+            AsyncContext.current().clear()
+        '''
+    }
+
+    // ---- Structured concurrency (AsyncScope) ----
+
+    @Test
+    void testGoJoinsCurrentScope() {
+        assertScript '''
+            import groovy.concurrent.AsyncScope
+            import groovy.concurrent.Awaitable
+
+            def result = AsyncScope.withScope { scope ->
+                def task = Awaitable.go { 42 }
+                [value: await(task), childCount: scope.childCount]
+            }
+
+            assert result.value == 42
+            assert result.childCount == 1
+        '''
+    }
+
+    @Test
+    void testGoOutsideScopeBehavesAsDetachedTask() {
+        assertScript '''
+            import groovy.concurrent.AsyncScope
+            import groovy.concurrent.Awaitable
+
+            assert AsyncScope.current() == null
+            def result = await Awaitable.go { 'detached' }
+            assert result == 'detached'
+        '''
+    }
+
+    @Test
+    void testGoInsideScopeRegistersAsChild() {
+        assertScript '''
+            import groovy.concurrent.AsyncScope
+            import groovy.concurrent.Awaitable
+
+            def result = AsyncScope.withScope { scope ->
+                Awaitable.go { 'child-1' }
+                Awaitable.go { 'child-2' }
+                scope.childCount
+            }
+            assert result == 2
+        '''
+    }
+
+    @Test
+    void testAsyncContextPropagatedIntoScopeChildren() {
+        assertScript '''
+            import groovy.concurrent.AsyncContext
+            import groovy.concurrent.AsyncScope
+
+            AsyncContext.current()['scopeCtx'] = 'scoped-value'
+
+            def result = AsyncScope.withScope { scope ->
+                def child = scope.async { AsyncContext.current()['scopeCtx'] }
+                await child
+            }
+
+            assert result == 'scoped-value'
+            AsyncContext.current().clear()
+        '''
+    }
+
+    // ---- Channel tests ----
+
+    @Test
+    void testUnbufferedChannelBackPressuresSenderUntilReceiverArrives() {
+        assertScript '''
+            import groovy.concurrent.Awaitable
+            import java.util.concurrent.CountDownLatch
+            import java.util.concurrent.TimeUnit
+
+            def channel = Awaitable.channel()
+            def senderStarted = new CountDownLatch(1)
+
+            def sendTask = Awaitable.go {
+                senderStarted.countDown()
+                await channel.send('payload')
+                'sent'
+            }
+
+            assert senderStarted.await(2, TimeUnit.SECONDS)
+            Thread.sleep(50)
+            assert !sendTask.done
+
+            assert await(channel.receive()) == 'payload'
+            assert await(sendTask) == 'sent'
+        '''
+    }
+
+    @Test
+    void testBufferedChannelSendCompletesImmediately() {
+        assertScript '''
+            import groovy.concurrent.Channel
+
+            def channel = new Channel<String>(2)
+
+            def s1 = channel.send('a')
+            def s2 = channel.send('b')
+
+            assert s1.done
+            assert s2.done
+            assert channel.bufferedSize == 2
+
+            assert await(channel.receive()) == 'a'
+            assert await(channel.receive()) == 'b'
+        '''
+    }
+
+    @Test
+    void testUnbufferedChannelRendezvous() {
+        assertScript '''
+            import groovy.concurrent.Awaitable
+            import java.util.concurrent.CountDownLatch
+            import java.util.concurrent.TimeUnit
+
+            def channel = Awaitable.channel()
+            def senderReady = new CountDownLatch(1)
+            def receiverReady = new CountDownLatch(1)
+
+            def sender = Awaitable.go {
+                senderReady.countDown()
+                receiverReady.await(2, TimeUnit.SECONDS)
+                await channel.send('rendezvous')
+                'sent'
+            }
+
+            def receiver = Awaitable.go {
+                senderReady.await(2, TimeUnit.SECONDS)
+                receiverReady.countDown()
+                await channel.receive()
+            }
+
+            assert await(receiver) == 'rendezvous'
+            assert await(sender) == 'sent'
+        '''
+    }
+
+    @Test
+    void testChannelCloseFailsReceiversAfterDrain() {
+        shouldFail '''
+            import groovy.concurrent.Channel
+            import groovy.concurrent.ChannelClosedException
+
+            def channel = new Channel<String>(1)
+            await channel.send('first')
+            assert channel.close()
+            assert await(channel.receive()) == 'first'
+            await channel.receive()  // should throw ChannelClosedException
+        '''
+    }
+
+    @Test
+    void testChannelRejectsNullPayload() {
+        shouldFail '''
+            import groovy.concurrent.Channel
+            new Channel<String>(1).send(null)
+        '''
+    }
+
+    @Test
+    void testChannelRejectsNegativeCapacity() {
+        shouldFail '''
+            import groovy.concurrent.Channel
+            new Channel<String>(-1)
+        '''
+    }
+
+    @Test
+    void testChannelCloseIsIdempotent() {
+        assertScript '''
+            import groovy.concurrent.Channel
+
+            def channel = new Channel<String>(1)
+            assert channel.close()
+            assert !channel.close()
+        '''
+    }
+
+    @Test
+    void testChannelSendOnClosedChannel() {
+        shouldFail '''
+            import groovy.concurrent.Channel
+            import groovy.concurrent.ChannelClosedException
+
+            def channel = new Channel<String>(1)
+            channel.close()
+            await channel.send('late')  // should throw ChannelClosedException
+        '''
+    }
+
+    /**
+     * Sends three values into a capacity-5 channel, closes it, and asserts
+     * that the three values are still received in send order. A fourth
+     * receive is then expected to fail with {@code ChannelClosedException}.
+     */
+    @Test
+    void testChannelClosePreservesBufferedValues() {
+        assertScript '''
+            import groovy.concurrent.Channel
+            import static groovy.test.GroovyAssert.shouldFail
+
+            def channel = new Channel<Integer>(5)
+
+            await channel.send(1)
+            await channel.send(2)
+            await channel.send(3)
+            channel.close()
+
+            assert await(channel.receive()) == 1
+            assert await(channel.receive()) == 2
+            assert await(channel.receive()) == 3
+
+            shouldFail(groovy.concurrent.ChannelClosedException) {
+                await channel.receive()
+            }
+        '''
+    }
+
+    /**
+     * Buffers two values, obtains two receive awaitables before closing the
+     * channel, and asserts that awaiting them after {@code close()} still
+     * yields the buffered values in send order.
+     */
+    @Test
+    void testChannelCloseDrainsBufferToWaitingReceivers() {
+        assertScript '''
+            import groovy.concurrent.Channel
+
+            def channel = new Channel<String>(2)
+            await channel.send('a')
+            await channel.send('b')
+
+            def r1 = channel.receive()
+            def r2 = channel.receive()
+
+            channel.close()
+
+            assert await(r1) == 'a'
+            assert await(r2) == 'b'
+        '''
+    }
+
+    /**
+     * Asserts that {@code toString()} of an open capacity-5 channel holding
+     * one value contains the fragments {@code capacity=5}, {@code buffered=1}
+     * and {@code closed=false}. Only substrings are checked, not the full
+     * format.
+     */
+    @Test
+    void testChannelToString() {
+        assertScript '''
+            import groovy.concurrent.Channel
+
+            def channel = new Channel<String>(5)
+            await channel.send('a')
+            def str = channel.toString()
+            assert str.contains('capacity=5')
+            assert str.contains('buffered=1')
+            assert str.contains('closed=false')
+        '''
+    }
+
+    /**
+     * Starts a task that sends on an unbuffered channel with no receiver,
+     * closes the channel, and expects awaiting the task to fail.
+     * <p>
+     * The latch only proves the task has started; the 50 ms sleep is a
+     * best-effort wait for the send to actually be pending before the close.
+     * The failure type is not asserted by this test.
+     */
+    @Test
+    void testChannelPendingSendersCancelledOnClose() {
+        shouldFail '''
+            import groovy.concurrent.Awaitable
+            import java.util.concurrent.CountDownLatch
+            import java.util.concurrent.TimeUnit
+
+            def channel = Awaitable.channel()  // unbuffered
+            def senderStarted = new CountDownLatch(1)
+
+            def sendTask = Awaitable.go {
+                senderStarted.countDown()
+                await channel.send('will-fail')
+            }
+
+            assert senderStarted.await(2, TimeUnit.SECONDS)
+            Thread.sleep(50)
+            channel.close()
+
+            await sendTask  // should throw ChannelClosedException
+        '''
+    }
+
+    /**
+     * Starts a task that receives from an empty channel, closes the channel,
+     * and expects awaiting the task to fail.
+     * <p>
+     * The latch only proves the task has started; the 50 ms sleep is a
+     * best-effort wait for the receive to actually be pending before the
+     * close. The failure type is not asserted by this test.
+     */
+    @Test
+    void testChannelPendingReceiversFailOnClose() {
+        shouldFail '''
+            import groovy.concurrent.Awaitable
+            import java.util.concurrent.CountDownLatch
+            import java.util.concurrent.TimeUnit
+
+            def channel = Awaitable.channel()
+            def receiverStarted = new CountDownLatch(1)
+
+            def receiveTask = Awaitable.go {
+                receiverStarted.countDown()
+                await channel.receive()
+            }
+
+            assert receiverStarted.await(2, TimeUnit.SECONDS)
+            Thread.sleep(50)
+            channel.close()
+
+            await receiveTask  // should throw ChannelClosedException
+        '''
+    }
+
+    /**
+     * Races a receive on an empty channel against a 50 ms delay mapped to
+     * {@code 'timeout'} via {@code Awaitable.any}, and asserts that the
+     * delay branch supplies the result.
+     */
+    @Test
+    void testAnyRacesChannelReceiveAgainstTimeout() {
+        assertScript '''
+            import groovy.concurrent.Awaitable
+
+            def channel = Awaitable.channel(1)
+
+            // Timeout should win since channel has no data
+            def result = await Awaitable.any(
+                channel.receive(),
+                Awaitable.delay(50).then { 'timeout' }
+            )
+
+            assert result == 'timeout'
+        '''
+    }
+
+    /**
+     * Races receives on two channels where only the second has a buffered
+     * value, and asserts that {@code Awaitable.any} yields that value.
+     */
+    @Test
+    void testAnyWithMultipleChannels() {
+        assertScript '''
+            import groovy.concurrent.Awaitable
+            import groovy.concurrent.Channel
+
+            def ch1 = new Channel<String>(1)
+            def ch2 = new Channel<String>(1)
+
+            await ch2.send('from-ch2')
+
+            def result = await Awaitable.any(
+                ch1.receive(),
+                ch2.receive()
+            )
+
+            assert result == 'from-ch2'
+        '''
+    }
+
+    // ---- High-concurrency tests ----
+
+    /**
+     * Fan-in stress test: 200 tasks each send their index into a capacity-32
+     * channel while one consumer receives 200 values and sums them. The sum
+     * must equal 0 + 1 + ... + 199, i.e. {@code n * (n - 1) / 2}.
+     * <p>
+     * Senders are started with {@code Awaitable.go}; the consumer is started
+     * with {@code scope.async}.
+     */
+    @Test
+    void testBufferedChannelHighConcurrencyFanIn() {
+        assertScript '''
+            import groovy.concurrent.AsyncScope
+            import groovy.concurrent.Awaitable
+
+            int producerCount = 200
+            def channel = Awaitable.channel(32)
+
+            def sum = AsyncScope.withScope { scope ->
+                def senders = (0..<producerCount).collect { n ->
+                    Awaitable.go {
+                        await channel.send(n)
+                        null
+                    }
+                }
+                def consumer = scope.async {
+                    int total = 0
+                    for (int i = 0; i < producerCount; i++) {
+                        total += await(channel.receive())
+                    }
+                    total
+                }
+
+                await Awaitable.all(*senders)
+                await consumer
+            }
+
+            assert sum == (producerCount * (producerCount - 1)) / 2
+        '''
+    }
+
+    /**
+     * Multi-producer/multi-consumer stress test: 50 producers send 20
+     * distinct integers each through a capacity-64 channel, and 10 consumers
+     * receive 100 values each into a shared list.
+     * <p>
+     * After the scope completes, the list must hold exactly 1000 entries
+     * with no duplicates, i.e. no message was lost or delivered twice.
+     */
+    @Test
+    void testHighConcurrencyMultiProducerMultiConsumer() {
+        assertScript '''
+            import groovy.concurrent.AsyncScope
+            import groovy.concurrent.Awaitable
+            import java.util.concurrent.CopyOnWriteArrayList
+
+            int producerCount = 50
+            int messagesPerProducer = 20
+            int totalMessages = producerCount * messagesPerProducer
+            def channel = Awaitable.channel(64)
+            def received = new CopyOnWriteArrayList<Integer>()
+
+            AsyncScope.withScope { scope ->
+                (0..<producerCount).each { p ->
+                    scope.async {
+                        for (int i = 0; i < messagesPerProducer; i++) {
+                            await channel.send(p * messagesPerProducer + i)
+                        }
+                        null
+                    }
+                }
+
+                int consumerCount = 10
+                int messagesPerConsumer = totalMessages / consumerCount
+                (0..<consumerCount).each { c ->
+                    scope.async {
+                        for (int i = 0; i < messagesPerConsumer; i++) {
+                            received.add(await channel.receive())
+                        }
+                        null
+                    }
+                }
+            }
+
+            assert received.size() == totalMessages
+            assert received.toSet().size() == totalMessages
+        '''
+    }
+
+    /**
+     * Stress test on a channel created without a capacity argument: 100
+     * sender tasks and 100 receiver tasks are started in one scope. Each
+     * receiver adds its value to an atomic sum, which must equal
+     * 0 + 1 + ... + 99 once the scope completes.
+     */
+    @Test
+    void testHighConcurrencyUnbufferedChannelStressTest() {
+        assertScript '''
+            import groovy.concurrent.AsyncScope
+            import groovy.concurrent.Awaitable
+            import java.util.concurrent.atomic.AtomicInteger
+
+            int pairCount = 100
+            def channel = Awaitable.channel()
+            def sum = new AtomicInteger(0)
+
+            AsyncScope.withScope { scope ->
+                (0..<pairCount).each { n ->
+                    scope.async {
+                        await channel.send(n)
+                        null
+                    }
+                    scope.async {
+                        sum.addAndGet(await(channel.receive()) as int)
+                        null
+                    }
+                }
+            }
+
+            assert sum.get() == (pairCount * (pairCount - 1)) / 2
+        '''
+    }
+
+    /**
+     * Expects {@code AsyncScope.withScope} to fail when one child throws
+     * {@code RuntimeException('boom')} after a 10 ms delay while a sibling
+     * is still inside a 5000 ms delay.
+     * <p>
+     * NOTE(review): the script only asserts that a failure occurs. It does
+     * not check that the sibling was cancelled, nor that the failure arrives
+     * before the 5000 ms delay elapses. "Cancells" in the method name is a
+     * misspelling of "Cancels".
+     */
+    @Test
+    void testScopeFailFastCancellsSiblings() {
+        shouldFail '''
+            import groovy.concurrent.AsyncScope
+            import groovy.concurrent.Awaitable
+
+            AsyncScope.withScope { scope ->
+                scope.async {
+                    await Awaitable.delay(10)
+                    throw new RuntimeException('boom')
+                }
+                scope.async {
+                    await Awaitable.delay(5000)
+                    'should be cancelled'
+                }
+            }
+        '''
+    }
+
+    /**
+     * Expects {@code scope.async} to fail when called on a scope that has
+     * already been closed. The failure type is not asserted.
+     */
+    @Test
+    void testScopeClosedRejectsNewTasks() {
+        shouldFail '''
+            import groovy.concurrent.AsyncScope
+
+            def scope = new AsyncScope()
+            scope.close()
+            scope.async { 'too late' }
+        '''
+    }
+
+    /**
+     * Sets the context key {@code phase} to {@code 'A'}, starts a task,
+     * changes the key to {@code 'B'}, and starts a second task. Each task
+     * must observe the value that was current when its {@code Awaitable.go}
+     * call was made; the first task must still see {@code 'A'} although the
+     * caller changed the key before the task read it.
+     * <p>
+     * The context is cleared at the end of the script; that cleanup is
+     * skipped if an assertion fails first.
+     */
+    @Test
+    void testAsyncContextCapturedAtGoCallTime() {
+        assertScript '''
+            import groovy.concurrent.AsyncContext
+            import groovy.concurrent.Awaitable
+
+            AsyncContext.current()['phase'] = 'A'
+            def task1 = Awaitable.go {
+                await Awaitable.delay(30)
+                AsyncContext.current()['phase']
+            }
+
+            AsyncContext.current()['phase'] = 'B'
+            def task2 = Awaitable.go {
+                await Awaitable.delay(10)
+                AsyncContext.current()['phase']
+            }
+
+            assert await(task1) == 'A'
+            assert await(task2) == 'B'
+            AsyncContext.current().clear()
+        '''
+    }
+
+    // ---- for-await over channels ----
+
+    /**
+     * A scoped producer sends 1, 2, 3 into a capacity-4 channel and closes
+     * it while the scope body iterates the channel with {@code for await}.
+     * The loop must collect exactly {@code [1, 2, 3]} and then end.
+     */
+    @Test
+    void testForAwaitOverBufferedChannel() {
+        assertScript '''
+            import groovy.concurrent.AsyncScope
+            import groovy.concurrent.Awaitable
+
+            def items = []
+            def ch = Awaitable.channel(4)
+
+            AsyncScope.withScope { scope ->
+                scope.async {
+                    await ch.send(1)
+                    await ch.send(2)
+                    await ch.send(3)
+                    ch.close()
+                    null
+                }
+
+                for await (item in ch) {
+                    items << item
+                }
+            }
+
+            assert items == [1, 2, 3]
+        '''
+    }
+
+    /**
+     * Same shape as the buffered variant, but the channel is created without
+     * a capacity argument. The producer sends 0..4 and closes; the
+     * {@code for await} loop must collect them in send order.
+     */
+    @Test
+    void testForAwaitOverUnbufferedChannel() {
+        assertScript '''
+            import groovy.concurrent.AsyncScope
+            import groovy.concurrent.Awaitable
+
+            def items = []
+            def ch = Awaitable.channel()
+
+            AsyncScope.withScope { scope ->
+                scope.async {
+                    for (int i = 0; i < 5; i++) {
+                        await ch.send(i)
+                    }
+                    ch.close()
+                    null
+                }
+
+                for await (item in ch) {
+                    items << item
+                }
+            }
+
+            assert items == [0, 1, 2, 3, 4]
+        '''
+    }
+
+    /**
+     * Iterates with {@code for await} over a channel that was filled with
+     * two values and closed before the loop starts. The loop must yield both
+     * values and then terminate normally rather than throw.
+     */
+    @Test
+    void testForAwaitExitsOnChannelClose() {
+        assertScript '''
+            import groovy.concurrent.Awaitable
+
+            def ch = Awaitable.channel(2)
+            await ch.send('a')
+            await ch.send('b')
+            ch.close()
+
+            def items = []
+            for await (item in ch) {
+                items << item
+            }
+            assert items == ['a', 'b']
+        '''
+    }
+
+    /**
+     * Breaks out of a {@code for await} loop after two of five buffered
+     * items, then asserts the channel is still open and that the next
+     * receive yields the third item (value 2).
+     * <p>
+     * NOTE(review): the script's {@code AsyncScope} import is unused.
+     */
+    @Test
+    void testForAwaitBreakDoesNotCloseChannel() {
+        assertScript '''
+            import groovy.concurrent.AsyncScope
+            import groovy.concurrent.Awaitable
+
+            def ch = Awaitable.channel(10)
+            for (int i = 0; i < 5; i++) { await ch.send(i) }
+
+            def firstTwo = []
+            for await (item in ch) {
+                firstTwo << item
+                if (firstTwo.size() == 2) break
+            }
+
+            // Channel should still be open — asStream().close() is a no-op
+            assert !ch.closed
+            assert firstTwo == [0, 1]
+
+            // Remaining items are still receivable
+            assert await(ch.receive()) == 2
+        '''
+    }
+
+    /**
+     * Iterates with {@code for await} over a channel that was closed without
+     * ever receiving a value. The loop body must never run and the loop must
+     * end without throwing.
+     */
+    @Test
+    void testForAwaitOverEmptyClosedChannel() {
+        assertScript '''
+            import groovy.concurrent.Channel
+
+            def ch = new Channel<String>(1)
+            ch.close()
+
+            def items = []
+            for await (item in ch) {
+                items << item
+            }
+            assert items.isEmpty()
+        '''
+    }
+
+    /**
+     * Fan-in pipeline: 5 producers each send 10 distinct strings into a
+     * capacity-16 channel. A separate closer task awaits all producers and
+     * then closes the channel, which ends the {@code for await} loop in the
+     * scope body.
+     * <p>
+     * The collected list must hold 50 entries with no duplicates. Ordering
+     * across producers is not asserted.
+     */
+    @Test
+    void testForAwaitConcurrentFanInPipeline() {
+        assertScript '''
+            import groovy.concurrent.AsyncScope
+            import groovy.concurrent.Awaitable
+
+            int producerCount = 5
+            int itemsPerProducer = 10
+            def ch = Awaitable.channel(16)
+
+            def received = []
+            AsyncScope.withScope { scope ->
+                def producerTasks = (0..<producerCount).collect { p ->
+                    scope.async {
+                        for (int i = 0; i < itemsPerProducer; i++) {
+                            await ch.send("p${p}-${i}".toString())
+                        }
+                        null
+                    }
+                }
+
+                // Close channel after all producers complete (deterministic, 
no timing dependency)
+                scope.async {
+                    await Awaitable.all(producerTasks as Object[])
+                    ch.close()
+                    null
+                }
+
+                for await (item in ch) {
+                    received << item
+                }
+            }
+
+            assert received.size() == producerCount * itemsPerProducer
+            assert received.toSet().size() == producerCount * itemsPerProducer
+        '''
+    }
+
+    /**
+     * Sets the context key {@code traceId} before starting a scoped producer
+     * and asserts, on every iteration of a {@code for await} loop over the
+     * channel, that the key still has the same value.
+     * <p>
+     * NOTE(review): the assertion lives inside the loop body, so it is
+     * vacuous if the loop yields no items; the number of items is not
+     * checked. The final context cleanup is skipped if an assertion fails.
+     */
+    @Test
+    void testForAwaitPreservesAsyncContext() {
+        assertScript '''
+            import groovy.concurrent.AsyncContext
+            import groovy.concurrent.AsyncScope
+            import groovy.concurrent.Awaitable
+
+            AsyncContext.current()['traceId'] = 'for-await-trace'
+            def ch = Awaitable.channel(3)
+
+            AsyncScope.withScope { scope ->
+                scope.async {
+                    await ch.send('a')
+                    await ch.send('b')
+                    ch.close()
+                    null
+                }
+
+                for await (item in ch) {
+                    assert AsyncContext.current()['traceId'] == 
'for-await-trace'
+                }
+            }
+
+            AsyncContext.current().clear()
+        '''
+    }
+
+    /**
+     * Drives the object returned by {@code asStream()} by hand: repeatedly
+     * awaits {@code moveNext()} and reads {@code current} until
+     * {@code moveNext()} yields a falsy value. For a closed channel holding
+     * 10, 20, 30 the collected items must be exactly those values in order.
+     */
+    @Test
+    void testChannelAsStreamExplicit() {
+        assertScript '''
+            import groovy.concurrent.Channel
+
+            def ch = new Channel<Integer>(3)
+            await ch.send(10)
+            await ch.send(20)
+            await ch.send(30)
+            ch.close()
+
+            def stream = ch.asStream()
+            def items = []
+            while (await stream.moveNext()) {
+                items << stream.current
+            }
+            assert items == [10, 20, 30]
+        '''
+    }
+}
diff --git 
a/src/test/groovy/org/codehaus/groovy/transform/AsyncRuntimeEnhancementTest.groovy
 
b/src/test/groovy/org/codehaus/groovy/transform/AsyncRuntimeEnhancementTest.groovy
new file mode 100644
index 0000000000..552eb60d61
--- /dev/null
+++ 
b/src/test/groovy/org/codehaus/groovy/transform/AsyncRuntimeEnhancementTest.groovy
@@ -0,0 +1,254 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.codehaus.groovy.transform
+
+import org.junit.jupiter.api.Test
+
+import static groovy.test.GroovyAssert.assertScript
+
+/**
+ * Integration coverage for the upgraded async runtime through real Groovy
+ * syntax and metaprogramming features.
+ * <p>
+ * Every test compiles and runs a self-contained script via
+ * {@code assertScript}, so each script declares its own imports and helper
+ * classes.
+ */
+final class AsyncRuntimeEnhancementTest {
+
+    /**
+     * A three-level chain of async methods: {@code leaf} throws an
+     * {@code IOException} after a delay, {@code middle} awaits it, and
+     * {@code top} catches the {@code IOException} and wraps it in an
+     * {@code IllegalStateException} whose message embeds the current
+     * {@code traceId}.
+     * <p>
+     * Asserts that the typed {@code catch} in {@code top} sees the original
+     * exception, that the context value set by {@code AsyncContext.with} is
+     * visible inside the catch block, and that the cause is preserved.
+     */
+    @Test
+    void testDeepAsyncExceptionChainsRetainCatchSemantics() {
+        assertScript '''
+            import groovy.concurrent.AsyncContext
+            import groovy.concurrent.Awaitable
+            import static groovy.test.GroovyAssert.shouldFail
+
+            class Service {
+                async leaf() {
+                    await Awaitable.delay(10)
+                    throw new IOException('leaf exploded')
+                }
+
+                async middle() {
+                    await leaf()
+                }
+
+                async top() {
+                    try {
+                        await middle()
+                    } catch (IOException ioe) {
+                        throw new 
IllegalStateException("trace=${AsyncContext.current()['traceId']}", ioe)
+                    }
+                }
+            }
+
+            AsyncContext.with([traceId: 'req-42']) {
+                def failure = shouldFail(IllegalStateException) {
+                    await new Service().top()
+                }
+
+                assert failure.message == 'trace=req-42'
+                assert failure.cause instanceof IOException
+                assert failure.cause.message == 'leaf exploded'
+            }
+        '''
+    }
+
+    /**
+     * Registers two {@code defer} blocks that each return an awaitable which
+     * appends to a shared log after a 15 ms delay.
+     * <p>
+     * The expected log is {@code ['body', 'close:inner', 'close:outer']}:
+     * the later-registered block's entry comes first, and both entries are
+     * present by the time the awaited result is inspected.
+     */
+    @Test
+    void testDeferWaitsForReturnedAsyncCleanup() {
+        assertScript '''
+            import groovy.concurrent.Awaitable
+
+            class Resource {
+                final List<String> log = []
+
+                Awaitable<Void> closeAsync(String name) {
+                    Awaitable.go {
+                        await Awaitable.delay(15)
+                        log << "close:${name}"
+                        null
+                    }
+                }
+            }
+
+            class Service {
+                final Resource resource = new Resource()
+
+                async work() {
+                    defer { resource.closeAsync('outer') }
+                    defer { resource.closeAsync('inner') }
+                    resource.log << 'body'
+                    return resource.log
+                }
+            }
+
+            def service = new Service()
+            assert await(service.work()) == ['body', 'close:inner', 
'close:outer']
+        '''
+    }
+
+    /**
+     * Combines a metaclass-added {@code String.shout()} method, a producer
+     * started with {@code Awaitable.go}, a scoped consumer that formats the
+     * received value through a closure, and an {@code Awaitable.any} race
+     * against a 500 ms timeout.
+     * <p>
+     * The {@code finally} block removes the modified {@code String}
+     * metaclass so the change cannot leak into other tests.
+     * <p>
+     * NOTE(review): {@code childCount == 2} is asserted although only the
+     * consumer is started via {@code scope.async}. Presumably
+     * {@code Awaitable.go} inside {@code withScope} also registers with the
+     * scope; confirm against {@code AsyncScope}.
+     */
+    @Test
+    void testGoChannelsAndMetaprogrammingRemainGroovyFriendly() {
+        assertScript '''
+            import groovy.concurrent.AsyncScope
+            import groovy.concurrent.Awaitable
+
+            String.metaClass.shout = { -> delegate.toUpperCase() + '!' }
+
+            try {
+                def result = AsyncScope.withScope { scope ->
+                    def channel = Awaitable.channel(1)
+
+                    def producer = Awaitable.go {
+                        await channel.send('groovy'.shout())
+                        'produced'
+                    }
+
+                    def consumer = scope.async {
+                        def prefix = 'hello'
+                        def formatter = { value -> "${prefix} ${value}" }
+                        formatter(await channel.receive())
+                    }
+
+                    def selected = await Awaitable.any(
+                        consumer,
+                        Awaitable.delay(500).then { 'timeout' }
+                    )
+
+                    [message: selected, producer: await(producer), childCount: 
scope.childCount]
+                }
+
+                assert result.message == 'hello GROOVY!'
+                assert result.producer == 'produced'
+                assert result.childCount == 2
+            } finally {
+                GroovySystem.metaClassRegistry.removeMetaClass(String)
+            }
+        '''
+    }
+
+    /**
+     * Inside {@code AsyncContext.with([traceId: 'gen-trace'])}, calls an
+     * async method and an async generator (a method using
+     * {@code yield return}). Both assert internally that they see the
+     * caller's {@code traceId}, and their results embed it.
+     */
+    @Test
+    void testAsyncContextPropagatesToAsyncMethodsAndGenerators() {
+        assertScript '''
+            import groovy.concurrent.AsyncContext
+            import groovy.concurrent.Awaitable
+
+            class ContextAwareService {
+                async fetchWithContext() {
+                    assert AsyncContext.current()['traceId'] == 'gen-trace'
+                    await Awaitable.delay(10)
+                    "value-${AsyncContext.current()['traceId']}"
+                }
+
+                async generateItems() {
+                    for (int i = 0; i < 3; i++) {
+                        assert AsyncContext.current()['traceId'] == 'gen-trace'
+                        yield return 
"item-$i-${AsyncContext.current()['traceId']}"
+                    }
+                }
+            }
+
+            AsyncContext.with([traceId: 'gen-trace']) {
+                def svc = new ContextAwareService()
+                assert await(svc.fetchWithContext()) == 'value-gen-trace'
+
+                def items = []
+                for await (item in svc.generateItems()) {
+                    items << item
+                }
+                assert items == ['item-0-gen-trace', 'item-1-gen-trace', 
'item-2-gen-trace']
+            }
+        '''
+    }
+
+    /**
+     * Sets the context key {@code phase} to {@code 'init'}, registers a
+     * {@code defer} block that logs the current {@code phase}, then changes
+     * the key to {@code 'body'} before the method returns.
+     * <p>
+     * The deferred block is expected to log {@code 'init'}, the value at
+     * registration time, while the method body logs {@code 'body'}.
+     */
+    @Test
+    void testDeferPreservesAsyncContextAtRegistrationTime() {
+        assertScript '''
+            import groovy.concurrent.AsyncContext
+            import groovy.concurrent.Awaitable
+
+            class DeferContextService {
+                final List<String> log = []
+
+                async work() {
+                    AsyncContext.current()['phase'] = 'init'
+                    defer {
+                        log << 
"cleanup-phase:${AsyncContext.current()['phase']}".toString()
+                        null
+                    }
+                    AsyncContext.current()['phase'] = 'body'
+                    log << 
"body-phase:${AsyncContext.current()['phase']}".toString()
+                    return log
+                }
+            }
+
+            def svc = new DeferContextService()
+            def result = await svc.work()
+            assert result.contains('body-phase:body')
+            assert result.contains('cleanup-phase:init')
+        '''
+    }
+
+    /**
+     * Two-stage channel pipeline inside a static async method: one producer
+     * feeds {@code stage1}, two transformer tasks each move
+     * {@code itemCount / 2} values to {@code stage2} multiplied by 10, and
+     * one consumer collects {@code itemCount} results.
+     * <p>
+     * The sorted results for 20 items must be 0, 10, ..., 190.
+     * <p>
+     * NOTE(review): the script only exercises an even {@code itemCount}
+     * (20); the even split across the two transformers relies on that.
+     */
+    @Test
+    void testHighConcurrencyChannelPipelineWithAsyncMethods() {
+        assertScript '''
+            import groovy.concurrent.AsyncScope
+            import groovy.concurrent.Awaitable
+
+            class Pipeline {
+                static async processPipeline(int itemCount) {
+                    def stage1 = Awaitable.channel(16)
+                    def stage2 = Awaitable.channel(16)
+                    def results = new 
java.util.concurrent.CopyOnWriteArrayList()
+
+                    AsyncScope.withScope { scope ->
+                        // Producer
+                        scope.async {
+                            for (int i = 0; i < itemCount; i++) {
+                                await stage1.send(i)
+                            }
+                            null
+                        }
+
+                        // Transformer (2 workers)
+                        2.times {
+                            scope.async {
+                                for (int i = 0; i < itemCount / 2; i++) {
+                                    int value = await stage1.receive()
+                                    await stage2.send(value * 10)
+                                }
+                                null
+                            }
+                        }
+
+                        // Consumer
+                        scope.async {
+                            for (int i = 0; i < itemCount; i++) {
+                                results.add(await stage2.receive())
+                            }
+                            null
+                        }
+                    }
+
+                    results.sort()
+                }
+            }
+
+            def results = await Pipeline.processPipeline(20)
+            assert results.size() == 20
+            assert results == (0..<20).collect { it * 10 }
+        '''
+    }
+}

Reply via email to