This is an automated email from the ASF dual-hosted git repository.
sunlan pushed a commit to branch GROOVY-9381_3
in repository https://gitbox.apache.org/repos/asf/groovy.git
The following commit(s) were added to refs/heads/GROOVY-9381_3 by this push:
new b27543013a Minor tweaks
b27543013a is described below
commit b27543013aba21d64a69d9c3545c0c3c98d843dd
Author: Daniel Sun <[email protected]>
AuthorDate: Tue Mar 3 00:08:40 2026 +0900
Minor tweaks
---
.../concurrent/AwaitableAdapterRegistry.java | 60 +++++++++++++---------
.../apache/groovy/parser/antlr4/AstBuilder.java | 13 ++++-
src/spec/doc/core-async-await.adoc | 27 ++++++++++
.../groovy/transform/AsyncAwaitSyntaxTest.groovy | 3 +-
.../groovy/transform/AsyncPatternsTest.groovy | 2 +-
.../groovy/transform/AsyncVirtualThreadTest.groovy | 3 +-
6 files changed, 81 insertions(+), 27 deletions(-)
diff --git a/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java
b/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java
index 8ee415dd27..e93c309a1b 100644
--- a/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java
+++ b/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java
@@ -23,11 +23,16 @@ import org.apache.groovy.runtime.async.GroovyPromise;
import java.util.Iterator;
import java.util.List;
import java.util.ServiceLoader;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
+import java.util.concurrent.Flow;
import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
/**
* Central registry for {@link AwaitableAdapter} instances.
@@ -38,7 +43,7 @@ import java.util.concurrent.Future;
* <ul>
* <li>{@link CompletableFuture} and {@link CompletionStage}</li>
* <li>{@link Future} (adapted via a blocking wrapper)</li>
- * <li>JDK {@link java.util.concurrent.Flow.Publisher} — single-value
+ * <li>JDK {@link Flow.Publisher} — single-value
* ({@link #toAwaitable}) and multi-value ({@link #toAsyncStream})
* with backpressure support</li>
* </ul>
@@ -145,14 +150,14 @@ public class AwaitableAdapterRegistry {
/**
* Built-in adapter handling JDK {@link CompletableFuture}, {@link
CompletionStage},
- * {@link Future}, {@link java.util.concurrent.Flow.Publisher},
+ * {@link Future}, {@link Flow.Publisher},
* and {@link Iterable}/{@link Iterator} (for async stream bridging).
* <p>
* {@link CompletionStage} support enables seamless integration with
frameworks
* that return {@code CompletionStage} (e.g., Spring's async APIs,
Reactor's
* {@code Mono.toFuture()}, etc.) without any additional adapter
registration.
* <p>
- * {@link java.util.concurrent.Flow.Publisher} support enables seamless
+ * {@link Flow.Publisher} support enables seamless
* consumption of reactive streams via {@code for await} without any
adapter
* registration. This covers any reactive library that implements the JDK
* standard reactive-streams interface (Reactor, RxJava via adapters,
etc.).
@@ -163,7 +168,7 @@ public class AwaitableAdapterRegistry {
public boolean supportsAwaitable(Class<?> type) {
return CompletionStage.class.isAssignableFrom(type)
|| Future.class.isAssignableFrom(type)
- ||
java.util.concurrent.Flow.Publisher.class.isAssignableFrom(type);
+ || Flow.Publisher.class.isAssignableFrom(type);
}
@Override
@@ -172,7 +177,7 @@ public class AwaitableAdapterRegistry {
if (source instanceof CompletionStage) {
return new GroovyPromise<>(((CompletionStage<T>)
source).toCompletableFuture());
}
- if (source instanceof java.util.concurrent.Flow.Publisher<?> pub) {
+ if (source instanceof Flow.Publisher<?> pub) {
return publisherToAwaitable(pub);
}
if (source instanceof Future) {
@@ -197,14 +202,14 @@ public class AwaitableAdapterRegistry {
public boolean supportsAsyncStream(Class<?> type) {
return Iterable.class.isAssignableFrom(type)
|| Iterator.class.isAssignableFrom(type)
- ||
java.util.concurrent.Flow.Publisher.class.isAssignableFrom(type);
+ || Flow.Publisher.class.isAssignableFrom(type);
}
@Override
@SuppressWarnings("unchecked")
public <T> AsyncStream<T> toAsyncStream(Object source) {
- if (source instanceof java.util.concurrent.Flow.Publisher<?> pub) {
- return
publisherToAsyncStream((java.util.concurrent.Flow.Publisher<T>) pub);
+ if (source instanceof Flow.Publisher<?> pub) {
+ return publisherToAsyncStream((Flow.Publisher<T>) pub);
}
final Iterator<T> iterator;
if (source instanceof Iterable) {
@@ -232,23 +237,28 @@ public class AwaitableAdapterRegistry {
}
/**
- * Adapts a {@link java.util.concurrent.Flow.Publisher} to an
+ * Adapts a {@link Flow.Publisher} to an
* {@link AsyncStream} using a blocking queue to bridge the push-based
* reactive-streams protocol to the pull-based {@code moveNext}/{@code
getCurrent}
* pattern. Backpressure is managed by requesting one item at a time:
* each {@code moveNext()} call requests the next item from the
upstream
* subscription only after the previous item has been consumed.
+ * <p>
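+ * Errors signalled by the publisher through {@code onError} are delivered as exceptional
completions, preserving the original {@link Throwable} (including {@code Error}).
+ * Thread interruption during {@code queue.take()} is converted to a
+ * {@link CancellationException} (with the {@code InterruptedException} as its cause) after the interrupt
+ * flag has been restored, consistent with the cancellation semantics used
elsewhere.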
*/
@SuppressWarnings("unchecked")
- private static <T> AsyncStream<T>
publisherToAsyncStream(java.util.concurrent.Flow.Publisher<T> publisher) {
- java.util.concurrent.LinkedBlockingQueue<Object> queue = new
java.util.concurrent.LinkedBlockingQueue<>();
+ private static <T> AsyncStream<T>
publisherToAsyncStream(Flow.Publisher<T> publisher) {
+ BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
Object COMPLETE_SENTINEL = new Object();
-
java.util.concurrent.atomic.AtomicReference<java.util.concurrent.Flow.Subscription>
subRef =
- new java.util.concurrent.atomic.AtomicReference<>();
+ AtomicReference<Flow.Subscription> subRef =
+ new AtomicReference<>();
- publisher.subscribe(new java.util.concurrent.Flow.Subscriber<T>() {
+ publisher.subscribe(new Flow.Subscriber<T>() {
@Override
- public void onSubscribe(java.util.concurrent.Flow.Subscription
s) {
+ public void onSubscribe(Flow.Subscription s) {
subRef.set(s);
s.request(1);
}
@@ -260,7 +270,9 @@ public class AwaitableAdapterRegistry {
@Override
public void onError(Throwable t) {
- queue.add(t instanceof Exception ? t : new
RuntimeException(t));
+ // Preserve the original Throwable type (including Error)
so that
+ // consumers see the exact exception/error from the
publisher.
+ queue.add(t);
}
@Override
@@ -285,12 +297,14 @@ public class AwaitableAdapterRegistry {
current = (T) item;
cf.complete(true);
// Request next item for the subsequent moveNext()
call
- java.util.concurrent.Flow.Subscription sub =
subRef.get();
+ Flow.Subscription sub = subRef.get();
if (sub != null) sub.request(1);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- cf.completeExceptionally(e);
+ CancellationException ce = new
CancellationException("Interrupted while waiting for next item");
+ ce.initCause(e);
+ cf.completeExceptionally(ce);
}
return new GroovyPromise<>(cf);
}
@@ -303,17 +317,17 @@ public class AwaitableAdapterRegistry {
}
/**
- * Adapts a single-value {@link java.util.concurrent.Flow.Publisher} to
+ * Adapts a single-value {@link Flow.Publisher} to
* an {@link Awaitable}. Subscribes and takes the first emitted value.
*/
@SuppressWarnings("unchecked")
- private static <T> Awaitable<T>
publisherToAwaitable(java.util.concurrent.Flow.Publisher<?> publisher) {
+ private static <T> Awaitable<T> publisherToAwaitable(Flow.Publisher<?>
publisher) {
CompletableFuture<T> cf = new CompletableFuture<>();
- publisher.subscribe(new
java.util.concurrent.Flow.Subscriber<Object>() {
- private java.util.concurrent.Flow.Subscription subscription;
+ publisher.subscribe(new Flow.Subscriber<Object>() {
+ private Flow.Subscription subscription;
@Override
- public void onSubscribe(java.util.concurrent.Flow.Subscription
s) {
+ public void onSubscribe(Flow.Subscription s) {
this.subscription = s;
s.request(1);
}
diff --git a/src/main/java/org/apache/groovy/parser/antlr4/AstBuilder.java
b/src/main/java/org/apache/groovy/parser/antlr4/AstBuilder.java
index 2e4ece72be..ecbe7e16d4 100644
--- a/src/main/java/org/apache/groovy/parser/antlr4/AstBuilder.java
+++ b/src/main/java/org/apache/groovy/parser/antlr4/AstBuilder.java
@@ -485,6 +485,11 @@ public class AstBuilder extends
GroovyParserBaseVisitor<Object> {
* over an {@link groovy.concurrent.AsyncStream}: the source expression is
* adapted via {@code AsyncSupport.toAsyncStream()}, then repeatedly polled
* with {@code moveNext()} / {@code getCurrent()}.
+ * <p>
+ * Variable modifiers (e.g. {@code final}) from the enhanced-for
declaration
+ * are applied to the synthesised loop variable, consistent with the
+ * standard {@code for (... in ...)} handling in
+ * {@link #visitEnhancedForControl}.
*/
private Statement visitForAwait(final ForStmtAltContext ctx) {
ForControlContext forCtrl = ctx.forControl();
@@ -498,6 +503,12 @@ public class AstBuilder extends
GroovyParserBaseVisitor<Object> {
Expression source = (Expression) this.visit(enhCtrl.expression());
Statement loopBody = this.unpackStatement((Statement)
this.visit(ctx.statement()));
+ // Apply variable modifiers (e.g. final) to the loop variable
+ VariableExpression loopVar = varX(varName, varType);
+ ModifierManager modifierManager = new ModifierManager(this,
+
this.visitVariableModifiersOpt(enhCtrl.variableModifiersOpt()));
+ modifierManager.processVariableExpression(loopVar);
+
String streamVar = "$__asyncStream__" + (asyncStreamCounter++);
// def $__asyncStream__N = AsyncSupport.toAsyncStream(source)
@@ -513,7 +524,7 @@ public class AstBuilder extends
GroovyParserBaseVisitor<Object> {
// def <varName> = $__asyncStream__N.getCurrent()
Expression getCurrentCall = callX(varX(streamVar), "getCurrent");
- ExpressionStatement getItemStmt = new
ExpressionStatement(declX(varX(varName, varType), getCurrentCall));
+ ExpressionStatement getItemStmt = new
ExpressionStatement(declX(loopVar, getCurrentCall));
BlockStatement whileBody = block(getItemStmt, loopBody);
WhileStatement whileStmt = new WhileStatement(condition, whileBody);
diff --git a/src/spec/doc/core-async-await.adoc
b/src/spec/doc/core-async-await.adoc
index 1fb316eccd..a45aa2735b 100644
--- a/src/spec/doc/core-async-await.adoc
+++ b/src/spec/doc/core-async-await.adoc
@@ -503,6 +503,33 @@ def result = AsyncUtils.awaitAny(
)
----
+[[cancellation]]
+=== Cancellation
+
+An `Awaitable` can be cancelled via the `cancel()` method. Once cancelled, any
subsequent
+`await` on the task throws a `java.util.concurrent.CancellationException`:
+
+[source,groovy]
+----
+import java.util.concurrent.CancellationException
+
+def task = longRunningTask()
+task.cancel()
+
+try {
+ await(task)
+} catch (CancellationException e) {
+ // task was cancelled
+}
+----
+
+[NOTE]
+====
+Cancellation sets the `Awaitable` state to cancelled but does _not_ reliably
interrupt the
+underlying thread (a limitation of `CompletableFuture`), so the task body may keep running after `cancel()` returns. For cooperative
cancellation in
+long-running async bodies, check `Thread.currentThread().isInterrupted()`
periodically.
+====
+
[[summary]]
== Summary
diff --git
a/src/test/groovy/org/codehaus/groovy/transform/AsyncAwaitSyntaxTest.groovy
b/src/test/groovy/org/codehaus/groovy/transform/AsyncAwaitSyntaxTest.groovy
index b31b1d0263..9bc87564dd 100644
--- a/src/test/groovy/org/codehaus/groovy/transform/AsyncAwaitSyntaxTest.groovy
+++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncAwaitSyntaxTest.groovy
@@ -1177,7 +1177,8 @@ class AsyncAwaitSyntaxTest {
a.get(50, TimeUnit.MILLISECONDS)
assert false : "expected timeout"
} catch (TimeoutException e) {
- // expected
+ // expected — cancel to avoid leaving the task running
+ a.cancel()
}
'''
}
diff --git
a/src/test/groovy/org/codehaus/groovy/transform/AsyncPatternsTest.groovy
b/src/test/groovy/org/codehaus/groovy/transform/AsyncPatternsTest.groovy
index ed52945a65..8c765aa266 100644
--- a/src/test/groovy/org/codehaus/groovy/transform/AsyncPatternsTest.groovy
+++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncPatternsTest.groovy
@@ -702,7 +702,7 @@ class AsyncPatternsTest {
import groovy.concurrent.AsyncUtils
def longRunning = async {
- await(AsyncUtils.delay(5000))
+ await(AsyncUtils.delay(2000))
return "completed"
}
def longTask = longRunning()
diff --git
a/src/test/groovy/org/codehaus/groovy/transform/AsyncVirtualThreadTest.groovy
b/src/test/groovy/org/codehaus/groovy/transform/AsyncVirtualThreadTest.groovy
index 76671d4dc4..7210c5744d 100644
---
a/src/test/groovy/org/codehaus/groovy/transform/AsyncVirtualThreadTest.groovy
+++
b/src/test/groovy/org/codehaus/groovy/transform/AsyncVirtualThreadTest.groovy
@@ -960,7 +960,8 @@ final class AsyncVirtualThreadTest {
awaitable.get(50, TimeUnit.MILLISECONDS)
assert false : "Should have timed out"
} catch (TimeoutException e) {
- // expected
+ // expected — cancel to avoid leaving the task running
+ awaitable.cancel()
}
'''
}