This is an automated email from the ASF dual-hosted git repository.
asf-gitbox-commits pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/groovy.git
The following commit(s) were added to refs/heads/master by this push:
new 9a2496973b Minor refactor: extract duplicated code((cont'd))
9a2496973b is described below
commit 9a2496973b71c7d86d44f0f0571e25fb1eea2178
Author: Daniel Sun <[email protected]>
AuthorDate: Sat May 30 18:45:41 2026 +0900
Minor refactor: extract duplicated code((cont'd))
---
.../apache/groovy/runtime/async/AsyncSupport.java | 194 +++++++++------------
src/test/groovy/groovy/AsyncAwaitTest.groovy | 78 +++++++++
2 files changed, 160 insertions(+), 112 deletions(-)
diff --git a/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
b/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
index 76c94abb2f..588f5a3084 100644
--- a/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
+++ b/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
@@ -416,102 +416,40 @@ public class AsyncSupport {
* Waits for all given sources to complete, returning their results in
order.
* Multi-arg {@code await(a, b, c)} desugars to this.
*/
- @SuppressWarnings("unchecked")
public static <T> List<T> all(Object... sources) {
- CompletableFuture<?>[] futures = Arrays.stream(sources)
- .map(s -> Awaitable.from(s).toCompletableFuture())
- .toArray(CompletableFuture[]::new);
+ CompletableFuture<?>[] futures = toCombinatorFutures(sources);
CompletableFuture.allOf(futures).join();
- List<T> results = new ArrayList<>(futures.length);
- for (CompletableFuture<?> f : futures) {
- results.add((T) f.join());
- }
- return results;
+ return getJoinedResults(futures);
}
/**
* Returns the result of the first source to complete (success or failure).
*/
- @SuppressWarnings("unchecked")
public static <T> T any(Object... sources) {
- CompletableFuture<?>[] futures = Arrays.stream(sources)
- .map(s -> Awaitable.from(s).toCompletableFuture())
- .toArray(CompletableFuture[]::new);
- return (T) CompletableFuture.anyOf(futures).join();
+ return
AsyncSupport.await(AsyncSupport.<T>anyAsync(sources).toCompletableFuture());
}
/**
* Returns the result of the first source to complete
<em>successfully</em>.
* Only fails when all sources fail.
*/
- @SuppressWarnings("unchecked")
public static <T> T first(Object... sources) {
- CompletableFuture<T>[] futures = Arrays.stream(sources)
- .map(s -> (CompletableFuture<T>)
Awaitable.from(s).toCompletableFuture())
- .toArray(CompletableFuture[]::new);
-
- CompletableFuture<T> result = new CompletableFuture<>();
- var remaining = new AtomicInteger(futures.length);
- List<Throwable> errors = Collections.synchronizedList(new
ArrayList<>());
- for (CompletableFuture<T> f : futures) {
- f.whenComplete((value, error) -> {
- if (error == null) {
- result.complete(value);
- } else {
- errors.add(error);
- if (remaining.decrementAndGet() == 0) {
- CompletionException aggregate = new
CompletionException(
- "All " + futures.length + " tasks failed",
errors.get(0));
- for (int i = 1; i < errors.size(); i++) {
- aggregate.addSuppressed(errors.get(i));
- }
- result.completeExceptionally(aggregate);
- }
- }
- });
- }
- try {
- return result.join();
- } catch (CompletionException e) {
- throw rethrowUnwrapped(e);
- }
+ return
AsyncSupport.await(AsyncSupport.<T>firstAsync(sources).toCompletableFuture());
}
/**
* Waits for all sources to settle (succeed or fail), returning a list of
* {@link AwaitResult} without throwing.
*/
- @SuppressWarnings("unchecked")
public static List<AwaitResult<Object>> allSettled(Object... sources) {
- CompletableFuture<?>[] futures = Arrays.stream(sources)
- .map(s -> Awaitable.from(s).toCompletableFuture())
- .toArray(CompletableFuture[]::new);
- CompletableFuture.allOf(
- Arrays.stream(futures)
- .map(f -> f.handle((v, e) -> null))
- .toArray(CompletableFuture[]::new)
- ).join();
- List<AwaitResult<Object>> results = new ArrayList<>(futures.length);
- for (CompletableFuture<?> f : futures) {
- try {
- results.add(AwaitResult.success(f.join()));
- } catch (CompletionException e) {
- results.add(AwaitResult.failure(unwrap(e)));
- } catch (CancellationException e) {
- results.add(AwaitResult.failure(e));
- }
- }
- return results;
+ return
AsyncSupport.await(allSettledAsync(sources).toCompletableFuture());
}
// ---- async combinator variants (return Awaitable, non-blocking) ------
/** Non-blocking variant of {@link #all} — returns an Awaitable. */
- @SuppressWarnings("unchecked")
public static Awaitable<List<Object>> allAsync(Object... sources) {
- CompletableFuture<?>[] futures = Arrays.stream(sources)
- .map(s -> Awaitable.from(s).toCompletableFuture())
- .toArray(CompletableFuture[]::new);
+ CompletableFuture<?>[] futures = toCombinatorFutures(sources);
// allOf is naturally fail-fast: it completes as soon as any
// source fails OR all sources succeed. We track the first
@@ -525,11 +463,7 @@ public class AsyncSupport {
}
CompletableFuture<List<Object>> combined =
CompletableFuture.allOf(futures)
- .thenApply(v -> {
- List<Object> results = new ArrayList<>(futures.length);
- for (CompletableFuture<?> f : futures)
results.add(f.join());
- return results;
- });
+ .thenApply(v -> getJoinedResults(futures));
// Replace allOf's arbitrary exception with the temporally-first one
CompletableFuture<List<Object>> withFirstError =
combined.exceptionally(e -> {
@@ -545,35 +479,25 @@ public class AsyncSupport {
/** Non-blocking variant of {@link #any} — returns an Awaitable. */
@SuppressWarnings("unchecked")
public static <T> Awaitable<T> anyAsync(Object... sources) {
- CompletableFuture<?>[] futures = Arrays.stream(sources)
- .map(s -> Awaitable.from(s).toCompletableFuture())
- .toArray(CompletableFuture[]::new);
- return (Awaitable<T>)
GroovyPromise.of(CompletableFuture.anyOf(futures));
+ return (Awaitable<T>)
GroovyPromise.of(CompletableFuture.anyOf(toCombinatorFutures(sources)));
}
/** Non-blocking variant of {@link #first} — returns an Awaitable. */
@SuppressWarnings("unchecked")
public static <T> Awaitable<T> firstAsync(Object... sources) {
- CompletableFuture<T>[] futures = Arrays.stream(sources)
- .map(s -> (CompletableFuture<T>)
Awaitable.from(s).toCompletableFuture())
- .toArray(CompletableFuture[]::new);
+ CompletableFuture<T>[] futures = toFirstCombinatorFutures(sources);
CompletableFuture<T> result = new CompletableFuture<>();
- var remaining = new AtomicInteger(futures.length);
- List<Throwable> errors = Collections.synchronizedList(new
ArrayList<>());
- for (CompletableFuture<T> f : futures) {
- f.whenComplete((value, error) -> {
+ var remainingFailures = new AtomicInteger(futures.length);
+ List<Throwable> errors = Collections.synchronizedList(new
ArrayList<>(futures.length));
+ for (CompletableFuture<T> future : futures) {
+ future.whenComplete((value, error) -> {
if (error == null) {
result.complete(value);
- } else {
- errors.add(error);
- if (remaining.decrementAndGet() == 0) {
- CompletionException aggregate = new
CompletionException(
- "All " + futures.length + " tasks failed",
errors.get(0));
- for (int i = 1; i < errors.size(); i++) {
- aggregate.addSuppressed(errors.get(i));
- }
- result.completeExceptionally(aggregate);
- }
+ return;
+ }
+ errors.add(error);
+ if (remainingFailures.decrementAndGet() == 0) {
+
result.completeExceptionally(aggregateFirstFailures(futures.length, errors));
}
});
}
@@ -581,29 +505,75 @@ public class AsyncSupport {
}
/** Non-blocking variant of {@link #allSettled} — returns an Awaitable. */
- @SuppressWarnings("unchecked")
public static Awaitable<List<AwaitResult<Object>>>
allSettledAsync(Object... sources) {
- CompletableFuture<?>[] futures = Arrays.stream(sources)
- .map(s -> Awaitable.from(s).toCompletableFuture())
- .toArray(CompletableFuture[]::new);
+ CompletableFuture<?>[] futures = toCombinatorFutures(sources);
CompletableFuture<List<AwaitResult<Object>>> combined =
CompletableFuture.allOf(
- Arrays.stream(futures).map(f -> f.handle((v, e) ->
null)).toArray(CompletableFuture[]::new)
- ).thenApply(v -> {
- List<AwaitResult<Object>> results = new
ArrayList<>(futures.length);
- for (CompletableFuture<?> f : futures) {
- try {
- results.add(AwaitResult.success(f.join()));
- } catch (CompletionException e) {
- results.add(AwaitResult.failure(unwrap(e)));
- } catch (CancellationException e) {
- results.add(AwaitResult.failure(e));
- }
- }
- return results;
- });
+ Arrays.stream(futures)
+ .map(future -> future.handle((value, error) -> null))
+ .toArray(CompletableFuture[]::new)
+ )
+ .thenApply(v -> getAwaitResults(futures));
return GroovyPromise.of(combined);
}
+ @SuppressWarnings("unchecked")
+ private static <T> List<T> getJoinedResults(CompletableFuture<?>[]
futures) {
+ List<T> results = new ArrayList<>(futures.length);
+ for (CompletableFuture<?> future : futures) {
+ results.add((T) future.join());
+ }
+ return results;
+ }
+
+ private static List<AwaitResult<Object>>
getAwaitResults(CompletableFuture<?>[] futures) {
+ List<AwaitResult<Object>> results = new ArrayList<>(futures.length);
+ for (CompletableFuture<?> f : futures) {
+ try {
+ results.add(AwaitResult.success(f.join()));
+ } catch (CompletionException e) {
+ results.add(AwaitResult.failure(unwrap(e)));
+ } catch (CancellationException e) {
+ results.add(AwaitResult.failure(e));
+ }
+ }
+ return results;
+ }
+
+ private static CompletableFuture<?>[] toCombinatorFutures(Object...
sources) {
+ return Arrays.stream(sources)
+ .map(source -> Awaitable.from(source).toCompletableFuture())
+ .toArray(CompletableFuture[]::new);
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <T> CompletableFuture<T>[]
toFirstCombinatorFutures(Object... sources) {
+ validateFirstSources(sources);
+ return (CompletableFuture<T>[]) toCombinatorFutures(sources);
+ }
+
+ private static void validateFirstSources(Object[] sources) {
+ if (sources == null) {
+ throw new IllegalArgumentException("sources must not be null");
+ }
+ if (sources.length == 0) {
+ throw new IllegalArgumentException("sources must not be empty");
+ }
+ for (Object source : sources) {
+ if (source == null) {
+ throw new IllegalArgumentException("sources must not contain
null elements");
+ }
+ }
+ }
+
+ private static CompletionException aggregateFirstFailures(int sourceCount,
List<Throwable> errors) {
+ CompletionException aggregate = new CompletionException(
+ "All " + sourceCount + " tasks failed", errors.get(0));
+ for (int i = 1; i < errors.size(); i++) {
+ aggregate.addSuppressed(errors.get(i));
+ }
+ return aggregate;
+ }
+
// ---- delay and timeout ----------------------------------------------
/**
diff --git a/src/test/groovy/groovy/AsyncAwaitTest.groovy
b/src/test/groovy/groovy/AsyncAwaitTest.groovy
index 90b6ef4936..44e884efab 100644
--- a/src/test/groovy/groovy/AsyncAwaitTest.groovy
+++ b/src/test/groovy/groovy/AsyncAwaitTest.groovy
@@ -2142,6 +2142,58 @@ final class AsyncAwaitTest {
'''
}
+ @Test
+ void testAsyncSupportFirstReturnsFirstSuccess() {
+ assertScript '''
+ import org.apache.groovy.runtime.async.AsyncSupport
+
+ def fail = async { throw new RuntimeException('fail') }
+ def success = async { 'ok' }
+
+ assert AsyncSupport.first(fail, success) == 'ok'
+ '''
+ }
+
+ @Test
+ void testAsyncSupportAllReturnsOrderedResults() {
+ assertScript '''
+ import org.apache.groovy.runtime.async.AsyncSupport
+
+ def slow = async { Thread.sleep(100); 'slow' }
+ def fast = async { 'fast' }
+
+ assert AsyncSupport.all(slow, fast) == ['slow', 'fast']
+ '''
+ }
+
+ @Test
+ void testAsyncSupportAnyReturnsFirstCompletion() {
+ assertScript '''
+ import org.apache.groovy.runtime.async.AsyncSupport
+
+ def slow = async { Thread.sleep(100); 'slow' }
+ def fast = async { 'fast' }
+
+ assert AsyncSupport.any(slow, fast) == 'fast'
+ '''
+ }
+
+ @Test
+ void testAsyncSupportAllSettledReturnsMixedOutcomes() {
+ assertScript '''
+ import org.apache.groovy.runtime.async.AsyncSupport
+
+ def results = AsyncSupport.allSettled(
+ async { 42 },
+ async { throw new RuntimeException('boom') }
+ )
+
+ assert results*.success == [true, false]
+ assert results[0].value == 42
+ assert results[1].error.message == 'boom'
+ '''
+ }
+
@Test
void testAwaitableFromAwaitableReturnsSame() {
assertScript '''
@@ -2326,6 +2378,32 @@ final class AsyncAwaitTest {
'''
}
+ @Test
+ void testAsyncSupportFirstRejectsEmptySources() {
+ def message = shouldFail(IllegalArgumentException) {
+ assertScript '''
+ import org.apache.groovy.runtime.async.AsyncSupport
+
+ AsyncSupport.first()
+ '''
+ }
+
+ assert message.message == 'sources must not be empty'
+ }
+
+ @Test
+ void testAsyncSupportFirstAsyncRejectsNullSource() {
+ def message = shouldFail(IllegalArgumentException) {
+ assertScript '''
+ import org.apache.groovy.runtime.async.AsyncSupport
+
+ AsyncSupport.firstAsync(async { 'ok' }, null)
+ '''
+ }
+
+ assert message.message == 'sources must not contain null elements'
+ }
+
@Test
void testForAwaitWithEmptyList() {
assertScript '''