This is an automated email from the ASF dual-hosted git repository.
asf-gitbox-commits pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/groovy.git
The following commit(s) were added to refs/heads/master by this push:
new 951058c246 Minor refactor: extract duplicated code
951058c246 is described below
commit 951058c246b39db6ba50ca12360b5b079e5bc7ef
Author: Daniel Sun <[email protected]>
AuthorDate: Sat May 30 17:39:36 2026 +0900
Minor refactor: extract duplicated code
---
.../groovy/runtime/async/AsyncClosureUtils.java | 31 +++-----
.../apache/groovy/runtime/async/AsyncSupport.java | 91 +++++++++++++---------
src/test/groovy/groovy/AsyncAwaitTest.groovy | 36 +++++++++
3 files changed, 101 insertions(+), 57 deletions(-)
diff --git
a/src/main/java/org/apache/groovy/runtime/async/AsyncClosureUtils.java
b/src/main/java/org/apache/groovy/runtime/async/AsyncClosureUtils.java
index 1d6478a690..62e0b29f50 100644
--- a/src/main/java/org/apache/groovy/runtime/async/AsyncClosureUtils.java
+++ b/src/main/java/org/apache/groovy/runtime/async/AsyncClosureUtils.java
@@ -22,7 +22,6 @@ import groovy.concurrent.Awaitable;
import groovy.lang.Closure;
import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
/**
* Closure-specific async utilities that have no pure-Java equivalent.
@@ -58,13 +57,7 @@ public final class AsyncClosureUtils {
return new Closure<Awaitable<T>>(closure.getOwner(),
closure.getThisObject()) {
@SuppressWarnings("unused")
public Awaitable<T> doCall(Object... args) {
- return GroovyPromise.of(CompletableFuture.supplyAsync(() -> {
- try {
- return closure.call(args);
- } catch (Throwable t) {
- throw AsyncSupport.wrapForFuture(t);
- }
- }, AsyncSupport.getExecutor()));
+ return AsyncSupport.async(() -> closure.call(args));
}
};
}
@@ -84,21 +77,15 @@ public final class AsyncClosureUtils {
return new Closure<Iterable<T>>(closure.getOwner(),
closure.getThisObject()) {
@SuppressWarnings("unused")
public Iterable<T> doCall(Object... args) {
- GeneratorBridge<T> bridge = new GeneratorBridge<>();
- Object[] allArgs = new Object[args.length + 1];
- allArgs[0] = bridge;
- System.arraycopy(args, 0, allArgs, 1, args.length);
- AsyncSupport.getExecutor().execute(() -> {
- try {
- closure.call(allArgs);
- bridge.complete();
- } catch (GeneratorBridge.GeneratorClosedException ignored)
{
- } catch (Throwable t) {
- bridge.completeExceptionally(t);
- }
- });
- return () -> bridge;
+ return AsyncSupport.asyncGenerator(bridge ->
closure.call(prependArgument(bridge, args)));
}
};
}
+
+ private static Object[] prependArgument(Object argument, Object[] args) {
+ Object[] allArgs = new Object[args.length + 1];
+ allArgs[0] = argument;
+ System.arraycopy(args, 0, allArgs, 1, args.length);
+ return allArgs;
+ }
}
diff --git a/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
b/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
index 01878212d8..76c94abb2f 100644
--- a/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
+++ b/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
@@ -20,16 +20,23 @@ package org.apache.groovy.runtime.async;
import groovy.concurrent.AwaitResult;
import groovy.concurrent.Awaitable;
+import groovy.concurrent.AwaitableAdapterRegistry;
+import java.io.Closeable;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
+import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -45,6 +52,10 @@ import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
/**
* Internal runtime support for the {@code async}/{@code await}/{@code defer}
language features.
@@ -111,8 +122,7 @@ public class AsyncSupport {
}
}
- private static volatile Executor defaultExecutor =
- VIRTUAL_THREADS_AVAILABLE ? VIRTUAL_THREAD_EXECUTOR :
FALLBACK_EXECUTOR;
+ private static volatile Executor defaultExecutor = createDefaultExecutor();
private static final ScheduledExecutorService SCHEDULER =
Executors.newSingleThreadScheduledExecutor(r -> {
@@ -147,7 +157,7 @@ public class AsyncSupport {
/** Resets the executor to the default (virtual threads on JDK 21+, cached
pool otherwise). */
public static void resetExecutor() {
- defaultExecutor = VIRTUAL_THREADS_AVAILABLE ? VIRTUAL_THREAD_EXECUTOR
: FALLBACK_EXECUTOR;
+ defaultExecutor = createDefaultExecutor();
}
// ---- await overloads ------------------------------------------------
@@ -163,10 +173,7 @@ public class AsyncSupport {
} catch (ExecutionException e) {
throw rethrowUnwrapped(e);
} catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- CancellationException ce = new CancellationException("Interrupted
while awaiting");
- ce.initCause(e);
- throw ce;
+ throw interruptedAwait("Interrupted while awaiting", e);
}
}
@@ -198,10 +205,7 @@ public class AsyncSupport {
} catch (CancellationException e) {
throw e;
} catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- CancellationException ce = new CancellationException("Interrupted
while awaiting future");
- ce.initCause(e);
- throw ce;
+ throw interruptedAwait("Interrupted while awaiting future", e);
}
}
@@ -225,8 +229,8 @@ public class AsyncSupport {
* Executes the given supplier asynchronously on the specified executor,
* returning an {@link Awaitable}.
*/
- public static <T> Awaitable<T> executeAsync(java.util.function.Supplier<T>
supplier, Executor executor) {
- java.util.Objects.requireNonNull(supplier, "supplier must not be
null");
+ public static <T> Awaitable<T> executeAsync(Supplier<T> supplier, Executor
executor) {
+ Objects.requireNonNull(supplier, "supplier must not be null");
Executor targetExecutor = executor != null ? executor :
defaultExecutor;
return GroovyPromise.of(CompletableFuture.supplyAsync(() -> {
try {
@@ -240,15 +244,15 @@ public class AsyncSupport {
/**
* Executes the given supplier asynchronously using the default executor.
*/
- public static <T> Awaitable<T> async(java.util.function.Supplier<T>
supplier) {
+ public static <T> Awaitable<T> async(Supplier<T> supplier) {
return executeAsync(supplier, defaultExecutor);
}
/**
* Lightweight task spawn. Executes the supplier asynchronously using the
default executor.
*/
- public static <T> Awaitable<T> go(java.util.function.Supplier<T> supplier)
{
- return executeAsync(supplier, defaultExecutor);
+ public static <T> Awaitable<T> go(Supplier<T> supplier) {
+ return async(supplier);
}
// ---- defer ----------------------------------------------------------
@@ -258,16 +262,16 @@ public class AsyncSupport {
* Called by compiler-generated code at the start of closures
* containing {@code defer} statements.
*/
- public static java.util.Deque<java.util.concurrent.Callable<?>>
createDeferScope() {
- return new java.util.ArrayDeque<>();
+ public static Deque<Callable<?>> createDeferScope() {
+ return new ArrayDeque<>();
}
/**
* Registers a deferred action in the given scope. Actions execute in LIFO
* order when {@link #executeDeferScope} is called (in the finally block).
*/
- public static void defer(java.util.Deque<java.util.concurrent.Callable<?>>
scope,
- java.util.concurrent.Callable<?> action) {
+ public static void defer(Deque<Callable<?>> scope,
+ Callable<?> action) {
if (scope == null) {
throw new IllegalStateException("defer must be used inside an
async closure");
}
@@ -283,7 +287,7 @@ public class AsyncSupport {
* a Future/Awaitable, the result is awaited before continuing.
*/
@SuppressWarnings("unchecked")
- public static void
executeDeferScope(java.util.Deque<java.util.concurrent.Callable<?>> scope) {
+ public static void executeDeferScope(Deque<Callable<?>> scope) {
if (scope == null || scope.isEmpty()) return;
Throwable firstError = null;
while (!scope.isEmpty()) {
@@ -346,8 +350,8 @@ public class AsyncSupport {
* @return an Iterable that yields values from the generator
*/
@SuppressWarnings("unchecked")
- public static <T> Iterable<T>
asyncGenerator(java.util.function.Consumer<Object> body) {
- java.util.Objects.requireNonNull(body, "body must not be null");
+ public static <T> Iterable<T> asyncGenerator(Consumer<Object> body) {
+ Objects.requireNonNull(body, "body must not be null");
GeneratorBridge<T> bridge = new GeneratorBridge<>();
defaultExecutor.execute(() -> {
try {
@@ -384,19 +388,25 @@ public class AsyncSupport {
}
if (source instanceof Object[]) return (Iterable<T>)
Arrays.asList((Object[]) source);
// Try adapter registry
- return groovy.concurrent.AwaitableAdapterRegistry.toIterable(source);
+ return AwaitableAdapterRegistry.toIterable(source);
}
/**
- * Closes a source if it implements {@link java.io.Closeable} or
+ * Closes a source if it implements {@link Closeable} or
* {@link AutoCloseable}. Called by compiler-generated finally block
* in {@code for await} loops.
*/
public static void closeIterable(Object source) {
- if (source instanceof java.io.Closeable c) {
- try { c.close(); } catch (Exception ignored) { }
+ if (source instanceof Closeable c) {
+ try {
+ c.close();
+ } catch (Exception ignored) {
+ }
} else if (source instanceof AutoCloseable c) {
- try { c.close(); } catch (Exception ignored) { }
+ try {
+ c.close();
+ } catch (Exception ignored) {
+ }
}
}
@@ -441,8 +451,8 @@ public class AsyncSupport {
.toArray(CompletableFuture[]::new);
CompletableFuture<T> result = new CompletableFuture<>();
- var remaining = new
java.util.concurrent.atomic.AtomicInteger(futures.length);
- List<Throwable> errors = java.util.Collections.synchronizedList(new
ArrayList<>());
+ var remaining = new AtomicInteger(futures.length);
+ List<Throwable> errors = Collections.synchronizedList(new
ArrayList<>());
for (CompletableFuture<T> f : futures) {
f.whenComplete((value, error) -> {
if (error == null) {
@@ -507,7 +517,7 @@ public class AsyncSupport {
// source fails OR all sources succeed. We track the first
// failure explicitly because allOf doesn't guarantee which
// exception propagates when multiple futures fail.
- var firstError = new
java.util.concurrent.atomic.AtomicReference<Throwable>();
+ var firstError = new AtomicReference<Throwable>();
for (CompletableFuture<?> f : futures) {
f.whenComplete((v, e) -> {
if (e != null) firstError.compareAndSet(null, e);
@@ -548,8 +558,8 @@ public class AsyncSupport {
.map(s -> (CompletableFuture<T>)
Awaitable.from(s).toCompletableFuture())
.toArray(CompletableFuture[]::new);
CompletableFuture<T> result = new CompletableFuture<>();
- var remaining = new
java.util.concurrent.atomic.AtomicInteger(futures.length);
- List<Throwable> errors = java.util.Collections.synchronizedList(new
ArrayList<>());
+ var remaining = new AtomicInteger(futures.length);
+ List<Throwable> errors = Collections.synchronizedList(new
ArrayList<>());
for (CompletableFuture<T> f : futures) {
f.whenComplete((value, error) -> {
if (error == null) {
@@ -680,8 +690,8 @@ public class AsyncSupport {
public static Throwable unwrap(Throwable t) {
while ((t instanceof CompletionException || t instanceof
ExecutionException
- || t instanceof java.lang.reflect.InvocationTargetException
- || t instanceof java.lang.reflect.UndeclaredThrowableException)
+ || t instanceof InvocationTargetException
+ || t instanceof UndeclaredThrowableException)
&& t.getCause() != null) {
t = t.getCause();
}
@@ -695,6 +705,17 @@ public class AsyncSupport {
// ---- internal utilities ---------------------------------------------
+ private static Executor createDefaultExecutor() {
+ return VIRTUAL_THREADS_AVAILABLE ? VIRTUAL_THREAD_EXECUTOR :
FALLBACK_EXECUTOR;
+ }
+
+ private static CancellationException interruptedAwait(String message,
InterruptedException cause) {
+ Thread.currentThread().interrupt();
+ CancellationException cancellation = new
CancellationException(message);
+ cancellation.initCause(cause);
+ return cancellation;
+ }
+
private static int getIntegerSafe(String name, int defaultValue) {
try {
return Integer.getInteger(name, defaultValue);
diff --git a/src/test/groovy/groovy/AsyncAwaitTest.groovy
b/src/test/groovy/groovy/AsyncAwaitTest.groovy
index 2db512b4b0..90b6ef4936 100644
--- a/src/test/groovy/groovy/AsyncAwaitTest.groovy
+++ b/src/test/groovy/groovy/AsyncAwaitTest.groovy
@@ -3271,6 +3271,29 @@ final class AsyncAwaitTest {
'''
}
+ @Test
+ void testWrapAsyncUsesConfiguredExecutor() {
+ assertScript '''
+ import org.apache.groovy.runtime.async.AsyncClosureUtils
+ import org.apache.groovy.runtime.async.AsyncSupport
+ import java.util.concurrent.Executors
+
+ def exec = Executors.newSingleThreadExecutor { r ->
+ def t = new Thread(r, 'wrap-async-pool')
+ t.daemon = true
+ t
+ }
+ try {
+ def wrapped = AsyncClosureUtils.wrapAsync {
Thread.currentThread().name }
+ AsyncSupport.setExecutor(exec)
+ assert await(wrapped()) == 'wrap-async-pool'
+ } finally {
+ AsyncSupport.resetExecutor()
+ exec.shutdown()
+ }
+ '''
+ }
+
@Test
void testWrapAsyncWithException() {
assertScript '''
@@ -3307,6 +3330,19 @@ final class AsyncAwaitTest {
'''
}
+ @Test
+ void testWrapAsyncGeneratorWithArgs() {
+ assertScript '''
+ import org.apache.groovy.runtime.async.AsyncClosureUtils
+ import org.apache.groovy.runtime.async.GeneratorBridge
+
+ def wrapped = AsyncClosureUtils.wrapAsyncGenerator({
GeneratorBridge bridge, String prefix, List values ->
+ values.each { bridge.yield(prefix + it) }
+ })
+ assert wrapped('item-', [1, 2, 3]).toList() == ['item-1',
'item-2', 'item-3']
+ '''
+ }
+
@Test
void testWrapAsyncGeneratorWithBreak() {
assertScript '''