This is an automated email from the ASF dual-hosted git repository.
sunlan pushed a commit to branch GROOVY-9381_3
in repository https://gitbox.apache.org/repos/asf/groovy.git
The following commit(s) were added to refs/heads/GROOVY-9381_3 by this push:
new 7cd7f9edce Minor tweaks
7cd7f9edce is described below
commit 7cd7f9edcef1de93ca8df6a135a705c4e0a0f716
Author: Daniel Sun <[email protected]>
AuthorDate: Thu Mar 19 02:57:26 2026 +0900
Minor tweaks
---
.../concurrent/AwaitableAdapterRegistry.java | 2 -
.../apache/groovy/runtime/async/AsyncSupport.java | 69 ++++----
.../groovy/transform/AsyncTransformHelper.java | 36 ++--
src/spec/doc/core-async-await.adoc | 191 ++++++++-------------
.../codehaus/groovy/transform/AsyncApiTest.groovy | 82 +++++++++
5 files changed, 200 insertions(+), 180 deletions(-)
diff --git a/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java b/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java
index 61398d154d..9ecea35288 100644
--- a/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java
+++ b/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java
@@ -252,8 +252,6 @@ public class AwaitableAdapterRegistry {
cf.completeExceptionally(ce);
} catch (ExecutionException e) {
cf.completeExceptionally(e.getCause() != null ? e.getCause() : e);
- } catch (CancellationException e) {
- cf.completeExceptionally(e);
} catch (Exception e) {
cf.completeExceptionally(e);
}
diff --git a/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java b/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
index 96658f81e8..7996c110e3 100644
--- a/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
+++ b/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
@@ -456,13 +456,7 @@ public class AsyncSupport {
if (awaitables == null || awaitables.length == 0) {
return new ArrayList<>();
}
- for (int i = 0; i < awaitables.length; i++) {
- if (awaitables[i] == null) {
- throw new IllegalArgumentException("awaitAll: element at index " + i + " is null");
- }
- }
- CompletableFuture<?>[] futures =
- Arrays.stream(awaitables).map(AsyncSupport::toCompletableFuture).toArray(CompletableFuture[]::new);
+ CompletableFuture<?>[] futures = toCompletableFutures(awaitables, "awaitAll");
try {
CompletableFuture.allOf(futures).join();
} catch (CompletionException e) {
@@ -490,13 +484,7 @@ public class AsyncSupport {
if (awaitables == null || awaitables.length == 0) {
throw new IllegalArgumentException("awaitAny requires at least one awaitable");
}
- for (int i = 0; i < awaitables.length; i++) {
- if (awaitables[i] == null) {
- throw new IllegalArgumentException("awaitAny: element at index " + i + " is null");
- }
- }
- CompletableFuture<?>[] futures =
- Arrays.stream(awaitables).map(AsyncSupport::toCompletableFuture).toArray(CompletableFuture[]::new);
+ CompletableFuture<?>[] futures = toCompletableFutures(awaitables, "awaitAny");
try {
return CompletableFuture.anyOf(futures).join();
} catch (CompletionException e) {
@@ -521,13 +509,7 @@ public class AsyncSupport {
if (awaitables == null || awaitables.length == 0) {
return new ArrayList<>();
}
- for (int i = 0; i < awaitables.length; i++) {
- if (awaitables[i] == null) {
- throw new IllegalArgumentException("awaitAllSettled: element at index " + i + " is null");
- }
- }
- CompletableFuture<?>[] futures =
- Arrays.stream(awaitables).map(AsyncSupport::toCompletableFuture).toArray(CompletableFuture[]::new);
+ CompletableFuture<?>[] futures = toCompletableFutures(awaitables, "awaitAllSettled");
CompletableFuture.allOf(
Arrays.stream(futures)
.map(f -> f.handle((v, t) -> null))
@@ -545,6 +527,29 @@ public class AsyncSupport {
return Awaitable.from(source).toCompletableFuture();
}
+ /**
+ * Validates that no element is {@code null} and converts all elements to
+ * {@link CompletableFuture}s in one pass. Used by combinator methods
+ * ({@code awaitAll}, {@code awaitAny}, {@code awaitAllSettled}, and their
+ * non-blocking {@code Async} variants) to eliminate duplicate
+ * null-checking and conversion loops.
+ *
+ * @param sources the source objects to validate and convert
+ * @param callerName the method name for error messages
+ * @return an array of completable futures corresponding to the sources
+ * @throws IllegalArgumentException if any element is {@code null}
+ */
+ private static CompletableFuture<?>[] toCompletableFutures(Object[] sources, String callerName) {
+ CompletableFuture<?>[] futures = new CompletableFuture<?>[sources.length];
+ for (int i = 0; i < sources.length; i++) {
+ if (sources[i] == null) {
+ throw new IllegalArgumentException(callerName + ": element at index " + i + " is null");
+ }
+ futures[i] = toCompletableFuture(sources[i]);
+ }
+ return futures;
+ }
+
// ---- non-blocking combinators (return Awaitable) --------------------
/**
@@ -556,13 +561,7 @@ public class AsyncSupport {
if (sources == null || sources.length == 0) {
return Awaitable.of(new ArrayList<>());
}
- CompletableFuture<?>[] futures = new CompletableFuture[sources.length];
- for (int i = 0; i < sources.length; i++) {
- if (sources[i] == null) {
- throw new IllegalArgumentException("Awaitable.all: element at index " + i + " is null");
- }
- futures[i] = toCompletableFuture(sources[i]);
- }
+ CompletableFuture<?>[] futures = toCompletableFutures(sources, "Awaitable.all");
CompletableFuture<List<Object>> combined = CompletableFuture.allOf(futures)
.thenApply(v -> {
List<Object> results = new ArrayList<>(futures.length);
@@ -584,12 +583,7 @@ public class AsyncSupport {
if (sources == null || sources.length == 0) {
throw new IllegalArgumentException("Awaitable.any requires at least one source");
}
- for (int i = 0; i < sources.length; i++) {
- if (sources[i] == null) {
- throw new IllegalArgumentException("Awaitable.any: element at index " + i + " is null");
- }
- }
- CompletableFuture<?>[] futures = Arrays.stream(sources).map(AsyncSupport::toCompletableFuture).toArray(CompletableFuture[]::new);
+ CompletableFuture<?>[] futures = toCompletableFutures(sources, "Awaitable.any");
return GroovyPromise.of((CompletableFuture<T>) CompletableFuture.anyOf(futures));
}
@@ -602,12 +596,7 @@ public class AsyncSupport {
if (sources == null || sources.length == 0) {
return Awaitable.of(new ArrayList<>());
}
- for (int i = 0; i < sources.length; i++) {
- if (sources[i] == null) {
- throw new IllegalArgumentException("Awaitable.allSettled: element at index " + i + " is null");
- }
- }
- CompletableFuture<?>[] futures = Arrays.stream(sources).map(AsyncSupport::toCompletableFuture).toArray(CompletableFuture[]::new);
+ CompletableFuture<?>[] futures = toCompletableFutures(sources, "Awaitable.allSettled");
// Wait for all to settle (handle converts failures to non-exceptional completions)
CompletableFuture<List<AwaitResult<Object>>> combined = CompletableFuture.allOf(
Arrays.stream(futures)
diff --git a/src/main/java/org/codehaus/groovy/transform/AsyncTransformHelper.java b/src/main/java/org/codehaus/groovy/transform/AsyncTransformHelper.java
index b027f91cfc..6c07699678 100644
--- a/src/main/java/org/codehaus/groovy/transform/AsyncTransformHelper.java
+++ b/src/main/java/org/codehaus/groovy/transform/AsyncTransformHelper.java
@@ -260,25 +260,7 @@ public final class AsyncTransformHelper {
* @return {@code true} if at least one {@code yieldReturn} call is found
*/
public static boolean containsYieldReturn(Statement code) {
- boolean[] found = {false};
- code.visit(new CodeVisitorSupport() {
- @Override
- public void visitStaticMethodCallExpression(StaticMethodCallExpression call) {
- if (YIELD_RETURN_METHOD.equals(call.getMethod())
- && ASYNC_SUPPORT_CLASS.equals(call.getOwnerType().getName())) {
- found[0] = true;
- }
- if (!found[0]) {
- super.visitStaticMethodCallExpression(call);
- }
- }
-
- @Override
- public void visitClosureExpression(ClosureExpression expression) {
- // Do not descend into nested closures — each manages its own generator
- }
- });
- return found[0];
+ return containsStaticCall(code, YIELD_RETURN_METHOD);
}
// ---- rewriting utilities --------------------------------------------
@@ -353,11 +335,25 @@ public final class AsyncTransformHelper {
* @return {@code true} if at least one {@code defer} call is found
*/
public static boolean containsDefer(Statement code) {
+ return containsStaticCall(code, DEFER_METHOD);
+ }
+
+ /**
+ * Scans the given statement tree for a static method call to
+ * {@code AsyncSupport.<methodName>()}. Does <em>not</em> descend
+ * into nested {@link ClosureExpression}s, since each nested closure
+ * manages its own transformation independently.
+ *
+ * @param code the statement tree to scan
+ * @param methodName the method name to look for on {@code AsyncSupport}
+ * @return {@code true} if at least one matching call is found
+ */
+ private static boolean containsStaticCall(Statement code, String methodName) {
boolean[] found = {false};
code.visit(new CodeVisitorSupport() {
@Override
public void visitStaticMethodCallExpression(StaticMethodCallExpression call) {
- if (DEFER_METHOD.equals(call.getMethod())
+ if (methodName.equals(call.getMethod())
&& ASYNC_SUPPORT_CLASS.equals(call.getOwnerType().getName())) {
found[0] = true;
}
diff --git a/src/spec/doc/core-async-await.adoc b/src/spec/doc/core-async-await.adoc
index ef3a3fb9f9..3433c9d5eb 100644
--- a/src/spec/doc/core-async-await.adoc
+++ b/src/spec/doc/core-async-await.adoc
@@ -40,7 +40,7 @@ Key capabilities include:
* **`yield return`** — produce asynchronous streams (generators)
* **`defer`** — schedule Go-style cleanup actions that run when the method
completes
* **Framework integration** — built-in adapters for `CompletableFuture` and
`Future`;
-`Flow.Publisher` support via the auto-discovered `FlowPublisherAdapter` (an
internal runtime adapter);
+`Flow.Publisher` support via an auto-discovered runtime adapter;
extensible to RxJava, Reactor, and Spring via the adapter registry
On JDK 21+, async methods automatically leverage
@@ -115,7 +115,7 @@
include::../test/AsyncAwaitSpecTest.groovy[tags=motivation_stream_processing,ind
----
The producer yields values on demand, and the consumer pulls them with
-`for await`. A `SynchronousQueue` sits between the two, providing
+`for await`. Internally, the runtime uses a handoff queue between the two,
providing
natural **back-pressure**: the producer blocks on each `yield return` until the
consumer is ready, preventing unbounded memory growth — all without requiring
the developer to manage queues, signals, or synchronization.
@@ -124,7 +124,7 @@ the developer to manage queues, signals, or synchronization.
Groovy does not force developers to abandon their existing async investments.
The `await` keyword natively understands `CompletableFuture`,
`CompletionStage`,
-`Future`, and `Flow.Publisher` (via the auto-discovered `FlowPublisherAdapter`
runtime adapter):
+`Future`, and `Flow.Publisher` (via an auto-discovered runtime adapter):
[source,groovy]
----
@@ -396,7 +396,7 @@ This is analogous to C#'s `yield return` within
`IAsyncEnumerable<T>` and JavaSc
generators.
The producer (generator) and consumer (`for await` loop) coordinate via a
-`SynchronousQueue`, providing natural back-pressure: the producer blocks on
each `yield return`
+handoff queue, providing natural back-pressure: the producer blocks on each
`yield return`
until the consumer is ready for the next value. This design means:
* The generator body runs on its own thread, executing sequentially like any
normal method
@@ -616,7 +616,7 @@ the reactive streams interface standardized in JDK 9. This
enables seamless cons
reactive streams from any library that implements the `Flow.Publisher`
contract (Project Reactor,
RxJava 3 via adapters, Vert.x, etc.).
-The internal adapter uses a bounded buffer (capacity 256) to bridge the
push-based `Publisher`
+The internal adapter uses a small bounded buffer to bridge the push-based
`Publisher`
model to Groovy's pull-based `AsyncStream` model, preventing unbounded memory
growth with
fast publishers while maintaining throughput. Back-pressure is enforced by
requesting exactly
one item at a time; demand for the next element is signalled _before_ the
consumer's
@@ -678,8 +678,8 @@ asynchronous types from third-party frameworks. The
built-in adapter handles:
* `java.util.concurrent.Future` (blocking, delegated to a configurable
executor)
Additionally, `java.util.concurrent.Flow.Publisher` support (single-value
`await` and
-multi-value `for await`) is provided by `FlowPublisherAdapter`
-(`org.apache.groovy.runtime.async`), which is auto-discovered via
`ServiceLoader`.
+multi-value `for await`) is provided by an internal runtime adapter
+that is auto-discovered via `ServiceLoader`.
This decoupled design allows applications to override the default
reactive-streams bridge with a framework-specific adapter (e.g., one backed by
Reactor's
native scheduler) by registering a higher-priority adapter.
@@ -899,7 +899,10 @@ and **closure/lambda expressions**. Applying `async` to
other declarations is a
[[implementation-details]]
== Implementation Notes
-This section provides architectural context for Groovy language maintainers
and advanced users.
+This section provides architectural context for Groovy language maintainers
and advanced
+users who want to understand how async/await works under the hood. The
descriptions
+focus on _design principles and behavioural guarantees_ rather than internal
class names,
+so they remain valid even as the runtime implementation evolves.
[[implementation-strategy]]
=== Thread-Based vs State Machine
@@ -935,45 +938,41 @@ for back-pressure, ensuring stable performance even
without virtual threads.
The async/await feature is implemented across three layers:
-**Grammar layer** (`GroovyLexer.g4`, `GroovyParser.g4`): Defines `async`,
`await`, `defer` as contextual
-keywords, along with parser rules for `for await`, `yield return`, and `defer`
statements.
-
-**AST transformation layer** (`AstBuilder`, `AsyncASTTransformation`,
`AsyncTransformHelper`):
-The `AstBuilder` converts parse trees into AST nodes, rewriting `async` method
bodies into calls
-to `AsyncSupport` static methods. The `@Async` annotation is processed by
`AsyncASTTransformation`,
-which delegates to the same `AsyncTransformHelper` utility class. This shared
helper ensures
-consistent code generation for both the keyword and annotation forms.
-
-**Runtime layer** (`AsyncSupport`, `GroovyPromise`, `AbstractAsyncStream`,
-`AsyncStreamGenerator`, `FlowPublisherAdapter`, `AwaitableAdapterRegistry`):
`AsyncSupport` is the
-central runtime class containing static methods for `await`, `async`, `defer`,
`yield return`,
-timeout composition, stream cleanup, and combinators. `GroovyPromise` wraps
-`CompletableFuture` to implement the `Awaitable` interface, decoupling the
public API from the
-JDK implementation. `AbstractAsyncStream` is the template base class for
queue-based `AsyncStream`
-implementations — it centralises the `moveNext()` signal dispatch,
`getCurrent()`/`close()`
-lifecycle, and interrupt handling, using the Template Method pattern with hook
methods
-(`beforeTake`, `afterValueConsumed`, `afterMoveNext`, `onMoveNextInterrupted`,
`onClose`).
-`AsyncStreamGenerator` extends the template for generator-style streams backed
by a
-`SynchronousQueue`. `FlowPublisherAdapter.FlowAsyncStream` extends the
template for
-`Flow.Publisher` adaptation using a bounded `LinkedBlockingQueue` with
one-at-a-time
-demand. `AwaitableAdapterRegistry` provides the SPI extension point for
third-party async type
-support.
+**Grammar layer**: Defines `async`, `await`, `defer` as contextual keywords,
along with
+parser rules for `for await`, `yield return`, and `defer` statements.
+
+**AST transformation layer**: Converts parse trees into AST nodes, rewriting
`async`
+method bodies into calls to runtime support methods. The `@Async` annotation
is processed
+by the same transformation pipeline, ensuring consistent code generation for
both the
+keyword and annotation forms.
+
+**Runtime layer**: Provides the static methods invoked by compiler-generated
code —
+`await`, `async`, `defer`, `yield return`, timeout composition, stream
cleanup, and
+combinators. Key design decisions include:
+
+* A dedicated promise wrapper decouples the public `Awaitable` API from
`CompletableFuture`,
+ so the implementation can evolve independently of the user-facing contract.
+* Queue-based `AsyncStream` implementations share a common template base class
(Template
+ Method pattern) that centralises signal dispatch, lifecycle management, and
interrupt
+ handling. Subclasses only override a small number of hook methods — e.g.
one subclass
+ powers async generators (`yield return`), another bridges `Flow.Publisher`
sources.
+* The adapter registry (`AwaitableAdapterRegistry`) provides an SPI extension
point for
+ third-party async types, with `ServiceLoader`-based auto-discovery and
runtime
+ registration.
==== API Decoupling
The public API that Groovy developers interact with is defined entirely in the
`groovy.concurrent` package (`Awaitable`, `AsyncStream`, `AwaitResult`,
-`AwaitableAdapter`, `AwaitableAdapterRegistry`). The implementation classes
-(`AsyncSupport`, `GroovyPromise`, `AbstractAsyncStream`,
`AsyncStreamGenerator`,
-`FlowPublisherAdapter`) live in
-`org.apache.groovy.runtime.async` — an internal package.
+`AwaitableAdapter`, `AwaitableAdapterRegistry`). All implementation classes
live in an
+internal package that is not part of the public contract.
This separation means:
* User code depends on `Awaitable`, _not_ on `CompletableFuture`. If the JDK
async
infrastructure evolves (e.g., structured concurrency in future JDK
releases), only the
internal implementation needs to change — the public `Awaitable` contract
remains stable.
-* `GroovyPromise` is the sole bridge between `Awaitable` and
`CompletableFuture`.
+* The promise wrapper is the sole bridge between `Awaitable` and
`CompletableFuture`.
Replacing it (for example, with a structured-concurrency-based
implementation) would be
transparent to all application code.
* `toCompletableFuture()` is the one explicit escape hatch for interoperating
with Java
@@ -982,120 +981,76 @@ This separation means:
[[thread-safety]]
=== Thread Safety and Robustness
-A key design goal is that **thread safety is the framework's responsibility,
not the user's**.
-All concurrency control is encapsulated inside the runtime, so application
code never needs
-explicit locks, atomics, or volatile annotations.
+A key design goal is that **thread safety is the framework's responsibility,
not the
+user's**. All concurrency control is encapsulated inside the runtime, so
application code
+never needs explicit locks, atomics, or volatile annotations.
==== Lock-Free Synchronization
-All runtime components employ lock-free or minimal-contention synchronization:
-
-* `AsyncSupport.defaultExecutor` — `volatile` field for safe publication
without locks
-* `AwaitableAdapterRegistry.ADAPTERS` — `CopyOnWriteArrayList` for lock-free
iteration during adapter lookup;
-writes (adapter registration) are rare, reads (every `await`) are frequent
-* `AwaitableAdapterRegistry.blockingExecutor` — `volatile` field
-* `AbstractAsyncStream.current` — `volatile` field for cross-thread visibility
-* `AbstractAsyncStream.closed` — `AtomicBoolean` for lifecycle management
(shared by all queue-based streams)
-* `AsyncStreamGenerator` — extends `AbstractAsyncStream`; adds
`AtomicReference<Thread>` for prompt,
-idempotent close/cancellation signalling
-* `Flow.Publisher` adaptation (`FlowPublisherAdapter.FlowAsyncStream`) —
extends `AbstractAsyncStream`;
- the inherited `AtomicBoolean` closed flag governs the entire lifecycle (both
upstream terminal signals
- and consumer-side close), with `AtomicReference<Subscription>` for
CAS-guarded onSubscribe (§2.5
- compliance); all signals (`onNext`/`onError`/`onComplete`) use blocking
`put()` with a non-blocking
- `offer()` fallback on interrupt, preventing both silent item loss and
consumer deadlock; demand is
- signalled before `moveNext()` returns to prevent livelock; `moveNext()` uses
shared cached `Awaitable`
- instances
- (defined on the `AsyncStream` interface) to eliminate per-call allocations
on the hot path
-* Defer scopes — per-task `ArrayDeque`, confined to a single thread (no
sharing)
-* `DELAY_SCHEDULER` — single daemon thread for non-blocking timer operations
+All runtime components employ lock-free or minimal-contention synchronization.
Shared
+mutable state uses `volatile` fields for safe publication, `AtomicBoolean` for
lifecycle
+flags, `AtomicReference` for thread and subscription tracking, and
copy-on-write
+collections for read-heavy registries (such as the adapter list). No
component acquires a
+monitor lock during normal operation.
==== Async Generator Safety
-The `AsyncStreamGenerator` extends `AbstractAsyncStream` — a template base
class that
-centralises the `moveNext()` signal dispatch, lifecycle management, and
interrupt handling
-common to all queue-based `AsyncStream` implementations. The generator adds
the producer-side
-API (`yield`, `complete`, `error`) and overrides four template hooks
(`beforeTake`,
-`afterMoveNext`, `onMoveNextInterrupted`, `onClose`). Several concurrency
hazards are
-handled transparently:
+The async generator (powering `yield return` / `for await`) uses a handoff
queue between
+the producer and consumer threads, with several concurrency safeguards handled
+transparently by the runtime:
[cols="2,3"]
|===
-| Concern | Mechanism
+| Concern | Guarantee
| **Back-pressure**
-| A `SynchronousQueue` between producer and consumer ensures the producer
blocks on each
-`yield return` until the consumer calls `moveNext()`. No unbounded buffering
can occur.
+| The producer blocks on each `yield return` until the consumer calls
`moveNext()`.
+No unbounded buffering can occur.
| **Cooperative cancellation**
-| The producer thread is tracked via `AtomicReference<Thread>`. When the
consumer calls
-`close()` (explicitly or via compiler-generated `finally`), the producer
thread is interrupted,
-allowing it to clean up promptly even if blocked in I/O or a long computation.
-
-| **TOCTOU race prevention**
-| `moveNext()` uses a double-check pattern: the `closed` flag is re-checked
_after_
-registering the consumer thread. This closes a race window where `close()`
could execute
-between the initial check and the `consumerThread.set()` call, which would
leave the
-consumer stranded in `queue.take()` with no one to interrupt it.
-
-| **Thread-safe consumer tracking**
-| The consumer thread is tracked via `AtomicReference<Thread>` during
`moveNext()`. This
-enables `close()` to interrupt a blocked consumer. Note: concurrent
`moveNext()` calls
-from multiple threads are not supported and may produce unpredictable results
— async
-generators are inherently single-consumer (just like C#'s `IAsyncEnumerator`).
+| When the consumer calls `close()` (explicitly or via the compiler-generated
`finally`
+block), the producer thread is interrupted, allowing it to clean up promptly
even if
+blocked in I/O or a long computation.
+
+| **Race-free lifecycle**
+| The `closed` flag is checked _after_ registering the consumer thread in
`moveNext()`,
+closing a race window where `close()` could execute between the initial check
and the
+thread registration, which would leave the consumer blocked with no one to
interrupt it.
| **Idempotent close**
-| `close()` is guarded by `AtomicBoolean.compareAndSet()`, making it safe to
call multiple
-times from any thread without side effects.
+| `close()` can be called multiple times from any thread without side effects.
| **Signal delivery under interrupt**
-| If the producer's `complete()` or `error()` signal is interrupted and the
non-blocking
-fallback delivery fails (no consumer waiting), the generator force-closes
itself. This
-prevents the consumer from blocking indefinitely on a subsequent `moveNext()`
— a defensive
-measure against unexpected thread interruption outside the normal close path.
+| If a terminal signal (`complete` or `error`) cannot be delivered because the
thread is
+interrupted and no consumer is waiting, the generator force-closes itself to
prevent the
+consumer from blocking indefinitely on a subsequent `moveNext()`.
|===
==== Flow.Publisher Adaptation Safety
-The `FlowPublisherAdapter` bridges the push-based `Flow.Publisher` protocol
into the pull-based
-`AsyncStream` model via a named inner class `FlowAsyncStream` that also extends
-`AbstractAsyncStream`. It overrides three template hooks (`beforeTake`,
`afterValueConsumed`,
-`onClose`) and inherits the default `onMoveNextInterrupted` behaviour. A
single `AtomicBoolean`
-closed flag (inherited from the template) governs the entire lifecycle — set
by the first terminal
-signal, by the consumer's `close()`, or by an interrupt. Several concurrency
hazards are handled
-transparently:
+The `Flow.Publisher` bridge converts the push-based `Publisher` protocol into
the
+pull-based `AsyncStream` model, with the following guarantees:
[cols="2,3"]
|===
-| Concern | Mechanism
+| Concern | Guarantee
| **Back-pressure**
-| A bounded `LinkedBlockingQueue` (capacity 2) bridges push and pull. Demand
is capped at
-one item per `moveNext()` call via `request(1)`. The small capacity reflects
the one-at-a-time
-demand model: at most one value plus a racing terminal signal. Demand is
signalled _before_
-`moveNext()` returns, preventing livelock when producer and consumer share a
thread pool.
+| A small bounded buffer bridges push and pull. Demand is capped at one item
per
+`moveNext()` call. Demand is signalled _before_ `moveNext()` returns,
preventing
+livelock when producer and consumer share a thread pool.
| **Interrupt-safe signal delivery**
-| All subscriber callbacks (`onNext`, `onError`, `onComplete`) use blocking
`put()` for normal
-delivery, with a non-blocking `offer()` fallback when the publisher thread is
interrupted.
-If `offer()` also fails (queue full from a misbehaving publisher), `onNext`
cancels the
-upstream subscription and injects an error signal; terminal signals
(`onError`/`onComplete`)
-share a common `putTerminalSignal()` helper that atomically CAS-closes the
stream, cancels
-the subscription, and delivers the signal.
+| Subscriber callbacks use blocking delivery with a non-blocking fallback when
the
+publisher thread is interrupted. If the fallback also fails, the upstream
subscription
+is cancelled and an error signal is injected to unblock the consumer.
| **Allocation-free hot path**
-| `moveNext()` returns shared cached `Awaitable<Boolean>` constants defined on
the `AsyncStream`
-interface for the value and end-of-stream cases, eliminating per-call
`CompletableFuture` +
-`GroovyPromise` allocation. Error signals are thrown directly (matching
`AsyncStreamGenerator`
-behaviour) rather than wrapped in a failed `CompletableFuture`.
-
-| **Non-blocking close**
-| `close()` uses non-blocking `offer()` (instead of blocking `put()`) to
inject the completion
-sentinel after clearing the queue — this cannot throw `InterruptedException`
and effectively
-always succeeds because the queue was just drained.
+| `moveNext()` returns shared cached `Awaitable<Boolean>` constants for the
value and
+end-of-stream cases, eliminating per-call object allocation on the hot path.
| **Idempotent close**
-| `close()` is guarded by `AtomicBoolean.compareAndSet()`, making it safe to
call multiple
-times from any thread without side effects.
+| `close()` can be called multiple times from any thread without side effects.
|===
These mechanisms ensure that `yield return` / `for await` code remains as
simple as writing
diff --git a/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy b/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy
index fc76e4c017..972583e020 100644
--- a/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy
+++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy
@@ -3213,4 +3213,86 @@ class AsyncApiTest {
assert gen.getCurrent() == 'after-null'
assert gen.moveNext().get() == false
}
+
+ // ---- Tests for toCompletableFutures null-validation helper ----
+
+ void testAwaitAllRejectsNullElement() {
+ def ex = shouldFail(IllegalArgumentException) {
+ AsyncSupport.awaitAll(Awaitable.of(1), null, Awaitable.of(3))
+ }
+ assert ex.message.contains('awaitAll')
+ assert ex.message.contains('index 1')
+ }
+
+ void testAwaitAnyRejectsNullElement() {
+ def ex = shouldFail(IllegalArgumentException) {
+ AsyncSupport.awaitAny(Awaitable.of(1), null)
+ }
+ assert ex.message.contains('awaitAny')
+ assert ex.message.contains('index 1')
+ }
+
+ void testAwaitAllSettledRejectsNullElement() {
+ def ex = shouldFail(IllegalArgumentException) {
+ AsyncSupport.awaitAllSettled(null, Awaitable.of(2))
+ }
+ assert ex.message.contains('awaitAllSettled')
+ assert ex.message.contains('index 0')
+ }
+
+ void testAllAsyncRejectsNullElement() {
+ def ex = shouldFail(IllegalArgumentException) {
+ AsyncSupport.allAsync(Awaitable.of('a'), null)
+ }
+ assert ex.message.contains('Awaitable.all')
+ assert ex.message.contains('index 1')
+ }
+
+ void testAnyAsyncRejectsNullElement() {
+ def ex = shouldFail(IllegalArgumentException) {
+ AsyncSupport.anyAsync(null, Awaitable.of('b'))
+ }
+ assert ex.message.contains('Awaitable.any')
+ assert ex.message.contains('index 0')
+ }
+
+ void testAllSettledAsyncRejectsNullElement() {
+ def ex = shouldFail(IllegalArgumentException) {
+ AsyncSupport.allSettledAsync(Awaitable.of(1), Awaitable.of(2), null)
+ }
+ assert ex.message.contains('Awaitable.allSettled')
+ assert ex.message.contains('index 2')
+ }
+
+ void testAwaitAllAcceptsMixedTypes() {
+ // Verify toCompletableFutures correctly converts mixed types
+ def cf = CompletableFuture.completedFuture(10)
+ def awaitable = Awaitable.of(20)
+ def results = AsyncSupport.awaitAll(cf, awaitable)
+ assert results == [10, 20]
+ }
+
+ void testAwaitAllSettledAcceptsMixedTypes() {
+ def cf = CompletableFuture.completedFuture('ok')
+ def failedCf = new CompletableFuture()
+ failedCf.completeExceptionally(new RuntimeException('boom'))
+ def results = AsyncSupport.awaitAllSettled(cf, failedCf)
+ assert results.size() == 2
+ assert results[0].isSuccess()
+ assert results[0].value == 'ok'
+ assert results[1].isFailure()
+ assert results[1].error.message == 'boom'
+ }
+
+ // ---- Tests for AwaitableAdapterRegistry.completeFrom simplified exception handling ----
+
+ void testAdapterRegistryHandlesCancelledFuture() {
+ def future = new CompletableFuture()
+ future.cancel(true)
+ def awaitable = Awaitable.from(future)
+ def ex = shouldFail(CancellationException) {
+ awaitable.get()
+ }
+ assert ex instanceof CancellationException
+ }
}