This is an automated email from the ASF dual-hosted git repository.

asf-gitbox-commits pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/groovy.git


The following commit(s) were added to refs/heads/master by this push:
     new 951058c246 Minor refactor: extract duplicated code
951058c246 is described below

commit 951058c246b39db6ba50ca12360b5b079e5bc7ef
Author: Daniel Sun <[email protected]>
AuthorDate: Sat May 30 17:39:36 2026 +0900

    Minor refactor: extract duplicated code
---
 .../groovy/runtime/async/AsyncClosureUtils.java    | 31 +++-----
 .../apache/groovy/runtime/async/AsyncSupport.java  | 91 +++++++++++++---------
 src/test/groovy/groovy/AsyncAwaitTest.groovy       | 36 +++++++++
 3 files changed, 101 insertions(+), 57 deletions(-)

diff --git a/src/main/java/org/apache/groovy/runtime/async/AsyncClosureUtils.java b/src/main/java/org/apache/groovy/runtime/async/AsyncClosureUtils.java
index 1d6478a690..62e0b29f50 100644
--- a/src/main/java/org/apache/groovy/runtime/async/AsyncClosureUtils.java
+++ b/src/main/java/org/apache/groovy/runtime/async/AsyncClosureUtils.java
@@ -22,7 +22,6 @@ import groovy.concurrent.Awaitable;
 import groovy.lang.Closure;
 
 import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
 
 /**
  * Closure-specific async utilities that have no pure-Java equivalent.
@@ -58,13 +57,7 @@ public final class AsyncClosureUtils {
         return new Closure<Awaitable<T>>(closure.getOwner(), closure.getThisObject()) {
             @SuppressWarnings("unused")
             public Awaitable<T> doCall(Object... args) {
-                return GroovyPromise.of(CompletableFuture.supplyAsync(() -> {
-                    try {
-                        return closure.call(args);
-                    } catch (Throwable t) {
-                        throw AsyncSupport.wrapForFuture(t);
-                    }
-                }, AsyncSupport.getExecutor()));
+                return AsyncSupport.async(() -> closure.call(args));
             }
         };
     }
@@ -84,21 +77,15 @@ public final class AsyncClosureUtils {
         return new Closure<Iterable<T>>(closure.getOwner(), closure.getThisObject()) {
             @SuppressWarnings("unused")
             public Iterable<T> doCall(Object... args) {
-                GeneratorBridge<T> bridge = new GeneratorBridge<>();
-                Object[] allArgs = new Object[args.length + 1];
-                allArgs[0] = bridge;
-                System.arraycopy(args, 0, allArgs, 1, args.length);
-                AsyncSupport.getExecutor().execute(() -> {
-                    try {
-                        closure.call(allArgs);
-                        bridge.complete();
-                    } catch (GeneratorBridge.GeneratorClosedException ignored) {
-                    } catch (Throwable t) {
-                        bridge.completeExceptionally(t);
-                    }
-                });
-                return () -> bridge;
+                return AsyncSupport.asyncGenerator(bridge -> closure.call(prependArgument(bridge, args)));
             }
         };
     }
+
+    private static Object[] prependArgument(Object argument, Object[] args) {
+        Object[] allArgs = new Object[args.length + 1];
+        allArgs[0] = argument;
+        System.arraycopy(args, 0, allArgs, 1, args.length);
+        return allArgs;
+    }
 }
diff --git a/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java b/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
index 01878212d8..76c94abb2f 100644
--- a/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
+++ b/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
@@ -20,16 +20,23 @@ package org.apache.groovy.runtime.async;
 
 import groovy.concurrent.AwaitResult;
 import groovy.concurrent.Awaitable;
+import groovy.concurrent.AwaitableAdapterRegistry;
 
+import java.io.Closeable;
 import java.lang.invoke.MethodHandle;
 import java.lang.invoke.MethodHandles;
 import java.lang.invoke.MethodType;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Deque;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -45,6 +52,10 @@ import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
 
 /**
  * Internal runtime support for the {@code async}/{@code await}/{@code defer} language features.
@@ -111,8 +122,7 @@ public class AsyncSupport {
         }
     }
 
-    private static volatile Executor defaultExecutor =
-            VIRTUAL_THREADS_AVAILABLE ? VIRTUAL_THREAD_EXECUTOR : FALLBACK_EXECUTOR;
+    private static volatile Executor defaultExecutor = createDefaultExecutor();
 
     private static final ScheduledExecutorService SCHEDULER =
             Executors.newSingleThreadScheduledExecutor(r -> {
@@ -147,7 +157,7 @@ public class AsyncSupport {
 
     /** Resets the executor to the default (virtual threads on JDK 21+, cached pool otherwise). */
     public static void resetExecutor() {
-        defaultExecutor = VIRTUAL_THREADS_AVAILABLE ? VIRTUAL_THREAD_EXECUTOR : FALLBACK_EXECUTOR;
+        defaultExecutor = createDefaultExecutor();
     }
 
     // ---- await overloads ------------------------------------------------
@@ -163,10 +173,7 @@ public class AsyncSupport {
         } catch (ExecutionException e) {
             throw rethrowUnwrapped(e);
         } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            CancellationException ce = new CancellationException("Interrupted while awaiting");
-            ce.initCause(e);
-            throw ce;
+            throw interruptedAwait("Interrupted while awaiting", e);
         }
     }
 
@@ -198,10 +205,7 @@ public class AsyncSupport {
         } catch (CancellationException e) {
             throw e;
         } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            CancellationException ce = new CancellationException("Interrupted while awaiting future");
-            ce.initCause(e);
-            throw ce;
+            throw interruptedAwait("Interrupted while awaiting future", e);
         }
     }
 
@@ -225,8 +229,8 @@ public class AsyncSupport {
      * Executes the given supplier asynchronously on the specified executor,
      * returning an {@link Awaitable}.
      */
-    public static <T> Awaitable<T> executeAsync(java.util.function.Supplier<T> supplier, Executor executor) {
-        java.util.Objects.requireNonNull(supplier, "supplier must not be null");
+    public static <T> Awaitable<T> executeAsync(Supplier<T> supplier, Executor executor) {
+        Objects.requireNonNull(supplier, "supplier must not be null");
         Executor targetExecutor = executor != null ? executor : defaultExecutor;
         return GroovyPromise.of(CompletableFuture.supplyAsync(() -> {
             try {
@@ -240,15 +244,15 @@ public class AsyncSupport {
     /**
      * Executes the given supplier asynchronously using the default executor.
      */
-    public static <T> Awaitable<T> async(java.util.function.Supplier<T> supplier) {
+    public static <T> Awaitable<T> async(Supplier<T> supplier) {
         return executeAsync(supplier, defaultExecutor);
     }
 
     /**
      * Lightweight task spawn. Executes the supplier asynchronously using the default executor.
      */
-    public static <T> Awaitable<T> go(java.util.function.Supplier<T> supplier) {
-        return executeAsync(supplier, defaultExecutor);
+    public static <T> Awaitable<T> go(Supplier<T> supplier) {
+        return async(supplier);
     }
 
     // ---- defer ----------------------------------------------------------
@@ -258,16 +262,16 @@ public class AsyncSupport {
      * Called by compiler-generated code at the start of closures
      * containing {@code defer} statements.
      */
-    public static java.util.Deque<java.util.concurrent.Callable<?>> createDeferScope() {
-        return new java.util.ArrayDeque<>();
+    public static Deque<Callable<?>> createDeferScope() {
+        return new ArrayDeque<>();
     }
 
     /**
      * Registers a deferred action in the given scope. Actions execute in LIFO
      * order when {@link #executeDeferScope} is called (in the finally block).
      */
-    public static void defer(java.util.Deque<java.util.concurrent.Callable<?>> scope,
-                             java.util.concurrent.Callable<?> action) {
+    public static void defer(Deque<Callable<?>> scope,
+                             Callable<?> action) {
         if (scope == null) {
             throw new IllegalStateException("defer must be used inside an async closure");
         }
@@ -283,7 +287,7 @@ public class AsyncSupport {
      * a Future/Awaitable, the result is awaited before continuing.
      */
     @SuppressWarnings("unchecked")
-    public static void executeDeferScope(java.util.Deque<java.util.concurrent.Callable<?>> scope) {
+    public static void executeDeferScope(Deque<Callable<?>> scope) {
         if (scope == null || scope.isEmpty()) return;
         Throwable firstError = null;
         while (!scope.isEmpty()) {
@@ -346,8 +350,8 @@ public class AsyncSupport {
      * @return an Iterable that yields values from the generator
      */
     @SuppressWarnings("unchecked")
-    public static <T> Iterable<T> asyncGenerator(java.util.function.Consumer<Object> body) {
-        java.util.Objects.requireNonNull(body, "body must not be null");
+    public static <T> Iterable<T> asyncGenerator(Consumer<Object> body) {
+        Objects.requireNonNull(body, "body must not be null");
         GeneratorBridge<T> bridge = new GeneratorBridge<>();
         defaultExecutor.execute(() -> {
             try {
@@ -384,19 +388,25 @@ public class AsyncSupport {
         }
         if (source instanceof Object[]) return (Iterable<T>) Arrays.asList((Object[]) source);
         // Try adapter registry
-        return groovy.concurrent.AwaitableAdapterRegistry.toIterable(source);
+        return AwaitableAdapterRegistry.toIterable(source);
     }
 
     /**
-     * Closes a source if it implements {@link java.io.Closeable} or
+     * Closes a source if it implements {@link Closeable} or
      * {@link AutoCloseable}. Called by compiler-generated finally block
      * in {@code for await} loops.
      */
     public static void closeIterable(Object source) {
-        if (source instanceof java.io.Closeable c) {
-            try { c.close(); } catch (Exception ignored) { }
+        if (source instanceof Closeable c) {
+            try {
+                c.close();
+            } catch (Exception ignored) {
+            }
         } else if (source instanceof AutoCloseable c) {
-            try { c.close(); } catch (Exception ignored) { }
+            try {
+                c.close();
+            } catch (Exception ignored) {
+            }
         }
     }
 
@@ -441,8 +451,8 @@ public class AsyncSupport {
                 .toArray(CompletableFuture[]::new);
 
         CompletableFuture<T> result = new CompletableFuture<>();
-        var remaining = new java.util.concurrent.atomic.AtomicInteger(futures.length);
-        List<Throwable> errors = java.util.Collections.synchronizedList(new ArrayList<>());
+        var remaining = new AtomicInteger(futures.length);
+        List<Throwable> errors = Collections.synchronizedList(new ArrayList<>());
         for (CompletableFuture<T> f : futures) {
             f.whenComplete((value, error) -> {
                 if (error == null) {
@@ -507,7 +517,7 @@ public class AsyncSupport {
         // source fails OR all sources succeed.  We track the first
         // failure explicitly because allOf doesn't guarantee which
         // exception propagates when multiple futures fail.
-        var firstError = new java.util.concurrent.atomic.AtomicReference<Throwable>();
+        var firstError = new AtomicReference<Throwable>();
         for (CompletableFuture<?> f : futures) {
             f.whenComplete((v, e) -> {
                 if (e != null) firstError.compareAndSet(null, e);
@@ -548,8 +558,8 @@ public class AsyncSupport {
                 .map(s -> (CompletableFuture<T>) Awaitable.from(s).toCompletableFuture())
                 .toArray(CompletableFuture[]::new);
         CompletableFuture<T> result = new CompletableFuture<>();
-        var remaining = new java.util.concurrent.atomic.AtomicInteger(futures.length);
-        List<Throwable> errors = java.util.Collections.synchronizedList(new ArrayList<>());
+        var remaining = new AtomicInteger(futures.length);
+        List<Throwable> errors = Collections.synchronizedList(new ArrayList<>());
         for (CompletableFuture<T> f : futures) {
             f.whenComplete((value, error) -> {
                 if (error == null) {
@@ -680,8 +690,8 @@ public class AsyncSupport {
 
     public static Throwable unwrap(Throwable t) {
         while ((t instanceof CompletionException || t instanceof ExecutionException
-                || t instanceof java.lang.reflect.InvocationTargetException
-                || t instanceof java.lang.reflect.UndeclaredThrowableException)
+                || t instanceof InvocationTargetException
+                || t instanceof UndeclaredThrowableException)
                 && t.getCause() != null) {
             t = t.getCause();
         }
@@ -695,6 +705,17 @@ public class AsyncSupport {
 
     // ---- internal utilities ---------------------------------------------
 
+    private static Executor createDefaultExecutor() {
+        return VIRTUAL_THREADS_AVAILABLE ? VIRTUAL_THREAD_EXECUTOR : FALLBACK_EXECUTOR;
+    }
+
+    private static CancellationException interruptedAwait(String message, InterruptedException cause) {
+        Thread.currentThread().interrupt();
+        CancellationException cancellation = new CancellationException(message);
+        cancellation.initCause(cause);
+        return cancellation;
+    }
+
     private static int getIntegerSafe(String name, int defaultValue) {
         try {
             return Integer.getInteger(name, defaultValue);
diff --git a/src/test/groovy/groovy/AsyncAwaitTest.groovy b/src/test/groovy/groovy/AsyncAwaitTest.groovy
index 2db512b4b0..90b6ef4936 100644
--- a/src/test/groovy/groovy/AsyncAwaitTest.groovy
+++ b/src/test/groovy/groovy/AsyncAwaitTest.groovy
@@ -3271,6 +3271,29 @@ final class AsyncAwaitTest {
         '''
     }
 
+    @Test
+    void testWrapAsyncUsesConfiguredExecutor() {
+        assertScript '''
+            import org.apache.groovy.runtime.async.AsyncClosureUtils
+            import org.apache.groovy.runtime.async.AsyncSupport
+            import java.util.concurrent.Executors
+
+            def exec = Executors.newSingleThreadExecutor { r ->
+                def t = new Thread(r, 'wrap-async-pool')
+                t.daemon = true
+                t
+            }
+            try {
+                def wrapped = AsyncClosureUtils.wrapAsync { Thread.currentThread().name }
+                AsyncSupport.setExecutor(exec)
+                assert await(wrapped()) == 'wrap-async-pool'
+            } finally {
+                AsyncSupport.resetExecutor()
+                exec.shutdown()
+            }
+        '''
+    }
+
     @Test
     void testWrapAsyncWithException() {
         assertScript '''
@@ -3307,6 +3330,19 @@ final class AsyncAwaitTest {
         '''
     }
 
+    @Test
+    void testWrapAsyncGeneratorWithArgs() {
+        assertScript '''
+            import org.apache.groovy.runtime.async.AsyncClosureUtils
+            import org.apache.groovy.runtime.async.GeneratorBridge
+
+            def wrapped = AsyncClosureUtils.wrapAsyncGenerator({ GeneratorBridge bridge, String prefix, List values ->
+                values.each { bridge.yield(prefix + it) }
+            })
+            assert wrapped('item-', [1, 2, 3]).toList() == ['item-1', 'item-2', 'item-3']
+        '''
+    }
+
     @Test
     void testWrapAsyncGeneratorWithBreak() {
         assertScript '''

Reply via email to