This is an automated email from the ASF dual-hosted git repository.

sunlan pushed a commit to branch GROOVY-9381_3
in repository https://gitbox.apache.org/repos/asf/groovy.git


The following commit(s) were added to refs/heads/GROOVY-9381_3 by this push:
     new b27543013a Minor tweaks
b27543013a is described below

commit b27543013aba21d64a69d9c3545c0c3c98d843dd
Author: Daniel Sun <[email protected]>
AuthorDate: Tue Mar 3 00:08:40 2026 +0900

    Minor tweaks
---
 .../concurrent/AwaitableAdapterRegistry.java       | 60 +++++++++++++---------
 .../apache/groovy/parser/antlr4/AstBuilder.java    | 13 ++++-
 src/spec/doc/core-async-await.adoc                 | 27 ++++++++++
 .../groovy/transform/AsyncAwaitSyntaxTest.groovy   |  3 +-
 .../groovy/transform/AsyncPatternsTest.groovy      |  2 +-
 .../groovy/transform/AsyncVirtualThreadTest.groovy |  3 +-
 6 files changed, 81 insertions(+), 27 deletions(-)

diff --git a/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java 
b/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java
index 8ee415dd27..e93c309a1b 100644
--- a/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java
+++ b/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java
@@ -23,11 +23,16 @@ import org.apache.groovy.runtime.async.GroovyPromise;
 import java.util.Iterator;
 import java.util.List;
 import java.util.ServiceLoader;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
+import java.util.concurrent.Flow;
 import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Central registry for {@link AwaitableAdapter} instances.
@@ -38,7 +43,7 @@ import java.util.concurrent.Future;
  * <ul>
  *   <li>{@link CompletableFuture} and {@link CompletionStage}</li>
  *   <li>{@link Future} (adapted via a blocking wrapper)</li>
- *   <li>JDK {@link java.util.concurrent.Flow.Publisher} — single-value
+ *   <li>JDK {@link Flow.Publisher} — single-value
  *       ({@link #toAwaitable}) and multi-value ({@link #toAsyncStream})
  *       with backpressure support</li>
  * </ul>
@@ -145,14 +150,14 @@ public class AwaitableAdapterRegistry {
 
     /**
      * Built-in adapter handling JDK {@link CompletableFuture}, {@link 
CompletionStage},
-     * {@link Future}, {@link java.util.concurrent.Flow.Publisher},
+     * {@link Future}, {@link Flow.Publisher},
      * and {@link Iterable}/{@link Iterator} (for async stream bridging).
      * <p>
      * {@link CompletionStage} support enables seamless integration with 
frameworks
      * that return {@code CompletionStage} (e.g., Spring's async APIs, 
Reactor's
      * {@code Mono.toFuture()}, etc.) without any additional adapter 
registration.
      * <p>
-     * {@link java.util.concurrent.Flow.Publisher} support enables seamless
+     * {@link Flow.Publisher} support enables seamless
      * consumption of reactive streams via {@code for await} without any 
adapter
      * registration.  This covers any reactive library that implements the JDK
      * standard reactive-streams interface (Reactor, RxJava via adapters, 
etc.).
@@ -163,7 +168,7 @@ public class AwaitableAdapterRegistry {
         public boolean supportsAwaitable(Class<?> type) {
             return CompletionStage.class.isAssignableFrom(type)
                     || Future.class.isAssignableFrom(type)
-                    || 
java.util.concurrent.Flow.Publisher.class.isAssignableFrom(type);
+                    || Flow.Publisher.class.isAssignableFrom(type);
         }
 
         @Override
@@ -172,7 +177,7 @@ public class AwaitableAdapterRegistry {
             if (source instanceof CompletionStage) {
                 return new GroovyPromise<>(((CompletionStage<T>) 
source).toCompletableFuture());
             }
-            if (source instanceof java.util.concurrent.Flow.Publisher<?> pub) {
+            if (source instanceof Flow.Publisher<?> pub) {
                 return publisherToAwaitable(pub);
             }
             if (source instanceof Future) {
@@ -197,14 +202,14 @@ public class AwaitableAdapterRegistry {
         public boolean supportsAsyncStream(Class<?> type) {
             return Iterable.class.isAssignableFrom(type)
                     || Iterator.class.isAssignableFrom(type)
-                    || 
java.util.concurrent.Flow.Publisher.class.isAssignableFrom(type);
+                    || Flow.Publisher.class.isAssignableFrom(type);
         }
 
         @Override
         @SuppressWarnings("unchecked")
         public <T> AsyncStream<T> toAsyncStream(Object source) {
-            if (source instanceof java.util.concurrent.Flow.Publisher<?> pub) {
-                return 
publisherToAsyncStream((java.util.concurrent.Flow.Publisher<T>) pub);
+            if (source instanceof Flow.Publisher<?> pub) {
+                return publisherToAsyncStream((Flow.Publisher<T>) pub);
             }
             final Iterator<T> iterator;
             if (source instanceof Iterable) {
@@ -232,23 +237,28 @@ public class AwaitableAdapterRegistry {
         }
 
         /**
-         * Adapts a {@link java.util.concurrent.Flow.Publisher} to an
+         * Adapts a {@link Flow.Publisher} to an
          * {@link AsyncStream} using a blocking queue to bridge the push-based
          * reactive-streams protocol to the pull-based {@code moveNext}/{@code 
getCurrent}
          * pattern.  Backpressure is managed by requesting one item at a time:
          * each {@code moveNext()} call requests the next item from the 
upstream
          * subscription only after the previous item has been consumed.
+         * <p>
+         * {@link Throwable} instances are delivered as exceptional 
completions.
+         * Thread interruption during {@code queue.take()} is converted to a
+         * {@link CancellationException} with the interrupt
+         * flag preserved, consistent with the cancellation semantics used 
elsewhere.
          */
         @SuppressWarnings("unchecked")
-        private static <T> AsyncStream<T> 
publisherToAsyncStream(java.util.concurrent.Flow.Publisher<T> publisher) {
-            java.util.concurrent.LinkedBlockingQueue<Object> queue = new 
java.util.concurrent.LinkedBlockingQueue<>();
+        private static <T> AsyncStream<T> 
publisherToAsyncStream(Flow.Publisher<T> publisher) {
+            BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
             Object COMPLETE_SENTINEL = new Object();
-            
java.util.concurrent.atomic.AtomicReference<java.util.concurrent.Flow.Subscription>
 subRef =
-                    new java.util.concurrent.atomic.AtomicReference<>();
+            AtomicReference<Flow.Subscription> subRef =
+                    new AtomicReference<>();
 
-            publisher.subscribe(new java.util.concurrent.Flow.Subscriber<T>() {
+            publisher.subscribe(new Flow.Subscriber<T>() {
                 @Override
-                public void onSubscribe(java.util.concurrent.Flow.Subscription 
s) {
+                public void onSubscribe(Flow.Subscription s) {
                     subRef.set(s);
                     s.request(1);
                 }
@@ -260,7 +270,9 @@ public class AwaitableAdapterRegistry {
 
                 @Override
                 public void onError(Throwable t) {
-                    queue.add(t instanceof Exception ? t : new 
RuntimeException(t));
+                    // Preserve the original Throwable type (including Error) 
so that
+                    // consumers see the exact exception/error from the 
publisher.
+                    queue.add(t);
                 }
 
                 @Override
@@ -285,12 +297,14 @@ public class AwaitableAdapterRegistry {
                             current = (T) item;
                             cf.complete(true);
                             // Request next item for the subsequent moveNext() 
call
-                            java.util.concurrent.Flow.Subscription sub = 
subRef.get();
+                            Flow.Subscription sub = subRef.get();
                             if (sub != null) sub.request(1);
                         }
                     } catch (InterruptedException e) {
                         Thread.currentThread().interrupt();
-                        cf.completeExceptionally(e);
+                        CancellationException ce = new 
CancellationException("Interrupted while waiting for next item");
+                        ce.initCause(e);
+                        cf.completeExceptionally(ce);
                     }
                     return new GroovyPromise<>(cf);
                 }
@@ -303,17 +317,17 @@ public class AwaitableAdapterRegistry {
         }
 
         /**
-         * Adapts a single-value {@link java.util.concurrent.Flow.Publisher} to
+         * Adapts a single-value {@link Flow.Publisher} to
          * an {@link Awaitable}.  Subscribes and takes the first emitted value.
          */
         @SuppressWarnings("unchecked")
-        private static <T> Awaitable<T> 
publisherToAwaitable(java.util.concurrent.Flow.Publisher<?> publisher) {
+        private static <T> Awaitable<T> publisherToAwaitable(Flow.Publisher<?> 
publisher) {
             CompletableFuture<T> cf = new CompletableFuture<>();
-            publisher.subscribe(new 
java.util.concurrent.Flow.Subscriber<Object>() {
-                private java.util.concurrent.Flow.Subscription subscription;
+            publisher.subscribe(new Flow.Subscriber<Object>() {
+                private Flow.Subscription subscription;
 
                 @Override
-                public void onSubscribe(java.util.concurrent.Flow.Subscription 
s) {
+                public void onSubscribe(Flow.Subscription s) {
                     this.subscription = s;
                     s.request(1);
                 }
diff --git a/src/main/java/org/apache/groovy/parser/antlr4/AstBuilder.java 
b/src/main/java/org/apache/groovy/parser/antlr4/AstBuilder.java
index 2e4ece72be..ecbe7e16d4 100644
--- a/src/main/java/org/apache/groovy/parser/antlr4/AstBuilder.java
+++ b/src/main/java/org/apache/groovy/parser/antlr4/AstBuilder.java
@@ -485,6 +485,11 @@ public class AstBuilder extends 
GroovyParserBaseVisitor<Object> {
      * over an {@link groovy.concurrent.AsyncStream}: the source expression is
      * adapted via {@code AsyncSupport.toAsyncStream()}, then repeatedly polled
      * with {@code moveNext()} / {@code getCurrent()}.
+     * <p>
+     * Variable modifiers (e.g. {@code final}) from the enhanced-for 
declaration
+     * are applied to the synthesised loop variable, consistent with the
+     * standard {@code for (... in ...)} handling in
+     * {@link #visitEnhancedForControl}.
      */
     private Statement visitForAwait(final ForStmtAltContext ctx) {
         ForControlContext forCtrl = ctx.forControl();
@@ -498,6 +503,12 @@ public class AstBuilder extends 
GroovyParserBaseVisitor<Object> {
         Expression source = (Expression) this.visit(enhCtrl.expression());
         Statement loopBody = this.unpackStatement((Statement) 
this.visit(ctx.statement()));
 
+        // Apply variable modifiers (e.g. final) to the loop variable
+        VariableExpression loopVar = varX(varName, varType);
+        ModifierManager modifierManager = new ModifierManager(this,
+                
this.visitVariableModifiersOpt(enhCtrl.variableModifiersOpt()));
+        modifierManager.processVariableExpression(loopVar);
+
         String streamVar = "$__asyncStream__" + (asyncStreamCounter++);
 
         // def $__asyncStream__N = AsyncSupport.toAsyncStream(source)
@@ -513,7 +524,7 @@ public class AstBuilder extends 
GroovyParserBaseVisitor<Object> {
 
         // def <varName> = $__asyncStream__N.getCurrent()
         Expression getCurrentCall = callX(varX(streamVar), "getCurrent");
-        ExpressionStatement getItemStmt = new 
ExpressionStatement(declX(varX(varName, varType), getCurrentCall));
+        ExpressionStatement getItemStmt = new 
ExpressionStatement(declX(loopVar, getCurrentCall));
 
         BlockStatement whileBody = block(getItemStmt, loopBody);
         WhileStatement whileStmt = new WhileStatement(condition, whileBody);
diff --git a/src/spec/doc/core-async-await.adoc 
b/src/spec/doc/core-async-await.adoc
index 1fb316eccd..a45aa2735b 100644
--- a/src/spec/doc/core-async-await.adoc
+++ b/src/spec/doc/core-async-await.adoc
@@ -503,6 +503,33 @@ def result = AsyncUtils.awaitAny(
 )
 ----
 
+[[cancellation]]
+=== Cancellation
+
+An `Awaitable` can be cancelled via the `cancel()` method. Once cancelled, any 
subsequent
+`await` on the task throws a `java.util.concurrent.CancellationException`:
+
+[source,groovy]
+----
+import java.util.concurrent.CancellationException
+
+def task = longRunningTask()
+task.cancel()
+
+try {
+    await(task)
+} catch (CancellationException e) {
+    // task was cancelled
+}
+----
+
+[NOTE]
+====
+Cancellation sets the `Awaitable` state to cancelled but does _not_ reliably 
interrupt the
+underlying thread (a limitation of `CompletableFuture`). For cooperative 
cancellation in
+long-running async bodies, check `Thread.currentThread().isInterrupted()` 
periodically.
+====
+
 [[summary]]
 == Summary
 
diff --git 
a/src/test/groovy/org/codehaus/groovy/transform/AsyncAwaitSyntaxTest.groovy 
b/src/test/groovy/org/codehaus/groovy/transform/AsyncAwaitSyntaxTest.groovy
index b31b1d0263..9bc87564dd 100644
--- a/src/test/groovy/org/codehaus/groovy/transform/AsyncAwaitSyntaxTest.groovy
+++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncAwaitSyntaxTest.groovy
@@ -1177,7 +1177,8 @@ class AsyncAwaitSyntaxTest {
                 a.get(50, TimeUnit.MILLISECONDS)
                 assert false : "expected timeout"
             } catch (TimeoutException e) {
-                // expected
+                // expected — cancel to avoid leaving the task running
+                a.cancel()
             }
         '''
     }
diff --git 
a/src/test/groovy/org/codehaus/groovy/transform/AsyncPatternsTest.groovy 
b/src/test/groovy/org/codehaus/groovy/transform/AsyncPatternsTest.groovy
index ed52945a65..8c765aa266 100644
--- a/src/test/groovy/org/codehaus/groovy/transform/AsyncPatternsTest.groovy
+++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncPatternsTest.groovy
@@ -702,7 +702,7 @@ class AsyncPatternsTest {
             import groovy.concurrent.AsyncUtils
 
             def longRunning = async {
-                await(AsyncUtils.delay(5000))
+                await(AsyncUtils.delay(2000))
                 return "completed"
             }
             def longTask = longRunning()
diff --git 
a/src/test/groovy/org/codehaus/groovy/transform/AsyncVirtualThreadTest.groovy 
b/src/test/groovy/org/codehaus/groovy/transform/AsyncVirtualThreadTest.groovy
index 76671d4dc4..7210c5744d 100644
--- 
a/src/test/groovy/org/codehaus/groovy/transform/AsyncVirtualThreadTest.groovy
+++ 
b/src/test/groovy/org/codehaus/groovy/transform/AsyncVirtualThreadTest.groovy
@@ -960,7 +960,8 @@ final class AsyncVirtualThreadTest {
                 awaitable.get(50, TimeUnit.MILLISECONDS)
                 assert false : "Should have timed out"
             } catch (TimeoutException e) {
-                // expected
+                // expected — cancel to avoid leaving the task running
+                awaitable.cancel()
             }
         '''
     }

Reply via email to