This is an automated email from the ASF dual-hosted git repository. sunlan pushed a commit to branch GROOVY-9381 in repository https://gitbox.apache.org/repos/asf/groovy.git
commit bf0afa5f5e08a71e588c6bdd2398e2b7e76453f1 Author: Daniel Sun <[email protected]> AuthorDate: Sun Oct 26 15:52:59 2025 +0900 GROOVY-9381: Support async/await like ES7(backend) --- src/main/java/groovy/transform/Async.java | 39 + .../groovy/util/concurrent/async/AsyncHelper.java | 96 +++ .../util/concurrent/async/AwaitException.java | 44 + .../groovy/util/concurrent/async/Awaitable.java | 35 + .../java/groovy/util/concurrent/async/Promise.java | 340 ++++++++ .../util/concurrent/async/SimplePromise.java | 516 ++++++++++++ .../util/concurrent/async/StageAwaitable.java | 721 ++++++++++++++++ .../groovy/transform/AsyncASTTransformation.java | 79 ++ .../util/concurrent/async/AsyncHelperTest.groovy | 643 +++++++++++++++ .../util/concurrent/async/SimplePromiseTest.groovy | 906 +++++++++++++++++++++ .../transform/AsyncASTTransformationTest.groovy | 45 + 11 files changed, 3464 insertions(+) diff --git a/src/main/java/groovy/transform/Async.java b/src/main/java/groovy/transform/Async.java new file mode 100644 index 0000000000..0b9cd5809d --- /dev/null +++ b/src/main/java/groovy/transform/Async.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
/**
 * Annotation used to mark async methods, closures and lambda expressions.
 * <p>
 * The annotation is retained in source only ({@link RetentionPolicy#SOURCE}) and
 * names {@code org.codehaus.groovy.transform.AsyncASTTransformation} as the AST
 * transformation class that handles it. It may be placed on methods, local
 * variables and fields.
 *
 * @since 6.0.0
 */
@Documented
@Retention(RetentionPolicy.SOURCE)
@Target({ElementType.METHOD, ElementType.LOCAL_VARIABLE, ElementType.FIELD})
@GroovyASTTransformationClass({"org.codehaus.groovy.transform.AsyncASTTransformation"})
public @interface Async {
}
+ */ +package groovy.util.concurrent.async; + +import org.apache.groovy.util.SystemUtil; + +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.MethodType; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.function.Supplier; + +/** + * Helper class for async/await operations + * + * @since 6.0.0 + */ +public class AsyncHelper { + private static final int PARALLELISM = SystemUtil.getIntegerSafe("groovy.async.parallelism", Runtime.getRuntime().availableProcessors() + 1); + private static final Executor DEFAULT_EXECUTOR; + private static int seq; + + static { + Executor tmpExecutor; + try { + MethodHandle mh = MethodHandles.lookup().findStatic( + Executors.class, + "newVirtualThreadPerTaskExecutor", + MethodType.methodType(ExecutorService.class) + ); + tmpExecutor = (Executor) mh.invoke(); + } catch (Throwable throwable) { + // Fallback to default thread pool if virtual threads are not available + tmpExecutor = Executors.newFixedThreadPool(PARALLELISM, r -> { + Thread t = new Thread(r); + t.setName("async-thread-" + seq++); + return t; + }); + } + DEFAULT_EXECUTOR = tmpExecutor; + } + + /** + * Submits a supplier for asynchronous execution using the default executor + * + * @param supplier the supplier + * @param <T> the result type + * @return the promise + */ + public static <T> Promise<T> async(Supplier<T> supplier) { + return SimplePromise.of(supplier, DEFAULT_EXECUTOR); + } + + /** + * Submits a supplier for asynchronous execution using the provided executor + * + * @param supplier the supplier + * @param executor the executor + * @param <T> the result type + * @return the promise + */ + public static <T> Promise<T> async(Supplier<T> supplier, Executor executor) { + return SimplePromise.of(supplier, executor); + } + + /** + * Awaits the result of an awaitable + * + * @param awaitable the awaitable + * @param <T> the 
result type + * @return the result + */ + public static <T> T await(Awaitable<T> awaitable) { + return awaitable.await(); + } + + private AsyncHelper() {} +} diff --git a/src/main/java/groovy/util/concurrent/async/AwaitException.java b/src/main/java/groovy/util/concurrent/async/AwaitException.java new file mode 100644 index 0000000000..2099758181 --- /dev/null +++ b/src/main/java/groovy/util/concurrent/async/AwaitException.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package groovy.util.concurrent.async; + +/** + * Exception thrown when attempting to wait for the result of a promise + * that aborted by throwing an exception. This exception can be + * inspected using the {@link #getCause()} method. 
/**
 * Exception thrown when waiting for the result of a promise that was
 * aborted by an exception. The failure that triggered it can be
 * inspected using the {@link #getCause()} method.
 *
 * @see Promise
 * @since 6.0.0
 */
public class AwaitException extends RuntimeException {
    // Throwable is Serializable; pin the stream version explicitly so that it does
    // not silently change with unrelated, compiler-dependent details of this class.
    private static final long serialVersionUID = 6312946870112653721L;

    /**
     * Creates an exception with neither a detail message nor a cause.
     */
    public AwaitException() {
    }

    /**
     * Creates an exception with the given detail message and no cause.
     *
     * @param message the detail message
     */
    public AwaitException(String message) {
        super(message);
    }

    /**
     * Creates an exception wrapping the given cause.
     *
     * @param cause the failure that aborted the awaited computation
     */
    public AwaitException(Throwable cause) {
        super(cause);
    }

    /**
     * Creates an exception with the given detail message and cause.
     *
     * @param message the detail message
     * @param cause   the failure that aborted the awaited computation
     */
    public AwaitException(String message, Throwable cause) {
        super(message, cause);
    }
}
/**
 * Represents a result of an asynchronous computation
 *
 * @since 6.0.0
 */
public interface Awaitable<T> {
    /**
     * Waits if necessary for the computation to complete, and then retrieves its
     * result.
     *
     * @return the computed result
     * @throws AwaitException if the computation was cancelled or completed exceptionally
     */
    T await();
}
/**
 * Represents the result of an asynchronous computation that may be explicitly completed (setting its
 * value and status), and may be used as a {@link StageAwaitable},
 * supporting dependent functions and actions that trigger upon its
 * completion. A Promise is also a {@link Future}.
 *
 * @since 6.0.0
 */
public interface Promise<T> extends StageAwaitable<T>, Future<T> {
    /**
     * Causes invocations of {@link #get()} and related methods to throw the provided
     * exception if not already completed.
     *
     * @param ex the exception
     * @return {@code true} if this invocation caused this Promise to transition to a
     * completed state, else {@code false}
     */
    boolean completeExceptionally(Throwable ex);

    /**
     * Forcibly causes subsequent invocations of {@link #get()} and related methods
     * to throw the provided exception, regardless of whether already completed.
     * This method is intended for use only in error recovery scenarios and may cause
     * ongoing dependent completions to use established outcomes instead of the
     * overwritten outcome even in such situations.
     *
     * @param ex the exception
     * @throws NullPointerException if the exception is null
     */
    void obtrudeException(Throwable ex);

    /**
     * Completes this Promise exceptionally with a {@link TimeoutException} if not
     * otherwise completed before the specified timeout.
     *
     * @param timeout the duration to wait before completing exceptionally with a
     *                TimeoutException, measured in units of {@code unit}
     * @param unit    the {@code TimeUnit} that determines how to interpret the
     *                {@code timeout} parameter
     * @return this Promise
     */
    Promise<T> orTimeout(long timeout, TimeUnit unit);

    /**
     * Returns the result value when complete, or throws an unchecked exception if
     * completed exceptionally. To better conform with the use of common functional
     * forms, if a computation involved in the completion of this Promise threw an
     * exception, this method throws an unchecked {@link CompletionException} with
     * the underlying exception as its cause.
     *
     * @return the result value
     * @throws CancellationException if the computation was cancelled
     * @throws CompletionException   if this future completed exceptionally or a
     *                               completion computation threw an exception
     */
    T join();

    /**
     * Returns the default Executor used for async methods that do not specify an
     * Executor.
     *
     * @return the executor
     */
    Executor defaultExecutor();

    /**
     * Completes this Promise with the provided value if not otherwise completed
     * before the specified timeout.
     *
     * @param value   the value to use upon timeout
     * @param timeout the duration to wait before completing normally with the
     *                provided value, measured in units of {@code unit}
     * @param unit    the {@code TimeUnit} that determines how to interpret the
     *                {@code timeout} parameter
     * @return this Promise
     */
    Promise<T> completeOnTimeout(T value, long timeout, TimeUnit unit);

    /**
     * If not already completed, sets the value returned by {@link #get()} and
     * related methods to the provided value.
     *
     * @param value the result value
     * @return {@code true} if this invocation caused this Promise to transition
     * to a completed state, else {@code false}
     */
    boolean complete(T value);

    /**
     * Returns the estimated number of Promises whose completions are awaiting
     * completion of this Promise. This method is designed for use in monitoring
     * system state, not for synchronization control.
     *
     * @return the number of dependent Promises
     */
    int getNumberOfDependents();

    /**
     * Returns {@code true} if this Promise completed exceptionally, in any way.
     * Possible causes include cancellation, explicit invocation of
     * {@code completeExceptionally}, and abrupt termination of a CompletionStage
     * action.
     *
     * @return {@code true} if this Promise completed exceptionally
     */
    boolean isCompletedExceptionally();

    /**
     * Completes this Promise with the result of the provided Supplier function
     * invoked from an asynchronous task using the specified executor.
     *
     * @param supplier a function returning the value to be used to complete this
     *                 Promise
     * @param executor the executor to use for asynchronous execution
     * @return this Promise
     */
    Promise<T> completeAsync(Supplier<? extends T> supplier, Executor executor);

    /**
     * Forcibly sets or resets the value subsequently returned by method
     * {@link #get()} and related methods, regardless of whether already
     * completed. This method is designed for use only in error recovery actions,
     * and even in such situations may result in ongoing dependent completions
     * using established versus overwritten outcomes.
     *
     * @param value the completion value
     */
    void obtrudeValue(T value);

    /**
     * Returns a new Promise that is completed normally with the same value as
     * this Promise when it completes normally. If this Promise completes
     * exceptionally, then the returned Promise completes exceptionally with a
     * CompletionException with this exception as cause. The behavior is
     * equivalent to {@code thenApply(x -> x)}. This method may be useful as a
     * form of "defensive copying", to prevent clients from completing, while
     * still being able to arrange dependent actions.
     *
     * @return the new Promise
     */
    Promise<T> copy();

    /**
     * Completes this Promise with the result of the provided Supplier function
     * invoked from an asynchronous task using the default executor.
     *
     * @param supplier a function returning the value to be used to complete this
     *                 Promise
     * @return this Promise
     */
    Promise<T> completeAsync(Supplier<? extends T> supplier);

    /**
     * Returns the result value (or throws any encountered exception) if
     * completed, else returns the provided valueIfAbsent.
     *
     * @param valueIfAbsent the value to return if not completed
     * @return the result value, if completed, else the provided valueIfAbsent
     * @throws CancellationException if the computation was cancelled
     * @throws CompletionException   if this future completed exceptionally or a
     *                               completion computation threw an exception
     */
    T getNow(T valueIfAbsent);

    /**
     * Returns a {@link CompletableFuture} representation of the object.
     *
     * @return the CompletableFuture
     */
    CompletableFuture<T> toCompletableFuture();

    // ------------------------------------------------------------------
    // The composition methods below are re-declared here with Promise as
    // their declared return type, so that chained calls made on a Promise
    // keep yielding Promises.
    // ------------------------------------------------------------------

    @Override
    <U> Promise<U> thenApply(Function<? super T, ? extends U> fn);

    @Override
    <U> Promise<U> thenApplyAsync(Function<? super T, ? extends U> fn);

    @Override
    <U> Promise<U> thenApplyAsync(Function<? super T, ? extends U> fn, Executor executor);

    @Override
    Promise<Void> thenAccept(Consumer<? super T> action);

    @Override
    Promise<Void> thenAcceptAsync(Consumer<? super T> action);

    @Override
    Promise<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor);

    @Override
    Promise<Void> thenRun(Runnable action);

    @Override
    Promise<Void> thenRunAsync(Runnable action);

    @Override
    Promise<Void> thenRunAsync(Runnable action, Executor executor);

    @Override
    <U, V> Promise<V> thenCombine(StageAwaitable<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn);

    @Override
    <U, V> Promise<V> thenCombineAsync(StageAwaitable<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn);

    @Override
    <U, V> Promise<V> thenCombineAsync(StageAwaitable<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn, Executor executor);

    @Override
    <U> Promise<Void> thenAcceptBoth(StageAwaitable<? extends U> other, BiConsumer<? super T, ? super U> action);

    @Override
    <U> Promise<Void> thenAcceptBothAsync(StageAwaitable<? extends U> other, BiConsumer<? super T, ? super U> action);

    @Override
    <U> Promise<Void> thenAcceptBothAsync(StageAwaitable<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor);

    @Override
    Promise<Void> runAfterBoth(StageAwaitable<?> other, Runnable action);

    @Override
    Promise<Void> runAfterBothAsync(StageAwaitable<?> other, Runnable action);

    @Override
    Promise<Void> runAfterBothAsync(StageAwaitable<?> other, Runnable action, Executor executor);

    @Override
    <U> Promise<U> applyToEither(StageAwaitable<? extends T> other, Function<? super T, U> fn);

    @Override
    <U> Promise<U> applyToEitherAsync(StageAwaitable<? extends T> other, Function<? super T, U> fn);

    @Override
    <U> Promise<U> applyToEitherAsync(StageAwaitable<? extends T> other, Function<? super T, U> fn, Executor executor);

    @Override
    Promise<Void> acceptEither(StageAwaitable<? extends T> other, Consumer<? super T> action);

    @Override
    Promise<Void> acceptEitherAsync(StageAwaitable<? extends T> other, Consumer<? super T> action);

    @Override
    Promise<Void> acceptEitherAsync(StageAwaitable<? extends T> other, Consumer<? super T> action, Executor executor);

    @Override
    Promise<Void> runAfterEither(StageAwaitable<?> other, Runnable action);

    @Override
    Promise<Void> runAfterEitherAsync(StageAwaitable<?> other, Runnable action);

    @Override
    Promise<Void> runAfterEitherAsync(StageAwaitable<?> other, Runnable action, Executor executor);

    @Override
    <U> Promise<U> thenCompose(Function<? super T, ? extends StageAwaitable<U>> fn);

    @Override
    <U> Promise<U> thenComposeAsync(Function<? super T, ? extends StageAwaitable<U>> fn);

    @Override
    <U> Promise<U> thenComposeAsync(Function<? super T, ? extends StageAwaitable<U>> fn, Executor executor);

    @Override
    <U> Promise<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);

    @Override
    <U> Promise<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);

    @Override
    <U> Promise<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor);

    @Override
    Promise<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);

    @Override
    Promise<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);

    @Override
    Promise<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor);

    @Override
    Promise<T> exceptionally(Function<Throwable, ? extends T> fn);

    // ------------------------------------------------------------------
    // The remaining methods have default implementations: each delegates
    // to the corresponding StageAwaitable default method and converts the
    // returned stage with toPromise().
    // ------------------------------------------------------------------

    /**
     * Delegates to {@code StageAwaitable}'s default {@code exceptionallyAsync(fn)}
     * and converts the result with {@code toPromise()}.
     *
     * @param fn the function passed through to the delegate
     * @return the resulting Promise
     */
    @Override
    default Promise<T> exceptionallyAsync(Function<Throwable, ? extends T> fn) {
        return StageAwaitable.super.exceptionallyAsync(fn).toPromise();
    }

    /**
     * Delegates to {@code StageAwaitable}'s default
     * {@code exceptionallyAsync(fn, executor)} and converts the result with
     * {@code toPromise()}.
     *
     * @param fn       the function passed through to the delegate
     * @param executor the executor passed through to the delegate
     * @return the resulting Promise
     */
    @Override
    default Promise<T> exceptionallyAsync(Function<Throwable, ? extends T> fn, Executor executor) {
        return StageAwaitable.super.exceptionallyAsync(fn, executor).toPromise();
    }

    /**
     * Delegates to {@code StageAwaitable}'s default {@code exceptionallyCompose(fn)}
     * and converts the result with {@code toPromise()}.
     *
     * @param fn the function passed through to the delegate
     * @return the resulting Promise
     */
    @Override
    default Promise<T> exceptionallyCompose(Function<Throwable, ? extends StageAwaitable<T>> fn) {
        return StageAwaitable.super.exceptionallyCompose(fn).toPromise();
    }

    /**
     * Delegates to {@code StageAwaitable}'s default
     * {@code exceptionallyComposeAsync(fn)} and converts the result with
     * {@code toPromise()}.
     *
     * @param fn the function passed through to the delegate
     * @return the resulting Promise
     */
    @Override
    default Promise<T> exceptionallyComposeAsync(Function<Throwable, ? extends StageAwaitable<T>> fn) {
        return StageAwaitable.super.exceptionallyComposeAsync(fn).toPromise();
    }

    /**
     * Delegates to {@code StageAwaitable}'s default
     * {@code exceptionallyComposeAsync(fn, executor)} and converts the result with
     * {@code toPromise()}.
     *
     * @param fn       the function passed through to the delegate
     * @param executor the executor passed through to the delegate
     * @return the resulting Promise
     */
    @Override
    default Promise<T> exceptionallyComposeAsync(Function<Throwable, ? extends StageAwaitable<T>> fn, Executor executor) {
        return StageAwaitable.super.exceptionallyComposeAsync(fn, executor).toPromise();
    }
}
+ */ +package groovy.util.concurrent.async; + +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +/** + * A simple implementation of {@link Promise} based on {@link CompletableFuture}. + * + * @since 6.0.0 + */ +public class SimplePromise<T> implements Promise<T> { + private final CompletableFuture<T> future; + + private SimplePromise(CompletableFuture<T> future) { + this.future = future; + } + + /** + * Creates a new Promise backed by the given CompletableFuture. + * + * @param future the CompletableFuture to back the Promise + * @param <T> the type of the Promise's result + * @return the new Promise + */ + public static <T> SimplePromise<T> of(CompletableFuture<T> future) { + return new SimplePromise<>(future); + } + + /** + * Returns a new Promise that is not yet completed. + * + * @param <T> the type of the Promise's result + * @return the new Promise + */ + public static <T> SimplePromise<T> of() { + return of(new CompletableFuture<>()); + } + + /** + * Returns a new Promise that is asynchronously completed by a task running in + * the {@link ForkJoinPool#commonPool()} with the value obtained by calling the + * provided Supplier. 
+ * + * @param supplier a function returning the value to be used to complete the + * returned Promise + * @param <U> the function's return type + * @return the new Promise + */ + public static <U> SimplePromise<U> of(Supplier<U> supplier) { + return of(CompletableFuture.supplyAsync(supplier)); + } + + /** + * Returns a new Promise that is asynchronously completed by a task running in + * the provided executor with the value obtained by calling the provided Supplier. + * + * @param supplier a function returning the value to be used to complete the + * returned Promise + * @param executor the executor to use for asynchronous execution + * @param <U> the function's return type + * @return the new Promise + */ + public static <U> SimplePromise<U> of(Supplier<U> supplier, Executor executor) { + return of(CompletableFuture.supplyAsync(supplier, executor)); + } + +// /** +// * Returns a new Promise that is asynchronously completed by a task running in +// * the {@link ForkJoinPool#commonPool()} after it runs the provided action. +// * +// * @param runnable the action to run before completing the returned Promise +// * @return the new Promise +// */ +// public static SimplePromise<Void> of(Runnable runnable) { +// return of(CompletableFuture.runAsync(runnable)); +// } +// +// /** +// * Returns a new Promise that is asynchronously completed by a task running in +// * the provided executor after it runs the provided action. +// * +// * @param runnable the action to run before completing the returned Promise +// * @param executor the executor to use for asynchronous execution +// * @return the new Promise +// */ +// public static SimplePromise<Void> of(Runnable runnable, Executor executor) { +// return of(CompletableFuture.runAsync(runnable, executor)); +// } + + /** + * Returns a new Promise that is already completed with the provided value. 
+ * + * @param value the value + * @param <U> the type of the value + * @return the completed Promise + */ + public static <U> SimplePromise<U> completedPromise(U value) { + return of(CompletableFuture.completedFuture(value)); + } + + /** + * Returns a new Promise that is completed when all of the provided Promises + * complete. If any of the provided Promises complete exceptionally, then the + * returned Promise also does so, with a CompletionException holding this + * exception as its cause. Otherwise, the results, if any, of the provided + * Promises are not reflected in the returned Promise, but may be obtained by + * inspecting them individually. If no Promises are provided, returns a Promise + * completed with the value {@code null}. + * + * <p>Among the applications of this method is to await completion of a set of + * independent Promises before continuing a program, as in: + * {@code SimplePromise.allOf(p1, p2, p3).join();}. + * + * @param ps the Promises + * @return a new Promise that is completed when all of the provided Promises complete + * @throws NullPointerException if the array or any of its elements are {@code null} + */ + public static SimplePromise<Void> allOf(Promise<?>... ps) { + return of(CompletableFuture.allOf( + Arrays.stream(ps) + .map(Promise::toCompletableFuture) + .toArray(CompletableFuture[]::new) + )); + } + + /** + * Returns a new Promise that is completed when any of the provided Promises + * complete, with the same result. Otherwise, if it completed exceptionally, + * the returned Promise also does so, with a CompletionException holding this + * exception as its cause. If no Promises are provided, returns an incomplete + * Promise. 
+ * + * @param ps the Promises + * @return a new Promise that is completed with the result or exception of any + * of the provided Promises when one completes + * @throws NullPointerException if the array or any of its elements are + * {@code null} + */ + public static SimplePromise<Object> anyOf(Promise<?>... ps) { + return of(CompletableFuture.anyOf( + Arrays.stream(ps) + .map(Promise::toCompletableFuture) + .toArray(CompletableFuture[]::new) + )); + } + + @Override + public T await() { + try { + return this.join(); + } catch (Throwable t) { + throw new AwaitException(t); + } + } + + @Override + public Promise<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor) { + return of(future.whenCompleteAsync(action, executor)); + } + + @Override + public boolean completeExceptionally(Throwable ex) { + return future.completeExceptionally(ex); + } + + @Override + public Promise<Void> thenRun(Runnable action) { + return of(future.thenRun(action)); + } + + @Override + public <U> Promise<U> applyToEither(StageAwaitable<? extends T> other, Function<? super T, U> fn) { + return of(future.applyToEither(other.toPromise().toCompletableFuture(), fn)); + } + + @Override + public void obtrudeException(Throwable ex) { + future.obtrudeException(ex); + } + + @Override + public Promise<T> exceptionallyComposeAsync(Function<Throwable, ? extends StageAwaitable<T>> fn) { + return of(future.exceptionallyComposeAsync(t -> { + final StageAwaitable<T> p = fn.apply(t); + return p.toPromise().toCompletableFuture(); + })); + } + + @Override + public Promise<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) { + return of(future.whenComplete(action)); + } + + @Override + public <U> Promise<U> applyToEitherAsync(StageAwaitable<? extends T> other, Function<? 
super T, U> fn, Executor executor) { + return of(future.applyToEitherAsync(other.toPromise().toCompletableFuture(), fn, executor)); + } + + @Override + public <U> Promise<U> thenApplyAsync(Function<? super T, ? extends U> fn) { + return of(future.thenApplyAsync(fn)); + } + + @Override + public <U> Promise<Void> thenAcceptBothAsync(StageAwaitable<? extends U> other, BiConsumer<? super T, ? super U> action) { + return of(future.thenAcceptBothAsync(other.toPromise().toCompletableFuture(), action)); + } + + @Override + public Promise<T> exceptionallyAsync(Function<Throwable, ? extends T> fn, Executor executor) { + return of(future.exceptionallyAsync(fn, executor)); + } + + @Override + public Promise<Void> thenRunAsync(Runnable action, Executor executor) { + return of(future.thenRunAsync(action, executor)); + } + + @Override + public Promise<Void> runAfterEitherAsync(StageAwaitable<?> other, Runnable action) { + return of(future.runAfterEitherAsync(other.toPromise().toCompletableFuture(), action)); + } + + @Override + public Promise<T> orTimeout(long timeout, TimeUnit unit) { + return of(future.orTimeout(timeout, unit)); + } + + @Override + public <U, V> Promise<V> thenCombineAsync(StageAwaitable<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn) { + return of(future.thenCombineAsync(other.toPromise().toCompletableFuture(), fn)); + } + + @Override + public Promise<Void> runAfterBoth(StageAwaitable<?> other, Runnable action) { + return of(future.runAfterBoth(other.toPromise().toCompletableFuture(), action)); + } + + @Override + public <U> Promise<U> thenCompose(Function<? super T, ? 
extends StageAwaitable<U>> fn) { + return of(future.thenCompose(t -> { + final StageAwaitable<U> p = fn.apply(t); + return p.toPromise().toCompletableFuture(); + })); + } + + @Override + public Promise<Void> runAfterBothAsync(StageAwaitable<?> other, Runnable action, Executor executor) { + return of(future.runAfterBothAsync(other.toPromise().toCompletableFuture(), action, executor)); + } + + @Override + public <U> Promise<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn) { + return of(future.handleAsync(fn)); + } + + @Override + public <U> Promise<U> thenComposeAsync(Function<? super T, ? extends StageAwaitable<U>> fn, Executor executor) { + return of(future.thenComposeAsync(t -> { + final StageAwaitable<U> p = fn.apply(t); + return p.toPromise().toCompletableFuture(); + }, executor)); + } + + @Override + public Promise<Void> thenAccept(Consumer<? super T> action) { + return of(future.thenAccept(action)); + } + + @Override + public T join() { + return future.join(); + } + + @Override + public Promise<Void> acceptEitherAsync(StageAwaitable<? extends T> other, Consumer<? super T> action) { + return of(future.acceptEitherAsync(other.toPromise().toCompletableFuture(), action)); + } + + @Override + public Executor defaultExecutor() { + return future.defaultExecutor(); + } + + @Override + public Promise<T> exceptionallyCompose(Function<Throwable, ? extends StageAwaitable<T>> fn) { + return of(future.exceptionallyCompose(t -> { + final StageAwaitable<T> p = fn.apply(t); + return p.toPromise().toCompletableFuture(); + })); + } + + @Override + public <U> Promise<Void> thenAcceptBoth(StageAwaitable<? extends U> other, BiConsumer<? super T, ? 
super U> action) { + return of(future.thenAcceptBoth(other.toPromise().toCompletableFuture(), action)); + } + + @Override + public Promise<Void> runAfterEither(StageAwaitable<?> other, Runnable action) { + return of(future.runAfterEither(other.toPromise().toCompletableFuture(), action)); + } + + @Override + public Promise<T> completeOnTimeout(T value, long timeout, TimeUnit unit) { + return of(future.completeOnTimeout(value, timeout, unit)); + } + + @Override + public <U> Promise<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) { + return of(future.handle(fn)); + } + + @Override + public boolean complete(T value) { + return future.complete(value); + } + + @Override + public Promise<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor) { + return of(future.thenAcceptAsync(action, executor)); + } + + @Override + public int getNumberOfDependents() { + return future.getNumberOfDependents(); + } + + @Override + public <U> Promise<Void> thenAcceptBothAsync(StageAwaitable<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor) { + return of(future.thenAcceptBothAsync(other.toPromise().toCompletableFuture(), action, executor)); + } + + @Override + public Promise<T> exceptionallyAsync(Function<Throwable, ? extends T> fn) { + return of(future.exceptionallyAsync(fn)); + } + + @Override + public Promise<Void> runAfterEitherAsync(StageAwaitable<?> other, Runnable action, Executor executor) { + return of(future.runAfterEitherAsync(other.toPromise().toCompletableFuture(), action, executor)); + } + + @Override + public boolean isCompletedExceptionally() { + return future.isCompletedExceptionally(); + } + + @Override + public Promise<T> completeAsync(Supplier<? extends T> supplier) { + return of(future.completeAsync(supplier)); + } + + @Override + public <U> Promise<U> applyToEitherAsync(StageAwaitable<? extends T> other, Function<? 
super T, U> fn) { + return of(future.applyToEitherAsync(other.toPromise().toCompletableFuture(), fn)); + } + + @Override + public Promise<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action) { + return of(future.whenCompleteAsync(action)); + } + + @Override + public Promise<Void> thenRunAsync(Runnable action) { + return of(future.thenRunAsync(action)); + } + + @Override + public <U> Promise<U> thenApply(Function<? super T, ? extends U> fn) { + return of(future.thenApply(fn)); + } + + @Override + public void obtrudeValue(T value) { + future.obtrudeValue(value); + } + + @Override + public <U> Promise<U> thenComposeAsync(Function<? super T, ? extends StageAwaitable<U>> fn) { + return of(future.thenComposeAsync(t -> { + final StageAwaitable<U> p = fn.apply(t); + return p.toPromise().toCompletableFuture(); + })); + } + + @Override + public Promise<T> copy() { + return of(future.copy()); + } + + @Override + public Promise<Void> acceptEither(StageAwaitable<? extends T> other, Consumer<? super T> action) { + return of(future.acceptEither(other.toPromise().toCompletableFuture(), action)); + } + + @Override + public <U> Promise<U> thenApplyAsync(Function<? super T, ? extends U> fn, Executor executor) { + return of(future.thenApplyAsync(fn, executor)); + } + + @Override + public <U, V> Promise<V> thenCombine(StageAwaitable<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn) { + return of(future.thenCombine(other.toPromise().toCompletableFuture(), fn)); + } + + @Override + public Promise<T> exceptionally(Function<Throwable, ? extends T> fn) { + return of(future.exceptionally(fn)); + } + + @Override + public Promise<T> completeAsync(Supplier<? extends T> supplier, Executor executor) { + return of(future.completeAsync(supplier, executor)); + } + + @Override + public Promise<Void> acceptEitherAsync(StageAwaitable<? extends T> other, Consumer<? 
super T> action, Executor executor) { + return of(future.acceptEitherAsync(other.toPromise().toCompletableFuture(), action, executor)); + } + + @Override + public Promise<T> exceptionallyComposeAsync(Function<Throwable, ? extends StageAwaitable<T>> fn, Executor executor) { + return of(future.exceptionallyComposeAsync(t -> { + final StageAwaitable<T> p = fn.apply(t); + return p.toPromise().toCompletableFuture(); + }, executor)); + } + + @Override + public Promise<T> toPromise() { + return this; + } + + @Override + public Promise<Void> thenAcceptAsync(Consumer<? super T> action) { + return of(future.thenAcceptAsync(action)); + } + + @Override + public <U, V> Promise<V> thenCombineAsync(StageAwaitable<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn, Executor executor) { + return of(future.thenCombineAsync(other.toPromise().toCompletableFuture(), fn, executor)); + } + + @Override + public Promise<Void> runAfterBothAsync(StageAwaitable<?> other, Runnable action) { + return of(future.runAfterBothAsync(other.toPromise().toCompletableFuture(), action)); + } + + @Override + public <U> Promise<U> handleAsync(BiFunction<? super T, Throwable, ? 
extends U> fn, Executor executor) { + return of(future.handleAsync(fn, executor)); + } + + @Override + public T getNow(T valueIfAbsent) { + return future.getNow(valueIfAbsent); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return future.cancel(mayInterruptIfRunning); + } + + @Override + public boolean isCancelled() { + return future.isCancelled(); + } + + @Override + public boolean isDone() { + return future.isDone(); + } + + @Override + public T get() throws InterruptedException, ExecutionException { + return future.get(); + } + + @Override + public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return future.get(timeout, unit); + } + + @Override + public CompletableFuture<T> toCompletableFuture() { + return future.toCompletableFuture(); + } +} diff --git a/src/main/java/groovy/util/concurrent/async/StageAwaitable.java b/src/main/java/groovy/util/concurrent/async/StageAwaitable.java new file mode 100644 index 0000000000..59da4e1b0d --- /dev/null +++ b/src/main/java/groovy/util/concurrent/async/StageAwaitable.java @@ -0,0 +1,721 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package groovy.util.concurrent.async; + +import java.util.concurrent.Executor; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * Represents a computation stage in a potentially asynchronous execution chain that + * executes an action or computes a value upon completion of another StageAwaitable. + * A stage completes when its computation finishes, which may subsequently trigger + * the execution of dependent stages in the chain. + * + * @since 6.0.0 + */ +public interface StageAwaitable<T> extends Awaitable<T> { + /** + * Creates a new StageAwaitable that executes the provided function with this stage's + * result as input when this stage completes successfully. + * + * <p>This method follows the same pattern as {@link java.util.Optional#map Optional.map} + * and {@link java.util.stream.Stream#map Stream.map}. + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param fn the function used to compute the resulting StageAwaitable's value + * @param <U> the return type of the function + * @return the newly created StageAwaitable + */ + <U> StageAwaitable<U> thenApply(Function<? super T, ? extends U> fn); + + /** + * Creates a new StageAwaitable that executes the provided function asynchronously + * using this stage's default asynchronous execution facility when this stage + * completes successfully. The function receives this stage's result as input. + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param fn the function used to compute the resulting StageAwaitable's value + * @param <U> the return type of the function + * @return the newly created StageAwaitable + */ + <U> StageAwaitable<U> thenApplyAsync(Function<? super T, ? 
extends U> fn); + + /** + * Creates a new StageAwaitable that executes the provided function asynchronously + * using the specified Executor when this stage completes successfully. The function + * receives this stage's result as input. + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param fn the function used to compute the resulting StageAwaitable's value + * @param executor the executor used for asynchronous execution + * @param <U> the return type of the function + * @return the newly created StageAwaitable + */ + <U> StageAwaitable<U> thenApplyAsync(Function<? super T, ? extends U> fn, Executor executor); + + /** + * Creates a new StageAwaitable that executes the provided action with this stage's + * result as input when this stage completes successfully. + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param action the action to perform before completing the resulting StageAwaitable + * @return the newly created StageAwaitable + */ + StageAwaitable<Void> thenAccept(Consumer<? super T> action); + + /** + * Creates a new StageAwaitable that executes the provided action asynchronously + * using this stage's default asynchronous execution facility when this stage + * completes successfully. The action receives this stage's result as input. + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param action the action to perform before completing the resulting StageAwaitable + * @return the newly created StageAwaitable + */ + StageAwaitable<Void> thenAcceptAsync(Consumer<? super T> action); + + /** + * Creates a new StageAwaitable that executes the provided action asynchronously + * using the specified Executor when this stage completes successfully. The action + * receives this stage's result as input. 
+ * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param action the action to perform before completing the resulting StageAwaitable + * @param executor the executor used for asynchronous execution + * @return the newly created StageAwaitable + */ + StageAwaitable<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor); + + /** + * Creates a new StageAwaitable that executes the provided action when this stage + * completes successfully. + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param action the action to perform before completing the resulting StageAwaitable + * @return the newly created StageAwaitable + */ + StageAwaitable<Void> thenRun(Runnable action); + + /** + * Creates a new StageAwaitable that executes the provided action asynchronously + * using this stage's default asynchronous execution facility when this stage + * completes successfully. + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param action the action to perform before completing the resulting StageAwaitable + * @return the newly created StageAwaitable + */ + StageAwaitable<Void> thenRunAsync(Runnable action); + + /** + * Creates a new StageAwaitable that executes the provided action asynchronously + * using the specified Executor when this stage completes successfully. + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. 
+ * + * @param action the action to perform before completing the resulting StageAwaitable + * @param executor the executor used for asynchronous execution + * @return the newly created StageAwaitable + */ + StageAwaitable<Void> thenRunAsync(Runnable action, Executor executor); + + /** + * Creates a new StageAwaitable that executes the provided function with the results + * of both this stage and the other stage when they both complete successfully. + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param other the other StageAwaitable + * @param fn the function used to compute the value of the resulting StageAwaitable + * @param <U> the type of the other StageAwaitable's result + * @param <V> the function's return type + * @return the newly created StageAwaitable + */ + <U, V> StageAwaitable<V> thenCombine(StageAwaitable<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn); + + /** + * Creates a new StageAwaitable that executes the provided function asynchronously + * using this stage's default asynchronous execution facility when both this stage + * and the other stage complete successfully. The function receives both results as + * arguments. + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param other the other StageAwaitable + * @param fn the function used to compute the value of the resulting StageAwaitable + * @param <U> the type of the other StageAwaitable's result + * @param <V> the function's return type + * @return the newly created StageAwaitable + */ + <U, V> StageAwaitable<V> thenCombineAsync(StageAwaitable<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn); + + /** + * Creates a new StageAwaitable that executes the provided function asynchronously + * using the specified executor when both this stage and the other stage complete + * successfully. 
The function receives both results as arguments. + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param other the other StageAwaitable + * @param fn the function used to compute the value of the resulting StageAwaitable + * @param executor the executor used for asynchronous execution + * @param <U> the type of the other StageAwaitable's result + * @param <V> the function's return type + * @return the newly created StageAwaitable + */ + <U, V> StageAwaitable<V> thenCombineAsync(StageAwaitable<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn, Executor executor); + + /** + * Creates a new StageAwaitable that executes the provided action with the results + * of both this stage and the other stage when they both complete successfully. + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param other the other StageAwaitable + * @param action the action to perform before completing the resulting StageAwaitable + * @param <U> the type of the other StageAwaitable's result + * @return the newly created StageAwaitable + */ + <U> StageAwaitable<Void> thenAcceptBoth(StageAwaitable<? extends U> other, BiConsumer<? super T, ? super U> action); + + /** + * Creates a new StageAwaitable that executes the provided action asynchronously + * using this stage's default asynchronous execution facility when both this stage + * and the other stage complete successfully. The action receives both results as + * arguments. + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. 
+ * + * @param other the other StageAwaitable + * @param action the action to perform before completing the resulting StageAwaitable + * @param <U> the type of the other StageAwaitable's result + * @return the newly created StageAwaitable + */ + <U> StageAwaitable<Void> thenAcceptBothAsync(StageAwaitable<? extends U> other, BiConsumer<? super T, ? super U> action); + + /** + * Creates a new StageAwaitable that executes the provided action asynchronously + * using the specified executor when both this stage and the other stage complete + * successfully. The action receives both results as arguments. + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param other the other StageAwaitable + * @param action the action to perform before completing the resulting StageAwaitable + * @param executor the executor used for asynchronous execution + * @param <U> the type of the other StageAwaitable's result + * @return the newly created StageAwaitable + */ + <U> StageAwaitable<Void> thenAcceptBothAsync(StageAwaitable<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor); + + /** + * Creates a new StageAwaitable that executes the provided action when both this + * stage and the other stage complete successfully. + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param other the other StageAwaitable + * @param action the action to perform before completing the resulting StageAwaitable + * @return the newly created StageAwaitable + */ + StageAwaitable<Void> runAfterBoth(StageAwaitable<?> other, Runnable action); + + /** + * Creates a new StageAwaitable that executes the provided action asynchronously + * using this stage's default asynchronous execution facility when both this stage + * and the other stage complete successfully. 
+ * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param other the other StageAwaitable + * @param action the action to perform before completing the resulting StageAwaitable + * @return the newly created StageAwaitable + */ + StageAwaitable<Void> runAfterBothAsync(StageAwaitable<?> other, Runnable action); + + /** + * Creates a new StageAwaitable that executes the provided action asynchronously + * using the specified executor when both this stage and the other stage complete + * successfully. + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param other the other StageAwaitable + * @param action the action to perform before completing the resulting StageAwaitable + * @param executor the executor used for asynchronous execution + * @return the newly created StageAwaitable + */ + StageAwaitable<Void> runAfterBothAsync(StageAwaitable<?> other, Runnable action, Executor executor); + + /** + * Creates a new StageAwaitable that executes the provided function with the result + * from whichever stage completes successfully first (either this stage or the other + * stage). + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param other the other StageAwaitable + * @param fn the function used to compute the value of the resulting StageAwaitable + * @param <U> the function's return type + * @return the newly created StageAwaitable + */ + <U> StageAwaitable<U> applyToEither(StageAwaitable<? extends T> other, Function<? super T, U> fn); + + /** + * Creates a new StageAwaitable that executes the provided function asynchronously + * using this stage's default asynchronous execution facility with the result from + * whichever stage completes successfully first (either this stage or the other stage). 
+ * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param other the other StageAwaitable + * @param fn the function used to compute the value of the resulting StageAwaitable + * @param <U> the function's return type + * @return the newly created StageAwaitable + */ + <U> StageAwaitable<U> applyToEitherAsync(StageAwaitable<? extends T> other, Function<? super T, U> fn); + + /** + * Creates a new StageAwaitable that executes the provided function asynchronously + * using the specified executor with the result from whichever stage completes + * successfully first (either this stage or the other stage). + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param other the other StageAwaitable + * @param fn the function used to compute the value of the resulting StageAwaitable + * @param executor the executor used for asynchronous execution + * @param <U> the function's return type + * @return the newly created StageAwaitable + */ + <U> StageAwaitable<U> applyToEitherAsync(StageAwaitable<? extends T> other, Function<? super T, U> fn, Executor executor); + + /** + * Creates a new StageAwaitable that executes the provided action with the result + * from whichever stage completes successfully first (either this stage or the other + * stage). + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param other the other StageAwaitable + * @param action the action to perform before completing the resulting StageAwaitable + * @return the newly created StageAwaitable + */ + StageAwaitable<Void> acceptEither(StageAwaitable<? extends T> other, Consumer<? 
super T> action); + + /** + * Creates a new StageAwaitable that executes the provided action asynchronously + * using this stage's default asynchronous execution facility with the result from + * whichever stage completes successfully first (either this stage or the other stage). + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param other the other StageAwaitable + * @param action the action to perform before completing the resulting StageAwaitable + * @return the newly created StageAwaitable + */ + StageAwaitable<Void> acceptEitherAsync(StageAwaitable<? extends T> other, Consumer<? super T> action); + + /** + * Creates a new StageAwaitable that executes the provided action asynchronously + * using the specified executor with the result from whichever stage completes + * successfully first (either this stage or the other stage). + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param other the other StageAwaitable + * @param action the action to perform before completing the resulting StageAwaitable + * @param executor the executor used for asynchronous execution + * @return the newly created StageAwaitable + */ + StageAwaitable<Void> acceptEitherAsync(StageAwaitable<? extends T> other, Consumer<? super T> action, Executor executor); + + /** + * Creates a new StageAwaitable that executes the provided action when either + * this stage or the other stage completes successfully. + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. 
+ * + * @param other the other StageAwaitable + * @param action the action to perform before completing the resulting StageAwaitable + * @return the newly created StageAwaitable + */ + StageAwaitable<Void> runAfterEither(StageAwaitable<?> other, Runnable action); + + /** + * Creates a new StageAwaitable that executes the provided action asynchronously + * using this stage's default asynchronous execution facility when either this + * stage or the other stage completes successfully. + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param other the other StageAwaitable + * @param action the action to perform before completing the resulting StageAwaitable + * @return the newly created StageAwaitable + */ + StageAwaitable<Void> runAfterEitherAsync(StageAwaitable<?> other, Runnable action); + + /** + * Creates a new StageAwaitable that executes the provided action asynchronously + * using the specified executor when either this stage or the other stage completes + * successfully. + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param other the other StageAwaitable + * @param action the action to perform before completing the resulting StageAwaitable + * @param executor the executor used for asynchronous execution + * @return the newly created StageAwaitable + */ + StageAwaitable<Void> runAfterEitherAsync(StageAwaitable<?> other, Runnable action, Executor executor); + + /** + * Creates a new StageAwaitable that is completed with the same value as the + * StageAwaitable returned by the provided function. + * + * <p>When this stage completes successfully, the provided function is invoked + * with this stage's result as the argument, returning another StageAwaitable. + * When that stage completes successfully, the StageAwaitable returned by this + * method is completed with the same value. 
+ * + * <p>To ensure progress, the supplied function must arrange eventual completion + * of its result. + * + * <p>This method is analogous to {@link java.util.Optional#flatMap Optional.flatMap} + * and {@link java.util.stream.Stream#flatMap Stream.flatMap}. + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param fn the function used to compute another StageAwaitable + * @param <U> the type of the resulting StageAwaitable's result + * @return the newly created StageAwaitable + */ + <U> StageAwaitable<U> thenCompose(Function<? super T, ? extends StageAwaitable<U>> fn); + + /** + * Creates a new StageAwaitable that is completed with the same value as the + * StageAwaitable returned by the provided function, executed asynchronously + * using this stage's default asynchronous execution facility. + * + * <p>When this stage completes successfully, the provided function is invoked + * with this stage's result as the argument, returning another StageAwaitable. + * When that stage completes successfully, the StageAwaitable returned by this + * method is completed with the same value. + * + * <p>To ensure progress, the supplied function must arrange eventual completion + * of its result. + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param fn the function used to compute another StageAwaitable + * @param <U> the type of the resulting StageAwaitable's result + * @return the newly created StageAwaitable + */ + <U> StageAwaitable<U> thenComposeAsync(Function<? super T, ? extends StageAwaitable<U>> fn); + + /** + * Creates a new StageAwaitable that is completed with the same value as the + * StageAwaitable returned by the provided function, executed asynchronously + * using the specified Executor. 
+ * + * <p>When this stage completes successfully, the provided function is invoked + * with this stage's result as the argument, returning another StageAwaitable. + * When that stage completes successfully, the StageAwaitable returned by this + * method is completed with the same value. + * + * <p>To ensure progress, the supplied function must arrange eventual completion + * of its result. + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param fn the function used to compute another StageAwaitable + * @param executor the executor used for asynchronous execution + * @param <U> the type of the resulting StageAwaitable's result + * @return the newly created StageAwaitable + */ + <U> StageAwaitable<U> thenComposeAsync(Function<? super T, ? extends StageAwaitable<U>> fn, Executor executor); + + /** + * Creates a new StageAwaitable that is executed with this stage's result and + * exception as arguments to the provided function when this stage completes + * either successfully or exceptionally. + * + * <p>When this stage is complete, the provided function is invoked with the + * result (or {@code null} if none) and the exception (or {@code null} if none) + * of this stage as arguments, and the function's result is used to complete the + * resulting stage. + * + * @param fn the function used to compute the value of the resulting StageAwaitable + * @param <U> the function's return type + * @return the newly created StageAwaitable + */ + <U> StageAwaitable<U> handle(BiFunction<? super T, Throwable, ? extends U> fn); + + /** + * Creates a new StageAwaitable that is executed asynchronously using this stage's + * default asynchronous execution facility with this stage's result and exception + * as arguments to the provided function when this stage completes either + * successfully or exceptionally. 
+ * + * <p>When this stage is complete, the provided function is invoked with the + * result (or {@code null} if none) and the exception (or {@code null} if none) + * of this stage as arguments, and the function's result is used to complete the + * resulting stage. + * + * @param fn the function used to compute the value of the resulting StageAwaitable + * @param <U> the function's return type + * @return the newly created StageAwaitable + */ + <U> StageAwaitable<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn); + + /** + * Creates a new StageAwaitable that is executed asynchronously using the + * specified executor with this stage's result and exception as arguments to + * the provided function when this stage completes either successfully or + * exceptionally. + * + * <p>When this stage is complete, the provided function is invoked with the + * result (or {@code null} if none) and the exception (or {@code null} if none) + * of this stage as arguments, and the function's result is used to complete the + * resulting stage. + * + * @param fn the function used to compute the value of the resulting StageAwaitable + * @param executor the executor used for asynchronous execution + * @param <U> the function's return type + * @return the newly created StageAwaitable + */ + <U> StageAwaitable<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor); + + /** + * Creates a new StageAwaitable with the same result or exception as this stage, + * that executes the provided action when this stage completes. + * + * <p>When this stage is complete, the provided action is invoked with the result + * (or {@code null} if none) and the exception (or {@code null} if none) of this + * stage as arguments. The resulting stage is completed when the action returns. + * + * <p>Unlike method {@link #handle handle}, this method is not designed to + * translate completion outcomes, so the supplied action should not throw an + * exception. 
However, if it does, the following rules apply: if this stage + * completed successfully but the supplied action throws an exception, then the + * resulting stage completes exceptionally with the supplied action's exception. + * Or, if this stage completed exceptionally and the supplied action throws an + * exception, then the resulting stage completes exceptionally with this stage's + * exception. + * + * @param action the action to perform + * @return the newly created StageAwaitable + */ + StageAwaitable<T> whenComplete(BiConsumer<? super T, ? super Throwable> action); + + /** + * Creates a new StageAwaitable with the same result or exception as this stage, + * that executes the provided action asynchronously using this stage's default + * asynchronous execution facility when this stage completes. + * + * <p>When this stage is complete, the provided action is invoked with the result + * (or {@code null} if none) and the exception (or {@code null} if none) of this + * stage as arguments. The resulting stage is completed when the action returns. + * + * <p>Unlike method {@link #handleAsync(BiFunction) handleAsync}, this method is + * not designed to translate completion outcomes, so the supplied action should + * not throw an exception. However, if it does, the following rules apply: If + * this stage completed successfully but the supplied action throws an exception, + * then the resulting stage completes exceptionally with the supplied action's + * exception. Or, if this stage completed exceptionally and the supplied action + * throws an exception, then the resulting stage completes exceptionally with + * this stage's exception. + * + * @param action the action to perform + * @return the newly created StageAwaitable + */ + StageAwaitable<T> whenCompleteAsync(BiConsumer<? super T, ? 
super Throwable> action); + + /** + * Creates a new StageAwaitable with the same result or exception as this stage, + * that executes the provided action asynchronously using the specified Executor + * when this stage completes. + * + * <p>When this stage is complete, the provided action is invoked with the result + * (or {@code null} if none) and the exception (or {@code null} if none) of this + * stage as arguments. The resulting stage is completed when the action returns. + * + * <p>Unlike method {@link #handleAsync(BiFunction,Executor) handleAsync}, this + * method is not designed to translate completion outcomes, so the supplied action + * should not throw an exception. However, if it does, the following rules apply: + * If this stage completed successfully but the supplied action throws an + * exception, then the resulting stage completes exceptionally with the supplied + * action's exception. Or, if this stage completed exceptionally and the supplied + * action throws an exception, then the resulting stage completes exceptionally + * with this stage's exception. + * + * @param action the action to perform + * @param executor the executor used for asynchronous execution + * @return the newly created StageAwaitable + */ + StageAwaitable<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor); + + /** + * Creates a new StageAwaitable that is executed with this stage's exception as + * the argument to the provided function when this stage completes exceptionally. + * Otherwise, if this stage completes successfully, then the resulting stage also + * completes successfully with the same value. + * + * @param fn the function used to compute the value of the resulting StageAwaitable + * if this StageAwaitable completed exceptionally + * @return the newly created StageAwaitable + */ + StageAwaitable<T> exceptionally(Function<Throwable, ? 
extends T> fn); + + /** + * Creates a new StageAwaitable that is executed asynchronously with this stage's + * exception as the argument to the provided function using this stage's default + * asynchronous execution facility when this stage completes exceptionally. + * Otherwise, if this stage completes successfully, then the resulting stage also + * completes successfully with the same value. + * + * @implSpec The default implementation invokes {@link #handle}, relaying to + * {@link #handleAsync} on exception, then {@link #thenCompose} for result. + * + * @param fn the function used to compute the value of the resulting StageAwaitable + * if this StageAwaitable completed exceptionally + * @return the newly created StageAwaitable + */ + default StageAwaitable<T> exceptionallyAsync(Function<Throwable, ? extends T> fn) { + return handle((r, ex) -> (ex == null) ? this : this.<T>handleAsync((r1, ex1) -> fn.apply(ex1))).thenCompose(Function.identity()); + } + + /** + * Creates a new StageAwaitable that is executed asynchronously with this stage's + * exception as the argument to the provided function using the specified Executor + * when this stage completes exceptionally. Otherwise, if this stage completes + * successfully, then the resulting stage also completes successfully with the + * same value. + * + * @implSpec The default implementation invokes {@link #handle}, relaying to + * {@link #handleAsync} on exception, then {@link #thenCompose} for result. + * + * @param fn the function used to compute the value of the resulting StageAwaitable + * if this StageAwaitable completed exceptionally + * @param executor the executor used for asynchronous execution + * @return the newly created StageAwaitable + */ + default StageAwaitable<T> exceptionallyAsync(Function<Throwable, ? extends T> fn, Executor executor) { + return handle((r, ex) -> (ex == null) ? 
this : this.<T>handleAsync((r1, ex1) -> fn.apply(ex1), executor)).thenCompose(Function.identity()); + } + + /** + * Creates a new StageAwaitable that is composed using the result of the provided + * function applied to this stage's exception when this stage completes exceptionally. + * Otherwise, if this stage completes without an exception, the provided function + * is not invoked and the resulting stage is composed from this stage itself. + * + * @implSpec The default implementation invokes {@link #handle}, invoking the + * provided function on exception, then {@link #thenCompose} for result. + * + * @param fn the function used to compute the resulting StageAwaitable if this + * StageAwaitable completed exceptionally + * @return the newly created StageAwaitable + */ + default StageAwaitable<T> exceptionallyCompose(Function<Throwable, ? extends StageAwaitable<T>> fn) { + return handle((r, ex) -> (ex == null) ? this : fn.apply(ex)).thenCompose(Function.identity()); + } + + /** + * Creates a new StageAwaitable that is composed asynchronously using the result + * of the provided function applied to this stage's exception using this stage's + * default asynchronous execution facility when this stage completes exceptionally. + * Otherwise, if this stage completes without an exception, the provided function + * is not invoked and the resulting stage is composed from this stage itself. + * + * @implSpec The default implementation invokes {@link #handle}, relaying to + * {@link #handleAsync} on exception, then {@link #thenCompose} for result. + * + * @param fn the function used to compute the resulting StageAwaitable if this + * StageAwaitable completed exceptionally + * @return the newly created StageAwaitable + */ + default StageAwaitable<T> exceptionallyComposeAsync(Function<Throwable, ? extends StageAwaitable<T>> fn) { + return handle((r, ex) -> (ex == null) ? this : this.handleAsync((r1, ex1) -> fn.apply(ex1)).thenCompose(Function.identity())).thenCompose(Function.identity()); + } + + /** + * Creates a new StageAwaitable that is composed asynchronously using the result + * of the provided function applied to this stage's exception using the specified + * Executor when this stage completes exceptionally. + * Otherwise, if this stage completes without an exception, the provided function + * is not invoked and the resulting stage is composed from this stage itself. 
+ * + * @implSpec The default implementation invokes {@link #handle}, relaying to + * {@link #handleAsync} on exception, then {@link #thenCompose} for result. + * + * @param fn the function used to compute the resulting StageAwaitable if this + * StageAwaitable completed exceptionally + * @param executor the executor used for asynchronous execution + * @return the newly created StageAwaitable + */ + default StageAwaitable<T> exceptionallyComposeAsync(Function<Throwable, ? extends StageAwaitable<T>> fn, Executor executor) { + return handle((r, ex) -> (ex == null) ? this : this.handleAsync((r1, ex1) -> fn.apply(ex1), executor).thenCompose(Function.identity())).thenCompose(Function.identity()); + } + + /** + * Creates a {@link Promise} that maintains the same completion properties + * as this stage. If this stage is already a Promise, this method may return + * this stage itself. Otherwise, invoking this method may have the same effect as + * {@code thenApply(x -> x)}, but returns an instance of type {@code Promise}. + * + * @return the Promise + */ + Promise<T> toPromise(); +} diff --git a/src/main/java/org/codehaus/groovy/transform/AsyncASTTransformation.java b/src/main/java/org/codehaus/groovy/transform/AsyncASTTransformation.java new file mode 100644 index 0000000000..ab51adff0b --- /dev/null +++ b/src/main/java/org/codehaus/groovy/transform/AsyncASTTransformation.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.codehaus.groovy.transform; + +import groovy.transform.Async; +import groovy.util.concurrent.async.AsyncHelper; +import groovy.util.concurrent.async.Promise; +import org.codehaus.groovy.ast.ASTNode; +import org.codehaus.groovy.ast.AnnotatedNode; +import org.codehaus.groovy.ast.AnnotationNode; +import org.codehaus.groovy.ast.ClassNode; +import org.codehaus.groovy.ast.GenericsType; +import org.codehaus.groovy.ast.MethodNode; +import org.codehaus.groovy.ast.expr.Expression; +import org.codehaus.groovy.ast.expr.LambdaExpression; +import org.codehaus.groovy.ast.stmt.BlockStatement; +import org.codehaus.groovy.ast.tools.GeneralUtils; +import org.codehaus.groovy.ast.tools.GenericsUtils; +import org.codehaus.groovy.classgen.VariableScopeVisitor; +import org.codehaus.groovy.control.CompilePhase; +import org.codehaus.groovy.control.SourceUnit; + +import static org.codehaus.groovy.ast.ClassHelper.make; +import static org.codehaus.groovy.ast.ClassHelper.makeWithoutCaching; +import static org.codehaus.groovy.ast.tools.GeneralUtils.block; +import static org.codehaus.groovy.ast.tools.GeneralUtils.stmt; + +/** + * Handles generation of code for the {@link Async} annotation. 
+ */ +@GroovyASTTransformation(phase = CompilePhase.SEMANTIC_ANALYSIS) +public class AsyncASTTransformation extends AbstractASTTransformation { + private static final ClassNode MY_TYPE = make(Async.class); + private static final String MY_TYPE_NAME = "@" + MY_TYPE.getNameWithoutPackage(); + private static final ClassNode ASYNC_HELPER_TYPE = make(AsyncHelper.class); + + @Override + public void visit(ASTNode[] nodes, SourceUnit source) { + init(nodes, source); + AnnotationNode annotationNode = (AnnotationNode) nodes[0]; + AnnotatedNode annotatedNode = (AnnotatedNode) nodes[1]; + + if (MY_TYPE.equals(annotationNode.getClassNode()) && annotatedNode instanceof MethodNode) { + MethodNode methodNode = (MethodNode) annotatedNode; + if (methodNode.isAbstract()) { + addError("Annotation " + MY_TYPE_NAME + " cannot be used for abstract methods.", methodNode); + return; + } + + BlockStatement origCode = (BlockStatement) methodNode.getCode(); + LambdaExpression supplierLambdaExpression = GeneralUtils.lambdaX(origCode); + Expression resultExpression = GeneralUtils.callX(ASYNC_HELPER_TYPE, "async", supplierLambdaExpression); + + BlockStatement newCode = block(stmt(resultExpression)); + newCode.setSourcePosition(origCode); + methodNode.setCode(newCode); + ClassNode promiseType = GenericsUtils.makeClassSafeWithGenerics(makeWithoutCaching(Promise.class), new GenericsType(methodNode.getReturnType())); + methodNode.setReturnType(promiseType); + VariableScopeVisitor variableScopeVisitor = new VariableScopeVisitor(sourceUnit); + variableScopeVisitor.visitClass(methodNode.getDeclaringClass()); + } + } +} diff --git a/src/test/groovy/groovy/util/concurrent/async/AsyncHelperTest.groovy b/src/test/groovy/groovy/util/concurrent/async/AsyncHelperTest.groovy new file mode 100644 index 0000000000..20a288ad42 --- /dev/null +++ b/src/test/groovy/groovy/util/concurrent/async/AsyncHelperTest.groovy @@ -0,0 +1,643 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package groovy.util.concurrent.async + +import groovy.transform.CompileStatic +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.DisplayName +import org.junit.jupiter.api.Nested +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.Timeout + +import java.util.concurrent.CountDownLatch +import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicReference + +import static groovy.util.concurrent.async.AsyncHelper.async +import static groovy.util.concurrent.async.AsyncHelper.await +import static org.junit.jupiter.api.Assertions.assertEquals +import static org.junit.jupiter.api.Assertions.assertFalse +import static org.junit.jupiter.api.Assertions.assertNotNull +import static org.junit.jupiter.api.Assertions.assertNull +import static org.junit.jupiter.api.Assertions.assertThrows +import static org.junit.jupiter.api.Assertions.assertTrue + +@CompileStatic +@DisplayName("AsyncHelper Tests") +class AsyncHelperTest { + + @Nested + @DisplayName("Basic async/await operations") + class BasicOperationsTest { + + @Test + @DisplayName("should execute simple async 
operation") + void testSimpleAsync() { + Promise<Integer> promise = async(() -> 42) + Integer result = await(promise) + assertEquals(42, result) + } + + @Test + @DisplayName("should execute async operation with string") + void testAsyncWithString() { + Promise<String> promise = async(() -> "Hello, Async!") + String result = await(promise) + assertEquals("Hello, Async!", result) + } + + @Test + @DisplayName("should handle async operation returning null") + void testAsyncReturningNull() { + Promise<Object> promise = async(() -> null) + Object result = await(promise) + assertNull(result) + } + + @Test + @DisplayName("should execute multiple async operations") + void testMultipleAsyncOperations() { + Promise<Integer> p1 = async(() -> 10) + Promise<Integer> p2 = async(() -> 20) + Promise<Integer> p3 = async(() -> 30) + + assertEquals(10, await(p1)) + assertEquals(20, await(p2)) + assertEquals(30, await(p3)) + } + + @Test + @DisplayName("should handle computation in async") + void testAsyncComputation() { + Promise<Integer> promise = async(() -> { + int sum = 0 + for (int i = 1; i <= 100; i++) { + sum += i + } + return sum + }) + assertEquals(5050, await(promise)) + } + } + + @Nested + @DisplayName("Custom executor operations") + class CustomExecutorTest { + private ExecutorService customExecutor + + @AfterEach + void cleanup() { + if (customExecutor != null) { + customExecutor.shutdown() + } + } + + @Test + @DisplayName("should use custom executor for async operation") + void testAsyncWithCustomExecutor() { + customExecutor = Executors.newSingleThreadExecutor() + AtomicReference<String> threadName = new AtomicReference<>() + + Promise<Integer> promise = async(() -> { + threadName.set(Thread.currentThread().getName()) + return 100 + }, customExecutor) + + assertEquals(100, await(promise)) + assertNotNull(threadName.get()) + } + + @Test + @DisplayName("should handle multiple operations with custom executor") + void testMultipleOperationsWithCustomExecutor() { + 
customExecutor = Executors.newFixedThreadPool(2) + + Promise<Integer> p1 = async(() -> 1, customExecutor) + Promise<Integer> p2 = async(() -> 2, customExecutor) + Promise<Integer> p3 = async(() -> 3, customExecutor) + + assertEquals(6, await(p1) + await(p2) + await(p3)) + } + + @Test + @DisplayName("should work with cached thread pool") + void testWithCachedThreadPool() { + customExecutor = Executors.newCachedThreadPool() + List<Promise<Integer>> promises = new ArrayList<>() + + for (int i = 0; i < 10; i++) { + final int value = i + promises.add(async(() -> value * 2, customExecutor)) + } + + int sum = 0 + for (Promise<Integer> p : promises) { + sum += await(p) + } + assertEquals(90, sum) + } + } + + @Nested + @DisplayName("JavaScript-like async/await patterns") + class JavaScriptPatternsTest { + + @Test + @DisplayName("should chain async operations like Promise.then()") + void testChainedAsync() { + Promise<Integer> result = async(() -> 10) + .thenApply(n -> n * 2) + .thenApply(n -> n + 5) + + assertEquals(25, await(result)) + } + + @Test + @DisplayName("should handle sequential async operations") + void testSequentialAsync() { + Promise<String> step1 = async(() -> "Step1") + String result1 = await(step1) + + Promise<String> step2 = async(() -> result1 + "-Step2") + String result2 = await(step2) + + Promise<String> step3 = async(() -> result2 + "-Step3") + String result3 = await(step3) + + assertEquals("Step1-Step2-Step3", result3) + } + + @Test + @DisplayName("should handle parallel async operations like Promise.all()") + @Timeout(2) + void testParallelAsync() { + Promise<Integer> p1 = async(() -> { + sleep(100) + return 1 + }) + + Promise<Integer> p2 = async(() -> { + sleep(100) + return 2 + }) + + Promise<Integer> p3 = async(() -> { + sleep(100) + return 3 + }) + + Promise<Void> all = SimplePromise.allOf(p1, p2, p3) + await(all) + + assertEquals(1, await(p1)) + assertEquals(2, await(p2)) + assertEquals(3, await(p3)) + } + + @Test + @DisplayName("should handle 
race conditions like Promise.race()") + @Timeout(1) + void testAsyncRace() { + Promise<String> slow = async(() -> { + sleep(500) + return "slow" + }) + + Promise<String> fast = async(() -> { + sleep(50) + return "fast" + }) + + Promise<Object> winner = SimplePromise.anyOf(fast, slow) + String result = await(winner) + + assertEquals("fast", result) + } + + @Test + @DisplayName("should handle async/await with exception handling") + void testAsyncWithExceptionHandling() { + Promise<Integer> promise = async(() -> { + throw new RuntimeException("Simulated error") + }) + + Promise<Integer> recovered = promise.exceptionally(ex -> { + assertTrue(ex.getCause() instanceof RuntimeException) + return -1 + }) + + assertEquals(-1, await(recovered)) + } + + @Test + @DisplayName("should handle async/await with data transformation pipeline") + void testAsyncDataPipeline() { + Promise<? extends List<Integer>> result = async(() -> List.of(1, 2, 3, 4, 5)) + .thenApply(list -> { + List<Integer> doubled = new ArrayList<>() + for (Integer n : list) { + doubled.add(n * 2) + } + return doubled + }) + .thenApply(list -> { + List<Integer> filtered = new ArrayList<>() + for (Integer n : list) { + if (n > 5) { + filtered.add(n) + } + } + return filtered + }) + + List<Integer> expected = List.of(6, 8, 10) + assertEquals(expected, await(result)) + } + + @Test + @DisplayName("should handle nested async operations") + void testNestedAsync() { + Promise<Integer> outer = async(() -> { + Promise<Integer> inner = async(() -> 5) + return await(inner) * 2 + }) + + assertEquals(10, await(outer)) + } + } + + @Nested + @DisplayName("Advanced async patterns") + class AdvancedPatternsTest { + + @Test + @DisplayName("should handle async retry pattern") + void testAsyncRetryPattern() { + AtomicInteger attempts = new AtomicInteger(0) + + Promise<String> result = async(() -> { + int count = attempts.incrementAndGet() + if (count < 3) { + throw new RuntimeException("Not ready yet") + } + return "Success after " + 
count + " attempts" + }).exceptionallyCompose(ex -> { + sleep(50) + return async(() -> { + int count = attempts.incrementAndGet() + return "Success after " + count + " attempts" + }) + }) + + String finalResult = await(result) + assertTrue(finalResult.contains("Success")) + assertTrue(attempts.get() >= 2) + } + + @Test + @DisplayName("should handle async timeout pattern") + @Timeout(1) + void testAsyncTimeoutPattern() throws Exception { + Promise<String> slowTask = async(() -> { + sleep(5000) + return "completed" + }) + + Promise<String> timeoutFuture = slowTask + .orTimeout(200, TimeUnit.MILLISECONDS) + .exceptionally(ex -> "timeout") + + assertEquals("timeout", timeoutFuture.get()) + } + + @Test + @DisplayName("should handle async map-reduce pattern") + void testAsyncMapReducePattern() { + List<Integer> numbers = List.of(1, 2, 3, 4, 5) + + List<Promise<Integer>> squarePromises = new ArrayList<>() + for (Integer n : numbers) { + def tmpN = n + squarePromises.add(async(() -> tmpN * tmpN)) + } + + int total = 0 + for (Promise<Integer> p : squarePromises) { + total += await(p) + } + + assertEquals(55, total) + } + + @Test + @DisplayName("should handle async combine operations") + void testAsyncCombine() { + Promise<Integer> p1 = async(() -> 10) + Promise<Integer> p2 = async(() -> 20) + + Promise<Integer> combined = p1.thenCombine(p2, Integer::sum) + + assertEquals(30, await(combined)) + } + + @Test + @DisplayName("should handle async compose operations") + void testAsyncCompose() { + Promise<Integer> initial = async(() -> 5) + + Promise<Integer> composed = initial.thenCompose(n -> + async(() -> n * 3) + ) + + assertEquals(15, await(composed)) + } + } + + @Nested + @DisplayName("Real-world scenarios") + class RealWorldScenariosTest { + + @Test + @DisplayName("should simulate API call with data transformation") + void testSimulateApiCall() { + Promise<String> apiCall = async(() -> { + sleep(50) + return "{\"userId\":1,\"name\":\"John\"}" + }) + + Promise<String> 
transformed = apiCall.thenApply(json -> + json.replace("John", "Jane") + ) + + String result = await(transformed) + assertTrue(result.contains("Jane")) + assertFalse(result.contains("John")) + } + + @Test + @DisplayName("should handle multiple parallel API calls") + @Timeout(1) + void testMultipleParallelApiCalls() { + Promise<String> userApi = async(() -> { + sleep(100) + return "User data" + }) + + Promise<String> orderApi = async(() -> { + sleep(100) + return "Order data" + }) + + Promise<String> productApi = async(() -> { + sleep(100) + return "Product data" + }) + + Promise<String> combined = userApi.thenCombine(orderApi, (u, o) -> u + ", " + o) + .thenCombine(productApi, (uo, p) -> uo + ", " + p) + + String result = await(combined) + assertEquals("User data, Order data, Product data", result) + } + + @Test + @DisplayName("should handle async cache pattern") + void testAsyncCachePattern() { + AtomicReference<String> cache = new AtomicReference<>() + AtomicInteger fetchCount = new AtomicInteger(0) + + Promise<String> getCachedData = async(() -> { + if (cache.get() != null) { + return cache.get() + } + fetchCount.incrementAndGet() + sleep(50) + String data = "Fresh data" + cache.set(data) + return data + }) + + String firstCall = await(getCachedData) + assertEquals("Fresh data", firstCall) + assertEquals(1, fetchCount.get()) + + Promise<String> secondCall = async(() -> cache.get()) + assertEquals("Fresh data", await(secondCall)) + assertEquals(1, fetchCount.get()) + } + + @Test + @DisplayName("should handle async queue processing") + @Timeout(2) + void testAsyncQueueProcessing() { + List<Integer> queue = List.of(10, 20, 30, 40, 50) + AtomicInteger processed = new AtomicInteger(0) + + List<Promise<Void>> tasks = new ArrayList<>() + for (Integer item : queue) { + def tmp = item + tasks.add(async(() -> { + sleep(50) + processed.addAndGet(tmp) + return + })) + } + + Promise<Void> allProcessed = SimplePromise.allOf(tasks as Promise[]) + await(allProcessed) + + 
assertEquals(150, processed.get()) + } + + @Test + @DisplayName("should handle async batch processing") + void testAsyncBatchProcessing() { + List<Integer> batch = List.of(1, 2, 3, 4, 5) + + Promise<Integer> batchSum = async(() -> { + int sum = 0 + for (Integer item : batch) { + sum += await(async(() -> item * 2)) + } + return sum + }) + + assertEquals(30, await(batchSum)) + } + } + + @Nested + @DisplayName("Error handling scenarios") + class ErrorHandlingTest { + + @Test + @DisplayName("should propagate exceptions in async chain") + void testExceptionPropagation() { + Promise<Integer> promise = async(() -> 10) + .thenApply(n -> { + throw new RuntimeException("Chain error") + }) + + assertThrows(AwaitException.class, () -> await(promise)) + } + + @Test + @DisplayName("should handle exception in async operation") + void testAsyncException() { + Promise<Integer> promise = async(() -> { + throw new IllegalStateException("Async error") + }) + + assertThrows(AwaitException.class, () -> await(promise)) + } + + @Test + @DisplayName("should recover from exception with fallback") + void testExceptionRecovery() { + Promise<Object> promise = async(() -> { + throw new RuntimeException("Error") + }).handle((result, ex) -> { + if (ex != null) { + return "Fallback value" + } + return result + }) + + assertEquals("Fallback value", await(promise)) + } + + @Test + @DisplayName("should handle exception with exceptionally") + void testExceptionallyHandler() { + Promise<Integer> promise = async(() -> { + throw new RuntimeException("Error") + }).exceptionally(ex -> { + assertTrue(ex.getCause() instanceof RuntimeException) + return 999 + }) + + assertEquals(999, await(promise)) + } + + @Test + @DisplayName("should handle null pointer exception") + void testNullPointerException() { + Promise<String> promise = async(() -> { + String s = null + return s.length() + "" + }) + + assertThrows(AwaitException.class, () -> await(promise)) + } + } + + @Nested + @DisplayName("Thread safety and 
concurrency")
+    class ConcurrencyTest {
+
+        // Launches 100 async tasks and verifies that every one of them ran.
+        @Test
+        @DisplayName("should handle concurrent async operations")
+        @Timeout(2)
+        void testConcurrentOperations() throws InterruptedException {
+            AtomicInteger counter = new AtomicInteger(0)
+            CountDownLatch latch = new CountDownLatch(100)
+
+            for (int i = 0; i < 100; i++) {
+                async(() -> {
+                    counter.incrementAndGet()
+                    latch.countDown()
+                    return null
+                })
+            }
+
+            // await(...) returns false on timeout; assert it so a timeout is reported
+            // as such rather than as a confusing counter mismatch.
+            assertTrue(latch.await(1, TimeUnit.SECONDS), "all 100 async tasks should finish within 1 second")
+            assertEquals(100, counter.get())
+        }
+
+        // Ten async tasks increment one shared AtomicInteger; all updates must be visible.
+        @Test
+        @DisplayName("should handle shared state correctly")
+        void testSharedState() {
+            AtomicInteger shared = new AtomicInteger(0)
+
+            List<Promise<Void>> promises = new ArrayList<>()
+            for (int i = 0; i < 10; i++) {
+                promises.add(async(() -> {
+                    shared.incrementAndGet()
+                    return
+                }))
+            }
+
+            for (Promise<Void> p : promises) {
+                await(p)
+            }
+
+            assertEquals(10, shared.get())
+        }
+    }
+
+    @Nested
+    @DisplayName("Edge cases")
+    class EdgeCasesTest {
+
+        @Test
+        @DisplayName("should handle empty result")
+        void testEmptyResult() {
+            Promise<Void> promise = async(() -> null)
+            assertNull(await(promise))
+        }
+
+        @Test
+        @DisplayName("should handle boolean results")
+        void testBooleanResults() {
+            Promise<Boolean> truePromise = async(() -> true)
+            Promise<Boolean> falsePromise = async(() -> false)
+
+            assertTrue(await(truePromise))
+            assertFalse(await(falsePromise))
+        }
+
+        @Test
+        @DisplayName("should handle long running tasks")
+        @Timeout(2)
+        void testLongRunningTask() {
+            Promise<String> promise = async(() -> {
+                sleep(500)
+                return "Long task completed"
+            })
+
+            assertEquals("Long task completed", await(promise))
+        }
+
+        @Test
+        @DisplayName("should handle immediate completion")
+        void testImmediateCompletion() {
+            // nanoTime is monotonic, so the measured duration cannot be skewed by
+            // wall-clock adjustments the way currentTimeMillis can.
+            long start = System.nanoTime()
+            Promise<String> promise = async(() -> "Immediate")
+            String result = await(promise)
+            long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)
+
+            assertEquals("Immediate", result)
+            assertTrue(duration < 100, "expected completion in under 100 ms but took " + duration + " ms")
+        }
+    }
+
+}
diff --git 
a/src/test/groovy/groovy/util/concurrent/async/SimplePromiseTest.groovy b/src/test/groovy/groovy/util/concurrent/async/SimplePromiseTest.groovy new file mode 100644 index 0000000000..e732cca300 --- /dev/null +++ b/src/test/groovy/groovy/util/concurrent/async/SimplePromiseTest.groovy @@ -0,0 +1,906 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package groovy.util.concurrent.async + +import groovy.transform.CompileStatic +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.DisplayName +import org.junit.jupiter.api.Nested +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.function.Executable + +import java.util.concurrent.CompletableFuture +import java.util.concurrent.CompletionException +import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicReference + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow +import static org.junit.jupiter.api.Assertions.assertEquals +import static org.junit.jupiter.api.Assertions.assertFalse +import static org.junit.jupiter.api.Assertions.assertNotNull +import static org.junit.jupiter.api.Assertions.assertNotSame +import static org.junit.jupiter.api.Assertions.assertNull +import static org.junit.jupiter.api.Assertions.assertSame +import static org.junit.jupiter.api.Assertions.assertThrows +import static org.junit.jupiter.api.Assertions.assertTrue + +@CompileStatic +class SimplePromiseTest { + + @Nested + @DisplayName("Factory Methods") + class FactoryMethodsTest { + @Test + @DisplayName("should create promise from CompletableFuture") + void testOfWithCompletableFuture() { + Promise<String> promise = SimplePromise.of(() -> "test") + + assertNotNull(promise) + assertEquals("test", promise.await()) + } + + @Test + @DisplayName("should create empty promise") + void testOfEmpty() { + Promise<String> promise = SimplePromise.of() + + assertNotNull(promise) + assertFalse(promise.isDone()) + } + } + + @Nested + @DisplayName("CompletedPromise") + class SimplePromiseCompletedPromiseTest { + @Test + void testCompletedPromiseWithValue() throws Exception { + Promise<Integer> p = SimplePromise.completedPromise(42) + + 
assertTrue(p.isDone(), "promise should be done") + assertFalse(p.isCancelled(), "should not be cancelled") + assertFalse(p.isCompletedExceptionally(), "should not be completed exceptionally") + + // blocking and non-blocking retrievals + assertEquals(42, p.await()) + assertEquals(42, p.await()) + assertEquals(42, p.get()) + assertEquals(42, p.getNow(0)) + assertEquals(42, p.toCompletableFuture().get()) + assertEquals(42, p.get(1, TimeUnit.SECONDS)) + + // attempting to cancel or complete an already-completed promise returns false + assertFalse(p.cancel(true)) + assertFalse(p.complete(99)) + } + + @Test + void testCompletedPromiseWithNull() throws Exception { + Promise<Object> p = SimplePromise.completedPromise(null) + + assertTrue(p.isDone()) + assertFalse(p.isCompletedExceptionally()) + assertNull(p.await()) + assertNull(p.await()) + assertNull(p.getNow("absent")) + assertNull(p.toCompletableFuture().get()) + } + + @Test + void testThenApplyOnCompletedPromise() { + Promise<Integer> p = SimplePromise.completedPromise(5) + + Promise<Integer> mapped = p.thenApply(x -> x * 11) + + // mapping should produce the expected result immediately + assertEquals(55, mapped.await()) + assertFalse(mapped.isCompletedExceptionally()) + } + + @Test + void testCompleteReturnsFalseAndObtrudeValueOverrides() { + Promise<Integer> p = SimplePromise.completedPromise(1) + + // cannot complete again + assertFalse(p.complete(2)) + assertEquals(1, p.await()) + + // obtrudeValue forcibly changes the stored value + p.obtrudeValue(2) + assertEquals(2, p.await()) + } + + @Test + void testObtrudeExceptionMakesPromiseExceptional() { + Promise<Integer> p = SimplePromise.completedPromise(3) + + p.obtrudeException(new IllegalStateException("boom")) + + assertTrue(p.isCompletedExceptionally()) + + // join throws CompletionException + assertThrows(CompletionException.class, p::join) + + // await wraps thrown exception in AwaitException + assertThrows(AwaitException.class, p::await) + } + } + + @Nested + 
@DisplayName("allOf and anyOf Methods") + class SimplePromiseAllAnyTest { + + private ExecutorService executor = Executors.newSingleThreadExecutor() + + @AfterEach + void tearDown() { + executor.shutdownNow() + } + + @Test + void testAllOfCompletesWhenAllComplete() throws Exception { + Promise<Integer> p1 = SimplePromise.of(() -> 1) + Promise<Integer> p2 = SimplePromise.of(() -> 2) + Promise<Integer> p3 = SimplePromise.of() + + // complete p3 asynchronously after a short delay + executor.submit({ + Thread.sleep(50) + p3.complete(3) + } as Runnable) + + Promise<Void> all = SimplePromise.allOf(p1, p2, p3) + + // should complete when the last promise completes + assertDoesNotThrow((Executable) () -> { all.await() }) + assertTrue(all.isDone()) + // allOf semantics mirror CompletableFuture.allOf (no aggregated result), expect null from await + assertNull(all.await()) + } + + @Test + void testAllOfPropagatesExceptionIfAnyFail() { + Promise<Integer> good = SimplePromise.of(() -> 7) + Promise<Integer> bad = SimplePromise.of() + bad.completeExceptionally(new RuntimeException("fail")) + + Promise<Void> all = SimplePromise.allOf(good, bad) + + assertTrue(all.isDone()) + assertThrows(AwaitException.class, { all.await() }) + } + + @Test + void testAnyOfCompletesWithFirstValue() throws Exception { + Promise<String> slow = SimplePromise.of() + Promise<String> fast = SimplePromise.of() + + // fast completes earlier + executor.submit({ + Thread.sleep(30) + fast.complete("fast") + } as Runnable) + + executor.submit({ + Thread.sleep(80) + slow.complete("slow") + } as Runnable) + + Promise<Object> any = SimplePromise.anyOf(slow, fast) + + assertEquals("fast", any.await()) + assertTrue(any.isDone()) + } + + @Test + void testAnyOfCompletesImmediatelyIfOneAlreadyCompleted() { + Promise<Integer> completed = SimplePromise.completedPromise(10) + Promise<Integer> pending = SimplePromise.of() + + Promise<Object> any = SimplePromise.anyOf(completed, pending) + + assertEquals(10, any.await()) + 
assertTrue(any.isDone()) + } + + @Test + void testAnyOfPropagatesFirstExceptionIfItOccursFirst() throws Exception { + Promise<String> exc = SimplePromise.of() + Promise<String> value = SimplePromise.of() + + // exception happens first + executor.submit({ + Thread.sleep(20) + exc.completeExceptionally(new IllegalStateException("boom")) + } as Runnable) + + // value completes slightly later + executor.submit({ + Thread.sleep(60) + value.complete("ok") + } as Runnable) + + Promise<Object> any = SimplePromise.anyOf(exc, value) + + // first completion is exceptional -> any should be exceptional + assertThrows(AwaitException.class, { any.await() }) + } + } + + @Nested + @DisplayName("Completion Methods") + class CompletionMethodsTest { + @Test + @DisplayName("should complete with value") + void testComplete() { + Promise<String> promise = SimplePromise.of() + boolean result = promise.complete("value") + + assertTrue(result) + assertTrue(promise.isDone()) + assertEquals("value", promise.await()) + } + + @Test + @DisplayName("should complete exceptionally") + void testCompleteExceptionally() { + Promise<String> promise = SimplePromise.of() + Exception ex = new RuntimeException("error") + boolean result = promise.completeExceptionally(ex) + + assertTrue(result) + assertTrue(promise.isCompletedExceptionally()) + assertThrows(CompletionException.class, promise::join) + } + + @Test + @DisplayName("should complete async with supplier") + void testCompleteAsync() throws Exception { + Promise<String> promise = SimplePromise.of() + promise.completeAsync(() -> "async-value") + + Thread.sleep(100) + assertEquals("async-value", promise.get()) + } + + @Test + @DisplayName("should complete async with supplier and executor") + void testCompleteAsyncWithExecutor() throws Exception { + ExecutorService executor = Executors.newSingleThreadExecutor() + try { + Promise<String> promise = SimplePromise.of() + promise.completeAsync(() -> "executor-value", executor) + + 
assertEquals("executor-value", promise.get(1, TimeUnit.SECONDS)) + } finally { + executor.shutdown() + } + } + + @Test + @DisplayName("should obtrude value") + void testObtrudeValue() { + Promise<String> promise = SimplePromise.of() + promise.complete("first") + promise.obtrudeValue("second") + + assertEquals("second", promise.await()) + } + + @Test + @DisplayName("should obtrude exception") + void testObtrudeException() { + Promise<String> promise = SimplePromise.of() + promise.complete("value") + RuntimeException ex = new RuntimeException("obtruded") + promise.obtrudeException(ex) + + CompletionException thrown = assertThrows(CompletionException.class, promise::join) + assertEquals(ex, thrown.getCause()) + } + } + + @Nested + @DisplayName("Transformation Methods") + class TransformationMethodsTest { + @Test + @DisplayName("should apply function with thenApply") + void testThenApply() { + Promise<Integer> promise = SimplePromise.completedPromise(5) + Promise<String> result = promise.thenApply(n -> "Number: " + n) + + assertEquals("Number: 5", result.await()) + } + + @Test + @DisplayName("should apply function async with thenApplyAsync") + void testThenApplyAsync() { + Promise<Integer> promise = SimplePromise.completedPromise(10) + Promise<Integer> result = promise.thenApplyAsync(n -> n * 2) + + assertEquals(20, result.await()) + } + + @Test + @DisplayName("should compose with thenCompose") + void testThenCompose() { + Promise<Integer> promise = SimplePromise.completedPromise(3) + Promise<Integer> result = promise.thenCompose(n -> + SimplePromise.completedPromise(n * 3)) + + assertEquals(9, result.await()) + } + + @Test + @DisplayName("should handle with bifunction") + void testHandle() { + Promise<Integer> promise = SimplePromise.completedPromise(5) + Promise<String> result = promise.handle((val, ex) -> + ex == null ? 
"Success: " + val : "Error") + + assertEquals("Success: 5", result.await()) + } + + @Test + @DisplayName("should handle exception with handle") + void testHandleWithException() { + Promise<Integer> promise = SimplePromise.of() + promise.completeExceptionally(new RuntimeException("error")) + Promise<String> result = promise.handle((val, ex) -> + ex != null ? "Handled: " + ex.getMessage() : "OK") + + assertEquals("Handled: error", result.await()) + } + } + + @Nested + @DisplayName("Consumer Methods") + class ConsumerMethodsTest { + @Test + @DisplayName("should accept value with thenAccept") + void testThenAccept() { + AtomicReference<String> ref = new AtomicReference<>() + Promise<String> promise = SimplePromise.completedPromise("test") + Promise<Void> result = promise.thenAccept(ref::set) + + result.await() + assertEquals("test", ref.get()) + } + + @Test + @DisplayName("should run action with thenRun") + void testThenRun() { + AtomicBoolean executed = new AtomicBoolean(false) + Promise<String> promise = SimplePromise.completedPromise("test") + Promise<Void> result = promise.thenRun(() -> executed.set(true)) + + result.await() + assertTrue(executed.get()) + } + + @Test + @DisplayName("should execute whenComplete") + void testWhenComplete() { + AtomicReference<String> ref = new AtomicReference<>() + Promise<String> promise = SimplePromise.completedPromise("value") + Promise<String> result = promise.whenComplete((val, ex) -> ref.set(val)) + + assertEquals("value", result.await()) + assertEquals("value", ref.get()) + } + } + + @Nested + @DisplayName("Combination Methods") + class CombinationMethodsTest { + @Test + @DisplayName("should combine two promises with thenCombine") + void testThenCombine() { + Promise<Integer> p1 = SimplePromise.completedPromise(5) + Promise<Integer> p2 = SimplePromise.completedPromise(3) + Promise<Integer> result = p1.thenCombine(p2, Integer::sum) + + assertEquals(8, result.await()) + } + + @Test + @DisplayName("should accept both with 
thenAcceptBoth") + void testThenAcceptBoth() { + AtomicInteger sum = new AtomicInteger(0) + Promise<Integer> p1 = SimplePromise.completedPromise(5) + Promise<Integer> p2 = SimplePromise.completedPromise(7) + Promise<Void> result = p1.thenAcceptBoth(p2, (a, b) -> sum.set(a + b)) + + result.await() + assertEquals(12, sum.get()) + } + + @Test + @DisplayName("should run after both complete") + void testRunAfterBoth() { + AtomicBoolean executed = new AtomicBoolean(false) + Promise<String> p1 = SimplePromise.completedPromise("a") + Promise<String> p2 = SimplePromise.completedPromise("b") + Promise<Void> result = p1.runAfterBoth(p2, () -> executed.set(true)) + + result.await() + assertTrue(executed.get()) + } + + @Test + @DisplayName("should apply to either") + void testApplyToEither() { + Promise<Integer> p1 = SimplePromise.completedPromise(1) + Promise<Integer> p2 = SimplePromise.of() + Promise<Integer> result = p1.applyToEither(p2, n -> n * 10) + + assertEquals(10, result.await()) + } + + @Test + @DisplayName("should accept either") + void testAcceptEither() { + AtomicInteger ref = new AtomicInteger(0) + Promise<Integer> p1 = SimplePromise.completedPromise(42) + Promise<Integer> p2 = SimplePromise.of() + Promise<Void> result = p1.acceptEither(p2, ref::set) + + result.await() + assertEquals(42, ref.get()) + } + + @Test + @DisplayName("should run after either completes") + void testRunAfterEither() { + AtomicBoolean executed = new AtomicBoolean(false) + Promise<String> p1 = SimplePromise.completedPromise("fast") + Promise<String> p2 = SimplePromise.of() + Promise<Void> result = p1.runAfterEither(p2, () -> executed.set(true)) + + result.await() + assertTrue(executed.get()) + } + } + + @Nested + @DisplayName("Exception Handling") + class ExceptionHandlingTest { + @Test + @DisplayName("should handle exception with exceptionally") + void testExceptionally() { + Promise<Integer> promise = SimplePromise.of() + promise.completeExceptionally(new RuntimeException("error")) + 
Promise<Integer> result = promise.exceptionally(ex -> -1) + + assertEquals(-1, result.await()) + } + + @Test + @DisplayName("should compose exception with exceptionallyCompose") + void testExceptionallyCompose() { + Promise<Integer> promise = SimplePromise.of() + promise.completeExceptionally(new RuntimeException("error")) + Promise<Integer> result = promise.exceptionallyCompose(ex -> + SimplePromise.completedPromise(99)) + + assertEquals(99, result.await()) + } + + @Test + @DisplayName("should handle exception async with exceptionallyAsync") + void testExceptionallyAsync() { + Promise<String> promise = SimplePromise.of() + promise.completeExceptionally(new RuntimeException("async-error")) + Promise<String> result = promise.exceptionallyAsync(ex -> "recovered") + + assertEquals("recovered", result.await()) + } + } + + @Nested + @DisplayName("Timeout and Cancellation") + class TimeoutAndCancellationTest { + @Test + @DisplayName("should timeout with orTimeout") + void testOrTimeout() { + Promise<String> promise = SimplePromise.of() + Promise<String> result = promise.orTimeout(100, TimeUnit.MILLISECONDS) + + assertThrows(CompletionException.class, result::join) + } + + @Test + @DisplayName("should complete on timeout with completeOnTimeout") + void testCompleteOnTimeout() { + Promise<String> promise = SimplePromise.of() + Promise<String> result = promise.completeOnTimeout("default", 100, TimeUnit.MILLISECONDS) + + assertEquals("default", result.await()) + } + + @Test + @DisplayName("should cancel promise") + void testCancel() { + Promise<String> promise = SimplePromise.of() + boolean cancelled = promise.cancel(true) + + assertTrue(cancelled) + assertTrue(promise.isCancelled()) + assertTrue(promise.isDone()) + } + } + + @Nested + @DisplayName("Retrieval Methods") + class RetrievalMethodsTest { + @Test + @DisplayName("should get value with blocking get") + void testGet() throws Exception { + Promise<String> promise = SimplePromise.completedPromise("value") + 
assertEquals("value", promise.get()) + } + + @Test + @DisplayName("should get value with timeout") + void testGetWithTimeout() throws Exception { + Promise<String> promise = SimplePromise.completedPromise("value") + assertEquals("value", promise.get(1, TimeUnit.SECONDS)) + } + + @Test + @DisplayName("should join and return value") + void testJoin() { + Promise<String> promise = SimplePromise.completedPromise("joined") + assertEquals("joined", promise.join()) + } + + @Test + @DisplayName("should get now or default value") + void testGetNow() { + Promise<String> promise = SimplePromise.of() + assertEquals("default", promise.getNow("default")) + + promise.complete("actual") + assertEquals("actual", promise.getNow("default")) + } + + @Test + @DisplayName("should await and return value") + void testAwait() { + Promise<String> promise = SimplePromise.of(() -> "awaited") + assertEquals("awaited", promise.await()) + } + + @Test + @DisplayName("should throw AwaitException on await failure") + void testAwaitException() { + Promise<String> promise = SimplePromise.of() + promise.completeExceptionally(new RuntimeException("error")) + + assertThrows(AwaitException.class, promise::await) + } + } + + @Nested + @DisplayName("Status and Conversion") + class StatusAndConversionTest { + @Test + @DisplayName("should check if done") + void testIsDone() { + Promise<String> promise = SimplePromise.of() + assertFalse(promise.isDone()) + + promise.complete("value") + assertTrue(promise.isDone()) + } + + @Test + @DisplayName("should check if cancelled") + void testIsCancelled() { + Promise<String> promise = SimplePromise.of() + assertFalse(promise.isCancelled()) + + promise.cancel(false) + assertTrue(promise.isCancelled()) + } + + @Test + @DisplayName("should check if completed exceptionally") + void testIsCompletedExceptionally() { + Promise<String> promise = SimplePromise.of() + assertFalse(promise.isCompletedExceptionally()) + + promise.completeExceptionally(new RuntimeException()) + 
assertTrue(promise.isCompletedExceptionally()) + } + + @Test + @DisplayName("should convert to CompletableFuture") + void testToCompletableFuture() { + CompletableFuture<String> cf = CompletableFuture.completedFuture("test") + Promise<String> promise = SimplePromise.of(cf) + + assertSame(cf, promise.toCompletableFuture()) + } + + @Test + @DisplayName("should convert to Promise") + void testToPromise() { + Promise<String> promise = SimplePromise.of() + assertSame(promise, promise.toPromise()) + } + + @Test + @DisplayName("should copy promise") + void testCopy() { + Promise<String> original = SimplePromise.completedPromise("original") + Promise<String> copy = original.copy() + + assertNotSame(original, copy) + assertEquals("original", copy.await()) + } + + @Test + @DisplayName("should get number of dependents") + void testGetNumberOfDependents() { + Promise<String> promise = SimplePromise.of() + assertEquals(0, promise.getNumberOfDependents()) + + promise.thenApply(String::toUpperCase) + assertTrue(promise.getNumberOfDependents() > 0) + } + + @Test + @DisplayName("should get default executor") + void testDefaultExecutor() { + Promise<String> promise = SimplePromise.of() + assertNotNull(promise.defaultExecutor()) + } + } + + @Nested + @DisplayName("Additional Coverage Tests") + class AdditionalCoverageTest { + + @Test + @DisplayName("should apply function async with executor") + void testThenApplyAsyncWithExecutor() { + ExecutorService executor = Executors.newSingleThreadExecutor() + try { + Promise<Integer> promise = SimplePromise.completedPromise(5) + Promise<String> result = promise.thenApplyAsync(n -> "Value: " + n, executor) + + assertEquals("Value: 5", result.await()) + } finally { + executor.shutdown() + } + } + + @Test + @DisplayName("should accept value async with executor") + void testThenAcceptAsyncWithExecutor() { + ExecutorService executor = Executors.newSingleThreadExecutor() + AtomicReference<String> ref = new AtomicReference<>() + try { + 
Promise<String> promise = SimplePromise.completedPromise("test") + Promise<Void> result = promise.thenAcceptAsync(ref::set, executor) + + result.await() + assertEquals("test", ref.get()) + } finally { + executor.shutdown() + } + } + + @Test + @DisplayName("should run action async with executor") + void testThenRunAsyncWithExecutor() { + ExecutorService executor = Executors.newSingleThreadExecutor() + AtomicBoolean executed = new AtomicBoolean(false) + try { + Promise<String> promise = SimplePromise.completedPromise("test") + Promise<Void> result = promise.thenRunAsync(() -> executed.set(true), executor) + + result.await() + assertTrue(executed.get()) + } finally { + executor.shutdown() + } + } + + @Test + @DisplayName("should compose async with executor") + void testThenComposeAsyncWithExecutor() { + ExecutorService executor = Executors.newSingleThreadExecutor() + try { + Promise<Integer> promise = SimplePromise.completedPromise(3) + Promise<Integer> result = promise.thenComposeAsync( + n -> SimplePromise.completedPromise((n * 4)), executor) + + assertEquals(12, result.await()) + } finally { + executor.shutdown() + } + } + + @Test + @DisplayName("should handle async with executor") + void testHandleAsyncWithExecutor() { + ExecutorService executor = Executors.newSingleThreadExecutor() + try { + Promise<Integer> promise = SimplePromise.completedPromise(7) + Promise<String> result = promise.handleAsync( + (val, ex) -> ex == null ? 
"Result: " + val : "Error", executor) + + assertEquals("Result: 7", result.await()) + } finally { + executor.shutdown() + } + } + + @Test + @DisplayName("should execute whenCompleteAsync with executor") + void testWhenCompleteAsyncWithExecutor() { + ExecutorService executor = Executors.newSingleThreadExecutor() + AtomicReference<String> ref = new AtomicReference<>() + try { + Promise<String> promise = SimplePromise.completedPromise("async") + Promise<String> result = promise.whenCompleteAsync((val, ex) -> ref.set(val), executor) + + assertEquals("async", result.await()) + assertEquals("async", ref.get()) + } finally { + executor.shutdown() + } + } + + @Test + @DisplayName("should combine async with executor") + void testThenCombineAsyncWithExecutor() { + ExecutorService executor = Executors.newSingleThreadExecutor() + try { + Promise<Integer> p1 = SimplePromise.completedPromise(10) + Promise<Integer> p2 = SimplePromise.completedPromise(20) + Promise<Integer> result = p1.thenCombineAsync(p2, Integer::sum, executor) + + assertEquals(30, result.await()) + } finally { + executor.shutdown() + } + } + + @Test + @DisplayName("should accept both async with executor") + void testThenAcceptBothAsyncWithExecutor() { + ExecutorService executor = Executors.newSingleThreadExecutor() + AtomicInteger sum = new AtomicInteger(0) + try { + Promise<Integer> p1 = SimplePromise.completedPromise(15) + Promise<Integer> p2 = SimplePromise.completedPromise(25) + Promise<Void> result = p1.thenAcceptBothAsync(p2, (a, b) -> sum.set(a + b), executor) + + result.await() + assertEquals(40, sum.get()) + } finally { + executor.shutdown() + } + } + + @Test + @DisplayName("should run after both async with executor") + void testRunAfterBothAsyncWithExecutor() { + ExecutorService executor = Executors.newSingleThreadExecutor() + AtomicBoolean executed = new AtomicBoolean(false) + try { + Promise<String> p1 = SimplePromise.completedPromise("x") + Promise<String> p2 = SimplePromise.completedPromise("y") + 
Promise<Void> result = p1.runAfterBothAsync(p2, () -> executed.set(true), executor) + + result.await() + assertTrue(executed.get()) + } finally { + executor.shutdown() + } + } + + @Test + @DisplayName("should apply to either async with executor") + void testApplyToEitherAsyncWithExecutor() { + ExecutorService executor = Executors.newSingleThreadExecutor() + try { + Promise<Integer> p1 = SimplePromise.completedPromise(100) + Promise<Integer> p2 = SimplePromise.of() + Promise<Integer> result = p1.applyToEitherAsync(p2, n -> n * 2, executor) + + assertEquals(200, result.await()) + } finally { + executor.shutdown() + } + } + + @Test + @DisplayName("should accept either async with executor") + void testAcceptEitherAsyncWithExecutor() { + ExecutorService executor = Executors.newSingleThreadExecutor() + AtomicInteger ref = new AtomicInteger(0) + try { + Promise<Integer> p1 = SimplePromise.completedPromise(88) + Promise<Integer> p2 = SimplePromise.of() + Promise<Void> result = p1.acceptEitherAsync(p2, ref::set, executor) + + result.await() + assertEquals(88, ref.get()) + } finally { + executor.shutdown() + } + } + + @Test + @DisplayName("should run after either async with executor") + void testRunAfterEitherAsyncWithExecutor() { + ExecutorService executor = Executors.newSingleThreadExecutor() + AtomicBoolean executed = new AtomicBoolean(false) + try { + Promise<String> p1 = SimplePromise.completedPromise("first") + Promise<String> p2 = SimplePromise.of() + Promise<Void> result = p1.runAfterEitherAsync(p2, () -> executed.set(true), executor) + + result.await() + assertTrue(executed.get()) + } finally { + executor.shutdown() + } + } + + @Test + @DisplayName("should handle exception async with exceptionallyComposeAsync with executor") + void testExceptionallyComposeAsyncWithExecutor() { + ExecutorService executor = Executors.newSingleThreadExecutor() + try { + Promise<Integer> promise = SimplePromise.of() + promise.completeExceptionally(new RuntimeException("error")) + 
Promise<Integer> result = promise.exceptionallyComposeAsync( + ex -> SimplePromise.completedPromise(777), executor) + + assertEquals(777, result.await()) + } finally { + executor.shutdown() + } + } + + @Test + @DisplayName("should handle exception with exceptionallyAsync with executor") + void testExceptionallyAsyncWithExecutor() { + ExecutorService executor = Executors.newSingleThreadExecutor() + try { + Promise<String> promise = SimplePromise.of() + promise.completeExceptionally(new RuntimeException("error")) + Promise<String> result = promise.exceptionallyAsync(ex -> "handled", executor) + + assertEquals("handled", result.await()) + } finally { + executor.shutdown() + } + } + } + +} diff --git a/src/test/groovy/org/codehaus/groovy/transform/AsyncASTTransformationTest.groovy b/src/test/groovy/org/codehaus/groovy/transform/AsyncASTTransformationTest.groovy new file mode 100644 index 0000000000..d1b13a1889 --- /dev/null +++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncASTTransformationTest.groovy @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.codehaus.groovy.transform + + +import org.junit.jupiter.api.Test + +import static groovy.test.GroovyAssert.assertScript + +/** + * Unit tests for {@link AsyncASTTransformation}: checks that calling a method annotated with {@code @groovy.transform.Async} yields a {@code groovy.util.concurrent.async.Promise} whose {@code await()} returns the value produced by the method body. + */ +class AsyncASTTransformationTest { + + @Test + void testAsyncAnnotation() { + assertScript ''' + class A { + @groovy.transform.Async + String fetchName() { + return 'Daniel' + } + } + def result = new A().fetchName() + assert result instanceof groovy.util.concurrent.async.Promise + assert result.await() == 'Daniel' + ''' + } +}
