This is an automated email from the ASF dual-hosted git repository.

asf-gitbox-commits pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/groovy.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a2496973b Minor refactor: extract duplicated code((cont'd))
9a2496973b is described below

commit 9a2496973b71c7d86d44f0f0571e25fb1eea2178
Author: Daniel Sun <[email protected]>
AuthorDate: Sat May 30 18:45:41 2026 +0900

    Minor refactor: extract duplicated code((cont'd))
---
 .../apache/groovy/runtime/async/AsyncSupport.java  | 194 +++++++++------------
 src/test/groovy/groovy/AsyncAwaitTest.groovy       |  78 +++++++++
 2 files changed, 160 insertions(+), 112 deletions(-)

diff --git a/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java b/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
index 76c94abb2f..588f5a3084 100644
--- a/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
+++ b/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
@@ -416,102 +416,40 @@ public class AsyncSupport {
      * Waits for all given sources to complete, returning their results in order.
      * Multi-arg {@code await(a, b, c)} desugars to this.
      */
-    @SuppressWarnings("unchecked")
     public static <T> List<T> all(Object... sources) {
-        CompletableFuture<?>[] futures = Arrays.stream(sources)
-                .map(s -> Awaitable.from(s).toCompletableFuture())
-                .toArray(CompletableFuture[]::new);
+        CompletableFuture<?>[] futures = toCombinatorFutures(sources);
         CompletableFuture.allOf(futures).join();
-        List<T> results = new ArrayList<>(futures.length);
-        for (CompletableFuture<?> f : futures) {
-            results.add((T) f.join());
-        }
-        return results;
+        return getJoinedResults(futures);
     }
 
     /**
      * Returns the result of the first source to complete (success or failure).
      */
-    @SuppressWarnings("unchecked")
     public static <T> T any(Object... sources) {
-        CompletableFuture<?>[] futures = Arrays.stream(sources)
-                .map(s -> Awaitable.from(s).toCompletableFuture())
-                .toArray(CompletableFuture[]::new);
-        return (T) CompletableFuture.anyOf(futures).join();
+        return AsyncSupport.await(AsyncSupport.<T>anyAsync(sources).toCompletableFuture());
     }
 
     /**
      * Returns the result of the first source to complete <em>successfully</em>.
      * Only fails when all sources fail.
      */
-    @SuppressWarnings("unchecked")
     public static <T> T first(Object... sources) {
-        CompletableFuture<T>[] futures = Arrays.stream(sources)
-                .map(s -> (CompletableFuture<T>) Awaitable.from(s).toCompletableFuture())
-                .toArray(CompletableFuture[]::new);
-
-        CompletableFuture<T> result = new CompletableFuture<>();
-        var remaining = new AtomicInteger(futures.length);
-        List<Throwable> errors = Collections.synchronizedList(new ArrayList<>());
-        for (CompletableFuture<T> f : futures) {
-            f.whenComplete((value, error) -> {
-                if (error == null) {
-                    result.complete(value);
-                } else {
-                    errors.add(error);
-                    if (remaining.decrementAndGet() == 0) {
-                        CompletionException aggregate = new CompletionException(
-                                "All " + futures.length + " tasks failed", errors.get(0));
-                        for (int i = 1; i < errors.size(); i++) {
-                            aggregate.addSuppressed(errors.get(i));
-                        }
-                        result.completeExceptionally(aggregate);
-                    }
-                }
-            });
-        }
-        try {
-            return result.join();
-        } catch (CompletionException e) {
-            throw rethrowUnwrapped(e);
-        }
+        return AsyncSupport.await(AsyncSupport.<T>firstAsync(sources).toCompletableFuture());
     }
 
     /**
      * Waits for all sources to settle (succeed or fail), returning a list of
      * {@link AwaitResult} without throwing.
      */
-    @SuppressWarnings("unchecked")
     public static List<AwaitResult<Object>> allSettled(Object... sources) {
-        CompletableFuture<?>[] futures = Arrays.stream(sources)
-                .map(s -> Awaitable.from(s).toCompletableFuture())
-                .toArray(CompletableFuture[]::new);
-        CompletableFuture.allOf(
-                Arrays.stream(futures)
-                        .map(f -> f.handle((v, e) -> null))
-                        .toArray(CompletableFuture[]::new)
-        ).join();
-        List<AwaitResult<Object>> results = new ArrayList<>(futures.length);
-        for (CompletableFuture<?> f : futures) {
-            try {
-                results.add(AwaitResult.success(f.join()));
-            } catch (CompletionException e) {
-                results.add(AwaitResult.failure(unwrap(e)));
-            } catch (CancellationException e) {
-                results.add(AwaitResult.failure(e));
-            }
-        }
-        return results;
+        return AsyncSupport.await(allSettledAsync(sources).toCompletableFuture());
     }
 
     // ---- async combinator variants (return Awaitable, non-blocking) ------
 
     /** Non-blocking variant of {@link #all} — returns an Awaitable. */
-    @SuppressWarnings("unchecked")
     public static Awaitable<List<Object>> allAsync(Object... sources) {
-        CompletableFuture<?>[] futures = Arrays.stream(sources)
-                .map(s -> Awaitable.from(s).toCompletableFuture())
-                .toArray(CompletableFuture[]::new);
+        CompletableFuture<?>[] futures = toCombinatorFutures(sources);
 
         // allOf is naturally fail-fast: it completes as soon as any
         // source fails OR all sources succeed.  We track the first
@@ -525,11 +463,7 @@ public class AsyncSupport {
         }
 
         CompletableFuture<List<Object>> combined = CompletableFuture.allOf(futures)
-                .thenApply(v -> {
-                    List<Object> results = new ArrayList<>(futures.length);
-                    for (CompletableFuture<?> f : futures) results.add(f.join());
-                    return results;
-                });
+                .thenApply(v -> getJoinedResults(futures));
 
         // Replace allOf's arbitrary exception with the temporally-first one
         CompletableFuture<List<Object>> withFirstError = combined.exceptionally(e -> {
@@ -545,35 +479,25 @@ public class AsyncSupport {
     /** Non-blocking variant of {@link #any} — returns an Awaitable. */
     @SuppressWarnings("unchecked")
     public static <T> Awaitable<T> anyAsync(Object... sources) {
-        CompletableFuture<?>[] futures = Arrays.stream(sources)
-                .map(s -> Awaitable.from(s).toCompletableFuture())
-                .toArray(CompletableFuture[]::new);
-        return (Awaitable<T>) GroovyPromise.of(CompletableFuture.anyOf(futures));
+        return (Awaitable<T>) GroovyPromise.of(CompletableFuture.anyOf(toCombinatorFutures(sources)));
     }
 
     /** Non-blocking variant of {@link #first} — returns an Awaitable. */
     @SuppressWarnings("unchecked")
     public static <T> Awaitable<T> firstAsync(Object... sources) {
-        CompletableFuture<T>[] futures = Arrays.stream(sources)
-                .map(s -> (CompletableFuture<T>) Awaitable.from(s).toCompletableFuture())
-                .toArray(CompletableFuture[]::new);
+        CompletableFuture<T>[] futures = toFirstCombinatorFutures(sources);
         CompletableFuture<T> result = new CompletableFuture<>();
-        var remaining = new AtomicInteger(futures.length);
-        List<Throwable> errors = Collections.synchronizedList(new ArrayList<>());
-        for (CompletableFuture<T> f : futures) {
-            f.whenComplete((value, error) -> {
+        var remainingFailures = new AtomicInteger(futures.length);
+        List<Throwable> errors = Collections.synchronizedList(new ArrayList<>(futures.length));
+        for (CompletableFuture<T> future : futures) {
+            future.whenComplete((value, error) -> {
                 if (error == null) {
                     result.complete(value);
-                } else {
-                    errors.add(error);
-                    if (remaining.decrementAndGet() == 0) {
-                        CompletionException aggregate = new CompletionException(
-                                "All " + futures.length + " tasks failed", errors.get(0));
-                        for (int i = 1; i < errors.size(); i++) {
-                            aggregate.addSuppressed(errors.get(i));
-                        }
-                        result.completeExceptionally(aggregate);
-                    }
+                    return;
+                }
+                errors.add(error);
+                if (remainingFailures.decrementAndGet() == 0) {
+                    result.completeExceptionally(aggregateFirstFailures(futures.length, errors));
                 }
             });
         }
@@ -581,29 +505,75 @@ public class AsyncSupport {
     }
 
     /** Non-blocking variant of {@link #allSettled} — returns an Awaitable. */
-    @SuppressWarnings("unchecked")
     public static Awaitable<List<AwaitResult<Object>>> allSettledAsync(Object... sources) {
-        CompletableFuture<?>[] futures = Arrays.stream(sources)
-                .map(s -> Awaitable.from(s).toCompletableFuture())
-                .toArray(CompletableFuture[]::new);
+        CompletableFuture<?>[] futures = toCombinatorFutures(sources);
         CompletableFuture<List<AwaitResult<Object>>> combined = CompletableFuture.allOf(
-                Arrays.stream(futures).map(f -> f.handle((v, e) -> null)).toArray(CompletableFuture[]::new)
-        ).thenApply(v -> {
-            List<AwaitResult<Object>> results = new ArrayList<>(futures.length);
-            for (CompletableFuture<?> f : futures) {
-                try {
-                    results.add(AwaitResult.success(f.join()));
-                } catch (CompletionException e) {
-                    results.add(AwaitResult.failure(unwrap(e)));
-                } catch (CancellationException e) {
-                    results.add(AwaitResult.failure(e));
-                }
-            }
-            return results;
-        });
+                Arrays.stream(futures)
+                    .map(future -> future.handle((value, error) -> null))
+                    .toArray(CompletableFuture[]::new)
+            )
+                .thenApply(v -> getAwaitResults(futures));
         return GroovyPromise.of(combined);
     }
 
+    @SuppressWarnings("unchecked")
+    private static <T> List<T> getJoinedResults(CompletableFuture<?>[] futures) {
+        List<T> results = new ArrayList<>(futures.length);
+        for (CompletableFuture<?> future : futures) {
+            results.add((T) future.join());
+        }
+        return results;
+    }
+
+    private static List<AwaitResult<Object>> getAwaitResults(CompletableFuture<?>[] futures) {
+        List<AwaitResult<Object>> results = new ArrayList<>(futures.length);
+        for (CompletableFuture<?> f : futures) {
+            try {
+                results.add(AwaitResult.success(f.join()));
+            } catch (CompletionException e) {
+                results.add(AwaitResult.failure(unwrap(e)));
+            } catch (CancellationException e) {
+                results.add(AwaitResult.failure(e));
+            }
+        }
+        return results;
+    }
+
+    private static CompletableFuture<?>[] toCombinatorFutures(Object... sources) {
+        return Arrays.stream(sources)
+                .map(source -> Awaitable.from(source).toCompletableFuture())
+                .toArray(CompletableFuture[]::new);
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <T> CompletableFuture<T>[] toFirstCombinatorFutures(Object... sources) {
+        validateFirstSources(sources);
+        return (CompletableFuture<T>[]) toCombinatorFutures(sources);
+    }
+
+    private static void validateFirstSources(Object[] sources) {
+        if (sources == null) {
+            throw new IllegalArgumentException("sources must not be null");
+        }
+        if (sources.length == 0) {
+            throw new IllegalArgumentException("sources must not be empty");
+        }
+        for (Object source : sources) {
+            if (source == null) {
+                throw new IllegalArgumentException("sources must not contain null elements");
+            }
+        }
+    }
+
+    private static CompletionException aggregateFirstFailures(int sourceCount, List<Throwable> errors) {
+        CompletionException aggregate = new CompletionException(
+                "All " + sourceCount + " tasks failed", errors.get(0));
+        for (int i = 1; i < errors.size(); i++) {
+            aggregate.addSuppressed(errors.get(i));
+        }
+        return aggregate;
+    }
+
     // ---- delay and timeout ----------------------------------------------
 
     /**
diff --git a/src/test/groovy/groovy/AsyncAwaitTest.groovy b/src/test/groovy/groovy/AsyncAwaitTest.groovy
index 90b6ef4936..44e884efab 100644
--- a/src/test/groovy/groovy/AsyncAwaitTest.groovy
+++ b/src/test/groovy/groovy/AsyncAwaitTest.groovy
@@ -2142,6 +2142,58 @@ final class AsyncAwaitTest {
         '''
     }
 
+    @Test
+    void testAsyncSupportFirstReturnsFirstSuccess() {
+        assertScript '''
+            import org.apache.groovy.runtime.async.AsyncSupport
+
+            def fail = async { throw new RuntimeException('fail') }
+            def success = async { 'ok' }
+
+            assert AsyncSupport.first(fail, success) == 'ok'
+        '''
+    }
+
+    @Test
+    void testAsyncSupportAllReturnsOrderedResults() {
+        assertScript '''
+            import org.apache.groovy.runtime.async.AsyncSupport
+
+            def slow = async { Thread.sleep(100); 'slow' }
+            def fast = async { 'fast' }
+
+            assert AsyncSupport.all(slow, fast) == ['slow', 'fast']
+        '''
+    }
+
+    @Test
+    void testAsyncSupportAnyReturnsFirstCompletion() {
+        assertScript '''
+            import org.apache.groovy.runtime.async.AsyncSupport
+
+            def slow = async { Thread.sleep(100); 'slow' }
+            def fast = async { 'fast' }
+
+            assert AsyncSupport.any(slow, fast) == 'fast'
+        '''
+    }
+
+    @Test
+    void testAsyncSupportAllSettledReturnsMixedOutcomes() {
+        assertScript '''
+            import org.apache.groovy.runtime.async.AsyncSupport
+
+            def results = AsyncSupport.allSettled(
+                async { 42 },
+                async { throw new RuntimeException('boom') }
+            )
+
+            assert results*.success == [true, false]
+            assert results[0].value == 42
+            assert results[1].error.message == 'boom'
+        '''
+    }
+
     @Test
     void testAwaitableFromAwaitableReturnsSame() {
         assertScript '''
@@ -2326,6 +2378,32 @@ final class AsyncAwaitTest {
         '''
     }
 
+    @Test
+    void testAsyncSupportFirstRejectsEmptySources() {
+        def message = shouldFail(IllegalArgumentException) {
+            assertScript '''
+                import org.apache.groovy.runtime.async.AsyncSupport
+
+                AsyncSupport.first()
+            '''
+        }
+
+        assert message.message == 'sources must not be empty'
+    }
+
+    @Test
+    void testAsyncSupportFirstAsyncRejectsNullSource() {
+        def message = shouldFail(IllegalArgumentException) {
+            assertScript '''
+                import org.apache.groovy.runtime.async.AsyncSupport
+
+                AsyncSupport.firstAsync(async { 'ok' }, null)
+            '''
+        }
+
+        assert message.message == 'sources must not contain null elements'
+    }
+
     @Test
     void testForAwaitWithEmptyList() {
         assertScript '''

Reply via email to