This is an automated email from the ASF dual-hosted git repository. sunlan pushed a commit to branch GROOVY-9381 in repository https://gitbox.apache.org/repos/asf/groovy.git
commit 8986becb8f0888a22b1209989ed9813d2b80d263 Author: Daniel Sun <[email protected]> AuthorDate: Sat Oct 25 21:06:10 2025 +0900 GROOVY-9381: Support async/await like ES7(backend) --- .../util/concurrent/async/AwaitException.java | 44 ++ .../groovy/util/concurrent/async/Awaitable.java | 35 + .../java/groovy/util/concurrent/async/Promise.java | 200 ++++++ .../util/concurrent/async/SimplePromise.java | 517 +++++++++++++++ .../util/concurrent/async/StageAwaitable.java | 721 ++++++++++++++++++++ .../util/concurrent/async/SimplePromiseTest.groovy | 726 +++++++++++++++++++++ 6 files changed, 2243 insertions(+) diff --git a/src/main/java/groovy/util/concurrent/async/AwaitException.java b/src/main/java/groovy/util/concurrent/async/AwaitException.java new file mode 100644 index 0000000000..5095519f05 --- /dev/null +++ b/src/main/java/groovy/util/concurrent/async/AwaitException.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package groovy.util.concurrent.async; + +/** + * Exception thrown when attempting to wait for the result of a promise + * that aborted by throwing an exception. This exception can be + * inspected using the {@link #getCause()} method. 
+ * + * @see Promise + * @since 6.0 + */ +public class AwaitException extends RuntimeException { + public AwaitException() { + } + + public AwaitException(String message) { + super(message); + } + + public AwaitException(Throwable cause) { + super(cause); + } + + public AwaitException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/src/main/java/groovy/util/concurrent/async/Awaitable.java b/src/main/java/groovy/util/concurrent/async/Awaitable.java new file mode 100644 index 0000000000..7bf4d477df --- /dev/null +++ b/src/main/java/groovy/util/concurrent/async/Awaitable.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package groovy.util.concurrent.async; + +/** + * Represents the result of an asynchronous computation. + * + * @since 6.0 + */ +public interface Awaitable<T> { + /** + * Waits if necessary for the computation to complete, and then retrieves its + * result. 
+ * + * @return the computed result + * @throws AwaitException if the computation was cancelled or completed exceptionally + */ + T await(); +} diff --git a/src/main/java/groovy/util/concurrent/async/Promise.java b/src/main/java/groovy/util/concurrent/async/Promise.java new file mode 100644 index 0000000000..f3cf27511e --- /dev/null +++ b/src/main/java/groovy/util/concurrent/async/Promise.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package groovy.util.concurrent.async; + +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; + +/** + * Represents the result of an asynchronous computation that may be explicitly completed (setting its + * value and status), and may be used as a {@link StageAwaitable}, + * supporting dependent functions and actions that trigger upon its + * completion. 
+ * + * @since 6.0 + */ +public interface Promise<T> extends StageAwaitable<T>, Future<T> { + /** + * Causes invocations of {@link #get()} and related methods to throw the provided + * exception if not already completed. + * + * @param ex the exception + * @return {@code true} if this invocation caused this Promise to transition to a + * completed state, else {@code false} + */ + boolean completeExceptionally(Throwable ex); + + /** + * Forcibly causes subsequent invocations of {@link #get()} and related methods + * to throw the provided exception, regardless of whether already completed. + * This method is intended for use only in error recovery scenarios and may cause + * ongoing dependent completions to use established outcomes instead of the + * overwritten outcome even in such situations. + * + * @param ex the exception + * @throws NullPointerException if the exception is null + */ + void obtrudeException(Throwable ex); + + /** + * Completes this Promise exceptionally with a {@link TimeoutException} if not + * otherwise completed before the specified timeout. + * + * @param timeout the duration to wait before completing exceptionally with a + * TimeoutException, measured in units of {@code unit} + * @param unit the {@code TimeUnit} that determines how to interpret the + * {@code timeout} parameter + * @return this Promise + */ + Promise<T> orTimeout(long timeout, TimeUnit unit); + + /** + * Returns the result value when complete, or throws an unchecked exception if + * completed exceptionally. To better conform with the use of common functional + * forms, if a computation involved in the completion of this Promise threw an + * exception, this method throws an unchecked {@link CompletionException} with + * the underlying exception as its cause. 
+ * + * @return the result value + * @throws CancellationException if the computation was cancelled + * @throws CompletionException if this future completed exceptionally or a + * completion computation threw an exception + */ + T join(); + + /** + * Returns the default Executor used for async methods that do not specify an + * Executor. + * + * @return the executor + */ + Executor defaultExecutor(); + + /** + * Completes this Promise with the provided value if not otherwise completed + * before the specified timeout. + * + * @param value the value to use upon timeout + * @param timeout the duration to wait before completing normally with the + * provided value, measured in units of {@code unit} + * @param unit the {@code TimeUnit} that determines how to interpret the + * {@code timeout} parameter + * @return this Promise + */ + Promise<T> completeOnTimeout(T value, long timeout, TimeUnit unit); + + /** + * If not already completed, sets the value returned by {@link #get()} and + * related methods to the provided value. + * + * @param value the result value + * @return {@code true} if this invocation caused this Promise to transition + * to a completed state, else {@code false} + */ + boolean complete(T value); + + /** + * Returns the estimated number of Promises whose completions are awaiting + * completion of this Promise. This method is designed for use in monitoring + * system state, not for synchronization control. + * + * @return the number of dependent Promises + */ + int getNumberOfDependents(); + + /** + * Returns {@code true} if this Promise completed exceptionally, in any way. + * Possible causes include cancellation, explicit invocation of + * {@code completeExceptionally}, and abrupt termination of a CompletionStage + * action. 
+ * + * @return {@code true} if this Promise completed exceptionally + */ + boolean isCompletedExceptionally(); + + /** + * Completes this Promise with the result of the provided Supplier function + * invoked from an asynchronous task using the specified executor. + * + * @param supplier a function returning the value to be used to complete this + * Promise + * @param executor the executor to use for asynchronous execution + * @return this Promise + */ + Promise<T> completeAsync(Supplier<? extends T> supplier, Executor executor); + + /** + * Forcibly sets or resets the value subsequently returned by method + * {@link #get()} and related methods, regardless of whether already + * completed. This method is designed for use only in error recovery actions, + * and even in such situations may result in ongoing dependent completions + * using established versus overwritten outcomes. + * + * @param value the completion value + */ + void obtrudeValue(T value); + + /** + * Returns a new Promise that is completed normally with the same value as + * this Promise when it completes normally. If this Promise completes + * exceptionally, then the returned Promise completes exceptionally with a + * CompletionException with this exception as cause. The behavior is + * equivalent to {@code thenApply(x -> x)}. This method may be useful as a + * form of "defensive copying", to prevent clients from completing, while + * still being able to arrange dependent actions. + * + * @return the new Promise + */ + Promise<T> copy(); + + /** + * Completes this Promise with the result of the provided Supplier function + * invoked from an asynchronous task using the default executor. + * + * @param supplier a function returning the value to be used to complete this + * Promise + * @return this Promise + */ + Promise<T> completeAsync(Supplier<? extends T> supplier); + + /** + * Returns the result value (or throws any encountered exception) if + * completed, else returns the provided valueIfAbsent. 
+ * + * @param valueIfAbsent the value to return if not completed + * @return the result value, if completed, else the provided valueIfAbsent + * @throws CancellationException if the computation was cancelled + * @throws CompletionException if this future completed exceptionally or a + * completion computation threw an exception + */ + T getNow(T valueIfAbsent); + + /** + * Returns a {@link CompletableFuture} representation of the object. + * + * @return the CompletableFuture + */ + CompletableFuture<T> toCompletableFuture(); +} diff --git a/src/main/java/groovy/util/concurrent/async/SimplePromise.java b/src/main/java/groovy/util/concurrent/async/SimplePromise.java new file mode 100644 index 0000000000..77fe42f10e --- /dev/null +++ b/src/main/java/groovy/util/concurrent/async/SimplePromise.java @@ -0,0 +1,517 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package groovy.util.concurrent.async; + +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +/** + * A simple implementation of {@link Promise} based on {@link CompletableFuture}. + * + * @since 6.0 + */ +public class SimplePromise<T> implements Promise<T> { + private final CompletableFuture<T> future; + + private SimplePromise(CompletableFuture<T> future) { + this.future = future; + } + + /** + * Creates a new Promise backed by the given CompletableFuture. + * + * @param future the CompletableFuture to back the Promise + * @param <T> the type of the Promise's result + * @return the new Promise + */ + public static <T> SimplePromise<T> of(CompletableFuture<T> future) { + return new SimplePromise<>(future); + } + + /** + * Returns a new Promise that is not yet completed. + * + * @param <T> the type of the Promise's result + * @return the new Promise + */ + public static <T> SimplePromise<T> of() { + return of(new CompletableFuture<>()); + } + + /** + * Returns a new Promise that is asynchronously completed by a task running in + * the {@link ForkJoinPool#commonPool()} with the value obtained by calling the + * provided Supplier. 
+ * + * @param supplier a function returning the value to be used to complete the + * returned Promise + * @param <U> the function's return type + * @return the new Promise + */ + public static <U> SimplePromise<U> of(Supplier<U> supplier) { + return of(CompletableFuture.supplyAsync(supplier)); + } + + /** + * Returns a new Promise that is asynchronously completed by a task running in + * the provided executor with the value obtained by calling the provided Supplier. + * + * @param supplier a function returning the value to be used to complete the + * returned Promise + * @param executor the executor to use for asynchronous execution + * @param <U> the function's return type + * @return the new Promise + */ + public static <U> SimplePromise<U> of(Supplier<U> supplier, Executor executor) { + return of(CompletableFuture.supplyAsync(supplier, executor)); + } + + /** + * Returns a new Promise that is asynchronously completed by a task running in + * the {@link ForkJoinPool#commonPool()} after it runs the provided action. + * + * @param runnable the action to run before completing the returned Promise + * @return the new Promise + */ + public static SimplePromise<Void> of(Runnable runnable) { + return of(CompletableFuture.runAsync(runnable)); + } + + /** + * Returns a new Promise that is asynchronously completed by a task running in + * the provided executor after it runs the provided action. + * + * @param runnable the action to run before completing the returned Promise + * @param executor the executor to use for asynchronous execution + * @return the new Promise + */ + public static SimplePromise<Void> of(Runnable runnable, Executor executor) { + return of(CompletableFuture.runAsync(runnable, executor)); + } + + /** + * Returns a new Promise that is already completed with the provided value. 
+ * + * @param value the value + * @param <U> the type of the value + * @return the completed Promise + */ + public static <U> SimplePromise<U> completedPromise(U value) { + return of(CompletableFuture.completedFuture(value)); + } + + /** + * Returns a new Promise that is completed when all of the provided Promises + * complete. If any of the provided Promises complete exceptionally, then the + * returned Promise also does so, with a CompletionException holding this + * exception as its cause. Otherwise, the results, if any, of the provided + * Promises are not reflected in the returned Promise, but may be obtained by + * inspecting them individually. If no Promises are provided, returns a Promise + * completed with the value {@code null}. + * + * <p>Among the applications of this method is to await completion of a set of + * independent Promises before continuing a program, as in: + * {@code SimplePromise.allOf(p1, p2, p3).join();}. + * + * @param sps the Promises + * @return a new Promise that is completed when all of the provided Promises + * complete + * @throws NullPointerException if the array or any of its elements are {@code null} + */ + public static SimplePromise<Void> allOf(SimplePromise<?>... sps) { + return of(CompletableFuture.allOf( + Arrays.stream(sps) + .map(SimplePromise::toCompletableFuture) + .toArray(CompletableFuture[]::new) + )); + } + + /** + * Returns a new Promise that is completed when any of the provided Promises + * complete, with the same result. Otherwise, if it completed exceptionally, + * the returned Promise also does so, with a CompletionException holding this + * exception as its cause. If no Promises are provided, returns an incomplete + * Promise. 
+ * + * @param sps the Promises + * @return a new Promise that is completed with the result or exception of any + * of the provided Promises when one completes + * @throws NullPointerException if the array or any of its elements are + * {@code null} + */ + public static SimplePromise<Object> anyOf(SimplePromise<?>... sps) { + return of(CompletableFuture.anyOf( + Arrays.stream(sps) + .map(SimplePromise::toCompletableFuture) + .toArray(CompletableFuture[]::new) + )); + } + + @Override + public T await() { + try { + return this.join(); + } catch (Throwable t) { + throw new AwaitException(t); + } + } + + @Override + public Promise<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor) { + return of(future.whenCompleteAsync(action, executor)); + } + + @Override + public boolean completeExceptionally(Throwable ex) { + return future.completeExceptionally(ex); + } + + @Override + public Promise<Void> thenRun(Runnable action) { + return of(future.thenRun(action)); + } + + @Override + public <U> Promise<U> applyToEither(StageAwaitable<? extends T> other, Function<? super T, U> fn) { + return of(future.applyToEither(other.toPromise().toCompletableFuture(), fn)); + } + + @Override + public void obtrudeException(Throwable ex) { + future.obtrudeException(ex); + } + + @Override + public Promise<T> exceptionallyComposeAsync(Function<Throwable, ? extends StageAwaitable<T>> fn) { + return of(future.exceptionallyComposeAsync(t -> { + final StageAwaitable<T> p = fn.apply(t); + return p.toPromise().toCompletableFuture(); + })); + } + + @Override + public Promise<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) { + return of(future.whenComplete(action)); + } + + @Override + public <U> Promise<U> applyToEitherAsync(StageAwaitable<? extends T> other, Function<? 
super T, U> fn, Executor executor) { + return of(future.applyToEitherAsync(other.toPromise().toCompletableFuture(), fn, executor)); + } + + @Override + public <U> Promise<U> thenApplyAsync(Function<? super T, ? extends U> fn) { + return of(future.thenApplyAsync(fn)); + } + + @Override + public <U> Promise<Void> thenAcceptBothAsync(StageAwaitable<? extends U> other, BiConsumer<? super T, ? super U> action) { + return of(future.thenAcceptBothAsync(other.toPromise().toCompletableFuture(), action)); + } + + @Override + public Promise<T> exceptionallyAsync(Function<Throwable, ? extends T> fn, Executor executor) { + return of(future.exceptionallyAsync(fn, executor)); + } + + @Override + public Promise<Void> thenRunAsync(Runnable action, Executor executor) { + return of(future.thenRunAsync(action, executor)); + } + + @Override + public Promise<Void> runAfterEitherAsync(StageAwaitable<?> other, Runnable action) { + return of(future.runAfterEitherAsync(other.toPromise().toCompletableFuture(), action)); + } + + @Override + public Promise<T> orTimeout(long timeout, TimeUnit unit) { + return of(future.orTimeout(timeout, unit)); + } + + @Override + public <U, V> Promise<V> thenCombineAsync(StageAwaitable<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn) { + return of(future.thenCombineAsync(other.toPromise().toCompletableFuture(), fn)); + } + + @Override + public Promise<Void> runAfterBoth(StageAwaitable<?> other, Runnable action) { + return of(future.runAfterBoth(other.toPromise().toCompletableFuture(), action)); + } + + @Override + public <U> Promise<U> thenCompose(Function<? super T, ? 
extends StageAwaitable<U>> fn) { + return of(future.thenCompose(t -> { + final StageAwaitable<U> p = fn.apply(t); + return p.toPromise().toCompletableFuture(); + })); + } + + @Override + public Promise<Void> runAfterBothAsync(StageAwaitable<?> other, Runnable action, Executor executor) { + return of(future.runAfterBothAsync(other.toPromise().toCompletableFuture(), action, executor)); + } + + @Override + public <U> Promise<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn) { + return of(future.handleAsync(fn)); + } + + @Override + public <U> Promise<U> thenComposeAsync(Function<? super T, ? extends StageAwaitable<U>> fn, Executor executor) { + return of(future.thenComposeAsync(t -> { + final StageAwaitable<U> p = fn.apply(t); + return p.toPromise().toCompletableFuture(); + }, executor)); + } + + @Override + public Promise<Void> thenAccept(Consumer<? super T> action) { + return of(future.thenAccept(action)); + } + + @Override + public T join() { + return future.join(); + } + + @Override + public Promise<Void> acceptEitherAsync(StageAwaitable<? extends T> other, Consumer<? super T> action) { + return of(future.acceptEitherAsync(other.toPromise().toCompletableFuture(), action)); + } + + @Override + public Executor defaultExecutor() { + return future.defaultExecutor(); + } + + @Override + public Promise<T> exceptionallyCompose(Function<Throwable, ? extends StageAwaitable<T>> fn) { + return of(future.exceptionallyCompose(t -> { + final StageAwaitable<T> p = fn.apply(t); + return p.toPromise().toCompletableFuture(); + })); + } + + @Override + public <U> Promise<Void> thenAcceptBoth(StageAwaitable<? extends U> other, BiConsumer<? super T, ? 
super U> action) { + return of(future.thenAcceptBoth(other.toPromise().toCompletableFuture(), action)); + } + + @Override + public Promise<Void> runAfterEither(StageAwaitable<?> other, Runnable action) { + return of(future.runAfterEither(other.toPromise().toCompletableFuture(), action)); + } + + @Override + public Promise<T> completeOnTimeout(T value, long timeout, TimeUnit unit) { + return of(future.completeOnTimeout(value, timeout, unit)); + } + + @Override + public <U> Promise<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) { + return of(future.handle(fn)); + } + + @Override + public boolean complete(T value) { + return future.complete(value); + } + + @Override + public Promise<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor) { + return of(future.thenAcceptAsync(action, executor)); + } + + @Override + public int getNumberOfDependents() { + return future.getNumberOfDependents(); + } + + @Override + public <U> Promise<Void> thenAcceptBothAsync(StageAwaitable<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor) { + return of(future.thenAcceptBothAsync(other.toPromise().toCompletableFuture(), action, executor)); + } + + @Override + public Promise<T> exceptionallyAsync(Function<Throwable, ? extends T> fn) { + return of(future.exceptionallyAsync(fn)); + } + + @Override + public Promise<Void> runAfterEitherAsync(StageAwaitable<?> other, Runnable action, Executor executor) { + return of(future.runAfterEitherAsync(other.toPromise().toCompletableFuture(), action, executor)); + } + + @Override + public boolean isCompletedExceptionally() { + return future.isCompletedExceptionally(); + } + + @Override + public Promise<T> completeAsync(Supplier<? extends T> supplier) { + return of(future.completeAsync(supplier)); + } + + @Override + public <U> Promise<U> applyToEitherAsync(StageAwaitable<? extends T> other, Function<? 
super T, U> fn) { + return of(future.applyToEitherAsync(other.toPromise().toCompletableFuture(), fn)); + } + + @Override + public Promise<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action) { + return of(future.whenCompleteAsync(action)); + } + + @Override + public Promise<Void> thenRunAsync(Runnable action) { + return of(future.thenRunAsync(action)); + } + + @Override + public <U> Promise<U> thenApply(Function<? super T, ? extends U> fn) { + return of(future.thenApply(fn)); + } + + @Override + public void obtrudeValue(T value) { + future.obtrudeValue(value); + } + + @Override + public <U> Promise<U> thenComposeAsync(Function<? super T, ? extends StageAwaitable<U>> fn) { + return of(future.thenComposeAsync(t -> { + final StageAwaitable<U> p = fn.apply(t); + return p.toPromise().toCompletableFuture(); + })); + } + + @Override + public Promise<T> copy() { + return of(future.copy()); + } + + @Override + public Promise<Void> acceptEither(StageAwaitable<? extends T> other, Consumer<? super T> action) { + return of(future.acceptEither(other.toPromise().toCompletableFuture(), action)); + } + + @Override + public <U> Promise<U> thenApplyAsync(Function<? super T, ? extends U> fn, Executor executor) { + return of(future.thenApplyAsync(fn, executor)); + } + + @Override + public <U, V> Promise<V> thenCombine(StageAwaitable<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn) { + return of(future.thenCombine(other.toPromise().toCompletableFuture(), fn)); + } + + @Override + public Promise<T> exceptionally(Function<Throwable, ? extends T> fn) { + return of(future.exceptionally(fn)); + } + + @Override + public Promise<T> completeAsync(Supplier<? extends T> supplier, Executor executor) { + return of(future.completeAsync(supplier, executor)); + } + + @Override + public Promise<Void> acceptEitherAsync(StageAwaitable<? extends T> other, Consumer<? 
super T> action, Executor executor) { + return of(future.acceptEitherAsync(other.toPromise().toCompletableFuture(), action, executor)); + } + + @Override + public Promise<T> exceptionallyComposeAsync(Function<Throwable, ? extends StageAwaitable<T>> fn, Executor executor) { + return of(future.exceptionallyComposeAsync(t -> { + final StageAwaitable<T> p = fn.apply(t); + return p.toPromise().toCompletableFuture(); + }, executor)); + } + + @Override + public Promise<T> toPromise() { + return this; + } + + @Override + public Promise<Void> thenAcceptAsync(Consumer<? super T> action) { + return of(future.thenAcceptAsync(action)); + } + + @Override + public <U, V> Promise<V> thenCombineAsync(StageAwaitable<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn, Executor executor) { + return of(future.thenCombineAsync(other.toPromise().toCompletableFuture(), fn, executor)); + } + + @Override + public Promise<Void> runAfterBothAsync(StageAwaitable<?> other, Runnable action) { + return of(future.runAfterBothAsync(other.toPromise().toCompletableFuture(), action)); + } + + @Override + public <U> Promise<U> handleAsync(BiFunction<? super T, Throwable, ? 
extends U> fn, Executor executor) { + return of(future.handleAsync(fn, executor)); + } + + @Override + public T getNow(T valueIfAbsent) { + return future.getNow(valueIfAbsent); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return future.cancel(mayInterruptIfRunning); + } + + @Override + public boolean isCancelled() { + return future.isCancelled(); + } + + @Override + public boolean isDone() { + return future.isDone(); + } + + @Override + public T get() throws InterruptedException, ExecutionException { + return future.get(); + } + + @Override + public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return future.get(timeout, unit); + } + + @Override + public CompletableFuture<T> toCompletableFuture() { + return future; + } +} diff --git a/src/main/java/groovy/util/concurrent/async/StageAwaitable.java b/src/main/java/groovy/util/concurrent/async/StageAwaitable.java new file mode 100644 index 0000000000..c8bd33cb71 --- /dev/null +++ b/src/main/java/groovy/util/concurrent/async/StageAwaitable.java @@ -0,0 +1,721 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package groovy.util.concurrent.async; + +import java.util.concurrent.Executor; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * Represents a computation stage in a potentially asynchronous execution chain that + * executes an action or computes a value upon completion of another StageAwaitable. + * A stage completes when its computation finishes, which may subsequently trigger + * the execution of dependent stages in the chain. + * + * @since 6.0 + */ +public interface StageAwaitable<T> extends Awaitable<T> { + /** + * Creates a new StageAwaitable that executes the provided function with this stage's + * result as input when this stage completes successfully. + * + * <p>This method follows the same pattern as {@link java.util.Optional#map Optional.map} + * and {@link java.util.stream.Stream#map Stream.map}. + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param fn the function used to compute the resulting StageAwaitable's value + * @param <U> the return type of the function + * @return the newly created StageAwaitable + */ + <U> StageAwaitable<U> thenApply(Function<? super T, ? extends U> fn); + + /** + * Creates a new StageAwaitable that executes the provided function asynchronously + * using this stage's default asynchronous execution facility when this stage + * completes successfully. The function receives this stage's result as input. + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param fn the function used to compute the resulting StageAwaitable's value + * @param <U> the return type of the function + * @return the newly created StageAwaitable + */ + <U> StageAwaitable<U> thenApplyAsync(Function<? super T, ? 
extends U> fn); + + /** + * Creates a new StageAwaitable that executes the provided function asynchronously + * using the specified Executor when this stage completes successfully. The function + * receives this stage's result as input. + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param fn the function used to compute the resulting StageAwaitable's value + * @param executor the executor used for asynchronous execution + * @param <U> the return type of the function + * @return the newly created StageAwaitable + */ + <U> StageAwaitable<U> thenApplyAsync(Function<? super T, ? extends U> fn, Executor executor); + + /** + * Creates a new StageAwaitable that executes the provided action with this stage's + * result as input when this stage completes successfully. + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param action the action to perform before completing the resulting StageAwaitable + * @return the newly created StageAwaitable + */ + StageAwaitable<Void> thenAccept(Consumer<? super T> action); + + /** + * Creates a new StageAwaitable that executes the provided action asynchronously + * using this stage's default asynchronous execution facility when this stage + * completes successfully. The action receives this stage's result as input. + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param action the action to perform before completing the resulting StageAwaitable + * @return the newly created StageAwaitable + */ + StageAwaitable<Void> thenAcceptAsync(Consumer<? super T> action); + + /** + * Creates a new StageAwaitable that executes the provided action asynchronously + * using the specified Executor when this stage completes successfully. The action + * receives this stage's result as input. 
+ * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param action the action to perform before completing the resulting StageAwaitable + * @param executor the executor used for asynchronous execution + * @return the newly created StageAwaitable + */ + StageAwaitable<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor); + + /** + * Creates a new StageAwaitable that executes the provided action when this stage + * completes successfully. + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param action the action to perform before completing the resulting StageAwaitable + * @return the newly created StageAwaitable + */ + StageAwaitable<Void> thenRun(Runnable action); + + /** + * Creates a new StageAwaitable that executes the provided action asynchronously + * using this stage's default asynchronous execution facility when this stage + * completes successfully. + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param action the action to perform before completing the resulting StageAwaitable + * @return the newly created StageAwaitable + */ + StageAwaitable<Void> thenRunAsync(Runnable action); + + /** + * Creates a new StageAwaitable that executes the provided action asynchronously + * using the specified Executor when this stage completes successfully. + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. 
+ * + * @param action the action to perform before completing the resulting StageAwaitable + * @param executor the executor used for asynchronous execution + * @return the newly created StageAwaitable + */ + StageAwaitable<Void> thenRunAsync(Runnable action, Executor executor); + + /** + * Creates a new StageAwaitable that executes the provided function with the results + * of both this stage and the other stage when they both complete successfully. + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param other the other StageAwaitable + * @param fn the function used to compute the value of the resulting StageAwaitable + * @param <U> the type of the other StageAwaitable's result + * @param <V> the function's return type + * @return the newly created StageAwaitable + */ + <U, V> StageAwaitable<V> thenCombine(StageAwaitable<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn); + + /** + * Creates a new StageAwaitable that executes the provided function asynchronously + * using this stage's default asynchronous execution facility when both this stage + * and the other stage complete successfully. The function receives both results as + * arguments. + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param other the other StageAwaitable + * @param fn the function used to compute the value of the resulting StageAwaitable + * @param <U> the type of the other StageAwaitable's result + * @param <V> the function's return type + * @return the newly created StageAwaitable + */ + <U, V> StageAwaitable<V> thenCombineAsync(StageAwaitable<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn); + + /** + * Creates a new StageAwaitable that executes the provided function asynchronously + * using the specified executor when both this stage and the other stage complete + * successfully. 
The function receives both results as arguments. + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param other the other StageAwaitable + * @param fn the function used to compute the value of the resulting StageAwaitable + * @param executor the executor used for asynchronous execution + * @param <U> the type of the other StageAwaitable's result + * @param <V> the function's return type + * @return the newly created StageAwaitable + */ + <U, V> StageAwaitable<V> thenCombineAsync(StageAwaitable<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn, Executor executor); + + /** + * Creates a new StageAwaitable that executes the provided action with the results + * of both this stage and the other stage when they both complete successfully. + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param other the other StageAwaitable + * @param action the action to perform before completing the resulting StageAwaitable + * @param <U> the type of the other StageAwaitable's result + * @return the newly created StageAwaitable + */ + <U> StageAwaitable<Void> thenAcceptBoth(StageAwaitable<? extends U> other, BiConsumer<? super T, ? super U> action); + + /** + * Creates a new StageAwaitable that executes the provided action asynchronously + * using this stage's default asynchronous execution facility when both this stage + * and the other stage complete successfully. The action receives both results as + * arguments. + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. 
+ * + * @param other the other StageAwaitable + * @param action the action to perform before completing the resulting StageAwaitable + * @param <U> the type of the other StageAwaitable's result + * @return the newly created StageAwaitable + */ + <U> StageAwaitable<Void> thenAcceptBothAsync(StageAwaitable<? extends U> other, BiConsumer<? super T, ? super U> action); + + /** + * Creates a new StageAwaitable that executes the provided action asynchronously + * using the specified executor when both this stage and the other stage complete + * successfully. The action receives both results as arguments. + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param other the other StageAwaitable + * @param action the action to perform before completing the resulting StageAwaitable + * @param executor the executor used for asynchronous execution + * @param <U> the type of the other StageAwaitable's result + * @return the newly created StageAwaitable + */ + <U> StageAwaitable<Void> thenAcceptBothAsync(StageAwaitable<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor); + + /** + * Creates a new StageAwaitable that executes the provided action when both this + * stage and the other stage complete successfully. + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param other the other StageAwaitable + * @param action the action to perform before completing the resulting StageAwaitable + * @return the newly created StageAwaitable + */ + StageAwaitable<Void> runAfterBoth(StageAwaitable<?> other, Runnable action); + + /** + * Creates a new StageAwaitable that executes the provided action asynchronously + * using this stage's default asynchronous execution facility when both this stage + * and the other stage complete successfully. 
+ * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param other the other StageAwaitable + * @param action the action to perform before completing the resulting StageAwaitable + * @return the newly created StageAwaitable + */ + StageAwaitable<Void> runAfterBothAsync(StageAwaitable<?> other, Runnable action); + + /** + * Creates a new StageAwaitable that executes the provided action asynchronously + * using the specified executor when both this stage and the other stage complete + * successfully. + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param other the other StageAwaitable + * @param action the action to perform before completing the resulting StageAwaitable + * @param executor the executor used for asynchronous execution + * @return the newly created StageAwaitable + */ + StageAwaitable<Void> runAfterBothAsync(StageAwaitable<?> other, Runnable action, Executor executor); + + /** + * Creates a new StageAwaitable that executes the provided function with the result + * from whichever stage completes successfully first (either this stage or the other + * stage). + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param other the other StageAwaitable + * @param fn the function used to compute the value of the resulting StageAwaitable + * @param <U> the function's return type + * @return the newly created StageAwaitable + */ + <U> StageAwaitable<U> applyToEither(StageAwaitable<? extends T> other, Function<? super T, U> fn); + + /** + * Creates a new StageAwaitable that executes the provided function asynchronously + * using this stage's default asynchronous execution facility with the result from + * whichever stage completes successfully first (either this stage or the other stage). 
+ * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param other the other StageAwaitable + * @param fn the function used to compute the value of the resulting StageAwaitable + * @param <U> the function's return type + * @return the newly created StageAwaitable + */ + <U> StageAwaitable<U> applyToEitherAsync(StageAwaitable<? extends T> other, Function<? super T, U> fn); + + /** + * Creates a new StageAwaitable that executes the provided function asynchronously + * using the specified executor with the result from whichever stage completes + * successfully first (either this stage or the other stage). + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param other the other StageAwaitable + * @param fn the function used to compute the value of the resulting StageAwaitable + * @param executor the executor used for asynchronous execution + * @param <U> the function's return type + * @return the newly created StageAwaitable + */ + <U> StageAwaitable<U> applyToEitherAsync(StageAwaitable<? extends T> other, Function<? super T, U> fn, Executor executor); + + /** + * Creates a new StageAwaitable that executes the provided action with the result + * from whichever stage completes successfully first (either this stage or the other + * stage). + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param other the other StageAwaitable + * @param action the action to perform before completing the resulting StageAwaitable + * @return the newly created StageAwaitable + */ + StageAwaitable<Void> acceptEither(StageAwaitable<? extends T> other, Consumer<? 
super T> action); + + /** + * Creates a new StageAwaitable that executes the provided action asynchronously + * using this stage's default asynchronous execution facility with the result from + * whichever stage completes successfully first (either this stage or the other stage). + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param other the other StageAwaitable + * @param action the action to perform before completing the resulting StageAwaitable + * @return the newly created StageAwaitable + */ + StageAwaitable<Void> acceptEitherAsync(StageAwaitable<? extends T> other, Consumer<? super T> action); + + /** + * Creates a new StageAwaitable that executes the provided action asynchronously + * using the specified executor with the result from whichever stage completes + * successfully first (either this stage or the other stage). + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param other the other StageAwaitable + * @param action the action to perform before completing the resulting StageAwaitable + * @param executor the executor used for asynchronous execution + * @return the newly created StageAwaitable + */ + StageAwaitable<Void> acceptEitherAsync(StageAwaitable<? extends T> other, Consumer<? super T> action, Executor executor); + + /** + * Creates a new StageAwaitable that executes the provided action when either + * this stage or the other stage completes successfully. + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. 
+ * + * @param other the other StageAwaitable + * @param action the action to perform before completing the resulting StageAwaitable + * @return the newly created StageAwaitable + */ + StageAwaitable<Void> runAfterEither(StageAwaitable<?> other, Runnable action); + + /** + * Creates a new StageAwaitable that executes the provided action asynchronously + * using this stage's default asynchronous execution facility when either this + * stage or the other stage completes successfully. + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param other the other StageAwaitable + * @param action the action to perform before completing the resulting StageAwaitable + * @return the newly created StageAwaitable + */ + StageAwaitable<Void> runAfterEitherAsync(StageAwaitable<?> other, Runnable action); + + /** + * Creates a new StageAwaitable that executes the provided action asynchronously + * using the specified executor when either this stage or the other stage completes + * successfully. + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param other the other StageAwaitable + * @param action the action to perform before completing the resulting StageAwaitable + * @param executor the executor used for asynchronous execution + * @return the newly created StageAwaitable + */ + StageAwaitable<Void> runAfterEitherAsync(StageAwaitable<?> other, Runnable action, Executor executor); + + /** + * Creates a new StageAwaitable that is completed with the same value as the + * StageAwaitable returned by the provided function. + * + * <p>When this stage completes successfully, the provided function is invoked + * with this stage's result as the argument, returning another StageAwaitable. + * When that stage completes successfully, the StageAwaitable returned by this + * method is completed with the same value. 
+ * + * <p>To ensure progress, the supplied function must arrange eventual completion + * of its result. + * + * <p>This method is analogous to {@link java.util.Optional#flatMap Optional.flatMap} + * and {@link java.util.stream.Stream#flatMap Stream.flatMap}. + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param fn the function used to compute another StageAwaitable + * @param <U> the type of the resulting StageAwaitable's result + * @return the newly created StageAwaitable + */ + <U> StageAwaitable<U> thenCompose(Function<? super T, ? extends StageAwaitable<U>> fn); + + /** + * Creates a new StageAwaitable that is completed with the same value as the + * StageAwaitable returned by the provided function, executed asynchronously + * using this stage's default asynchronous execution facility. + * + * <p>When this stage completes successfully, the provided function is invoked + * with this stage's result as the argument, returning another StageAwaitable. + * When that stage completes successfully, the StageAwaitable returned by this + * method is completed with the same value. + * + * <p>To ensure progress, the supplied function must arrange eventual completion + * of its result. + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param fn the function used to compute another StageAwaitable + * @param <U> the type of the resulting StageAwaitable's result + * @return the newly created StageAwaitable + */ + <U> StageAwaitable<U> thenComposeAsync(Function<? super T, ? extends StageAwaitable<U>> fn); + + /** + * Creates a new StageAwaitable that is completed with the same value as the + * StageAwaitable returned by the provided function, executed asynchronously + * using the specified Executor. 
+ * + * <p>When this stage completes successfully, the provided function is invoked + * with this stage's result as the argument, returning another StageAwaitable. + * When that stage completes successfully, the StageAwaitable returned by this + * method is completed with the same value. + * + * <p>To ensure progress, the supplied function must arrange eventual completion + * of its result. + * + * <p>Refer to the {@link StageAwaitable} documentation for behavior regarding + * exceptional completion scenarios. + * + * @param fn the function used to compute another StageAwaitable + * @param executor the executor used for asynchronous execution + * @param <U> the type of the resulting StageAwaitable's result + * @return the newly created StageAwaitable + */ + <U> StageAwaitable<U> thenComposeAsync(Function<? super T, ? extends StageAwaitable<U>> fn, Executor executor); + + /** + * Creates a new StageAwaitable that is executed with this stage's result and + * exception as arguments to the provided function when this stage completes + * either successfully or exceptionally. + * + * <p>When this stage is complete, the provided function is invoked with the + * result (or {@code null} if none) and the exception (or {@code null} if none) + * of this stage as arguments, and the function's result is used to complete the + * resulting stage. + * + * @param fn the function used to compute the value of the resulting StageAwaitable + * @param <U> the function's return type + * @return the newly created StageAwaitable + */ + <U> StageAwaitable<U> handle(BiFunction<? super T, Throwable, ? extends U> fn); + + /** + * Creates a new StageAwaitable that is executed asynchronously using this stage's + * default asynchronous execution facility with this stage's result and exception + * as arguments to the provided function when this stage completes either + * successfully or exceptionally. 
+ * + * <p>When this stage is complete, the provided function is invoked with the + * result (or {@code null} if none) and the exception (or {@code null} if none) + * of this stage as arguments, and the function's result is used to complete the + * resulting stage. + * + * @param fn the function used to compute the value of the resulting StageAwaitable + * @param <U> the function's return type + * @return the newly created StageAwaitable + */ + <U> StageAwaitable<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn); + + /** + * Creates a new StageAwaitable that is executed asynchronously using the + * specified executor with this stage's result and exception as arguments to + * the provided function when this stage completes either successfully or + * exceptionally. + * + * <p>When this stage is complete, the provided function is invoked with the + * result (or {@code null} if none) and the exception (or {@code null} if none) + * of this stage as arguments, and the function's result is used to complete the + * resulting stage. + * + * @param fn the function used to compute the value of the resulting StageAwaitable + * @param executor the executor used for asynchronous execution + * @param <U> the function's return type + * @return the newly created StageAwaitable + */ + <U> StageAwaitable<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor); + + /** + * Creates a new StageAwaitable with the same result or exception as this stage, + * that executes the provided action when this stage completes. + * + * <p>When this stage is complete, the provided action is invoked with the result + * (or {@code null} if none) and the exception (or {@code null} if none) of this + * stage as arguments. The resulting stage is completed when the action returns. + * + * <p>Unlike method {@link #handle handle}, this method is not designed to + * translate completion outcomes, so the supplied action should not throw an + * exception. 
However, if it does, the following rules apply: if this stage + * completed successfully but the supplied action throws an exception, then the + * resulting stage completes exceptionally with the supplied action's exception. + * Or, if this stage completed exceptionally and the supplied action throws an + * exception, then the resulting stage completes exceptionally with this stage's + * exception. + * + * @param action the action to perform + * @return the newly created StageAwaitable + */ + StageAwaitable<T> whenComplete(BiConsumer<? super T, ? super Throwable> action); + + /** + * Creates a new StageAwaitable with the same result or exception as this stage, + * that executes the provided action asynchronously using this stage's default + * asynchronous execution facility when this stage completes. + * + * <p>When this stage is complete, the provided action is invoked with the result + * (or {@code null} if none) and the exception (or {@code null} if none) of this + * stage as arguments. The resulting stage is completed when the action returns. + * + * <p>Unlike method {@link #handleAsync(BiFunction) handleAsync}, this method is + * not designed to translate completion outcomes, so the supplied action should + * not throw an exception. However, if it does, the following rules apply: If + * this stage completed successfully but the supplied action throws an exception, + * then the resulting stage completes exceptionally with the supplied action's + * exception. Or, if this stage completed exceptionally and the supplied action + * throws an exception, then the resulting stage completes exceptionally with + * this stage's exception. + * + * @param action the action to perform + * @return the newly created StageAwaitable + */ + StageAwaitable<T> whenCompleteAsync(BiConsumer<? super T, ? 
super Throwable> action); + + /** + * Creates a new StageAwaitable with the same result or exception as this stage, + * that executes the provided action asynchronously using the specified Executor + * when this stage completes. + * + * <p>When this stage is complete, the provided action is invoked with the result + * (or {@code null} if none) and the exception (or {@code null} if none) of this + * stage as arguments. The resulting stage is completed when the action returns. + * + * <p>Unlike method {@link #handleAsync(BiFunction,Executor) handleAsync}, this + * method is not designed to translate completion outcomes, so the supplied action + * should not throw an exception. However, if it does, the following rules apply: + * If this stage completed successfully but the supplied action throws an + * exception, then the resulting stage completes exceptionally with the supplied + * action's exception. Or, if this stage completed exceptionally and the supplied + * action throws an exception, then the resulting stage completes exceptionally + * with this stage's exception. + * + * @param action the action to perform + * @param executor the executor used for asynchronous execution + * @return the newly created StageAwaitable + */ + StageAwaitable<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor); + + /** + * Creates a new StageAwaitable that is executed with this stage's exception as + * the argument to the provided function when this stage completes exceptionally. + * Otherwise, if this stage completes successfully, then the resulting stage also + * completes successfully with the same value. + * + * @param fn the function used to compute the value of the resulting StageAwaitable + * if this StageAwaitable completed exceptionally + * @return the newly created StageAwaitable + */ + StageAwaitable<T> exceptionally(Function<Throwable, ? 
extends T> fn); + + /** + * Creates a new StageAwaitable that is executed asynchronously with this stage's + * exception as the argument to the provided function using this stage's default + * asynchronous execution facility when this stage completes exceptionally. + * Otherwise, if this stage completes successfully, then the resulting stage also + * completes successfully with the same value. + * + * @implSpec The default implementation invokes {@link #handle}, relaying to + * {@link #handleAsync} on exception, then {@link #thenCompose} for result. + * + * @param fn the function used to compute the value of the resulting StageAwaitable + * if this StageAwaitable completed exceptionally + * @return the newly created StageAwaitable + */ + default StageAwaitable<T> exceptionallyAsync(Function<Throwable, ? extends T> fn) { + return handle((r, ex) -> (ex == null) ? this : this.<T>handleAsync((r1, ex1) -> fn.apply(ex1))).thenCompose(Function.identity()); + } + + /** + * Creates a new StageAwaitable that is executed asynchronously with this stage's + * exception as the argument to the provided function using the specified Executor + * when this stage completes exceptionally. Otherwise, if this stage completes + * successfully, then the resulting stage also completes successfully with the + * same value. + * + * @implSpec The default implementation invokes {@link #handle}, relaying to + * {@link #handleAsync} on exception, then {@link #thenCompose} for result. + * + * @param fn the function used to compute the value of the resulting StageAwaitable + * if this StageAwaitable completed exceptionally + * @param executor the executor used for asynchronous execution + * @return the newly created StageAwaitable + */ + default StageAwaitable<T> exceptionallyAsync(Function<Throwable, ? extends T> fn, Executor executor) { + return handle((r, ex) -> (ex == null) ? 
this : this.<T>handleAsync((r1, ex1) -> fn.apply(ex1), executor)).thenCompose(Function.identity()); + } + + /** + * Creates a new StageAwaitable that is composed using the result of the provided + * function applied to this stage's exception when this stage completes exceptionally. + * Otherwise, if this stage completes successfully, then the resulting stage also + * completes successfully with the same value. + * + * @implSpec The default implementation invokes {@link #handle}, invoking the + * provided function on exception, then {@link #thenCompose} for result. + * + * @param fn the function used to compute the resulting StageAwaitable if this + * StageAwaitable completed exceptionally + * @return the newly created StageAwaitable + */ + default StageAwaitable<T> exceptionallyCompose(Function<Throwable, ? extends StageAwaitable<T>> fn) { + return handle((r, ex) -> (ex == null) ? this : fn.apply(ex)).thenCompose(Function.identity()); + } + + /** + * Creates a new StageAwaitable that is composed asynchronously using the result + * of the provided function applied to this stage's exception using this stage's + * default asynchronous execution facility when this stage completes exceptionally. + * Otherwise, if this stage completes successfully, then the resulting stage also + * completes successfully with the same value. + * + * @implSpec The default implementation invokes {@link #handle}, relaying to + * {@link #handleAsync} on exception, then {@link #thenCompose} for result. + * + * @param fn the function used to compute the resulting StageAwaitable if this + * StageAwaitable completed exceptionally + * @return the newly created StageAwaitable + */ + default StageAwaitable<T> exceptionallyComposeAsync(Function<Throwable, ? extends StageAwaitable<T>> fn) { + return handle((r, ex) -> (ex == null) ? this : this.handleAsync((r1, ex1) -> fn.apply(ex1)).thenCompose(Function.identity())).thenCompose(Function.identity()); + } + + /** + * Creates a new StageAwaitable that is composed asynchronously using the result + * of the provided function applied to this stage's exception using the specified + * Executor when this stage completes exceptionally. + * Otherwise, if this stage completes successfully, then the resulting stage also + * completes successfully with the same value.
+ * + * @implSpec The default implementation invokes {@link #handle}, relaying to + * {@link #handleAsync} on exception, then {@link #thenCompose} for result. + * + * @param fn the function used to compute the resulting StageAwaitable if this + * StageAwaitable completed exceptionally + * @param executor the executor used for asynchronous execution + * @return the newly created StageAwaitable + */ + default StageAwaitable<T> exceptionallyComposeAsync(Function<Throwable, ? extends StageAwaitable<T>> fn, Executor executor) { + return handle((r, ex) -> (ex == null) ? this : this.handleAsync((r1, ex1) -> fn.apply(ex1), executor).thenCompose(Function.identity())).thenCompose(Function.identity()); + } + + /** + * Creates a {@link Promise} that maintains the same completion properties + * as this stage. If this stage is already a Promise, this method may return + * this stage itself. Otherwise, invoking this method may have the same effect as + * {@code thenApply(x -> x)}, but returns an instance of type {@code Promise}. + * + * @return the Promise + */ + Promise<T> toPromise(); +} diff --git a/src/test/groovy/groovy/util/concurrent/async/SimplePromiseTest.groovy b/src/test/groovy/groovy/util/concurrent/async/SimplePromiseTest.groovy new file mode 100644 index 0000000000..02f2e0fcf0 --- /dev/null +++ b/src/test/groovy/groovy/util/concurrent/async/SimplePromiseTest.groovy @@ -0,0 +1,726 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package groovy.util.concurrent.async + +import groovy.transform.CompileStatic +import org.junit.jupiter.api.DisplayName +import org.junit.jupiter.api.Nested +import org.junit.jupiter.api.Test + +import java.util.concurrent.CompletableFuture +import java.util.concurrent.CompletionException +import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicReference + +import static org.junit.jupiter.api.Assertions.assertEquals +import static org.junit.jupiter.api.Assertions.assertFalse +import static org.junit.jupiter.api.Assertions.assertNotNull +import static org.junit.jupiter.api.Assertions.assertNotSame +import static org.junit.jupiter.api.Assertions.assertSame +import static org.junit.jupiter.api.Assertions.assertThrows +import static org.junit.jupiter.api.Assertions.assertTrue + +@CompileStatic +class SimplePromiseTest { + + @Nested + @DisplayName("Factory Methods") + class FactoryMethodsTest { + @Test + @DisplayName("should create promise from CompletableFuture") + void testOfWithCompletableFuture() { + CompletableFuture<String> cf = CompletableFuture.completedFuture("test") + Promise<String> promise = SimplePromise.of(cf) + + assertNotNull(promise) + assertEquals("test", promise.join()) + } + + @Test + @DisplayName("should create empty promise") + void testOfEmpty() { + Promise<String> promise = SimplePromise.of() + + assertNotNull(promise) + 
+ assertFalse(promise.isDone()) + } + } + + @Nested + @DisplayName("Completion Methods") + class CompletionMethodsTest { + @Test + @DisplayName("should complete with value") + void testComplete() { + Promise<String> promise = SimplePromise.of() + boolean result = promise.complete("value") + + assertTrue(result) + assertTrue(promise.isDone()) + assertEquals("value", promise.join()) + } + + @Test + @DisplayName("should complete exceptionally") + void testCompleteExceptionally() { + Promise<String> promise = SimplePromise.of() + Exception ex = new RuntimeException("error") + boolean result = promise.completeExceptionally(ex) + + assertTrue(result) + assertTrue(promise.isCompletedExceptionally()) + assertThrows(CompletionException.class, promise::join) + } + + @Test + @DisplayName("should complete async with supplier") + void testCompleteAsync() throws Exception { + Promise<String> promise = SimplePromise.of() + promise.completeAsync(() -> "async-value") + + /* get() waits for completion, so no sleep is needed; the timeout keeps a missing completion from hanging the build */ + assertEquals("async-value", promise.get(1, TimeUnit.SECONDS)) + } + + @Test + @DisplayName("should complete async with supplier and executor") + void testCompleteAsyncWithExecutor() throws Exception { + ExecutorService executor = Executors.newSingleThreadExecutor() + try { + Promise<String> promise = SimplePromise.of() + promise.completeAsync(() -> "executor-value", executor) + + assertEquals("executor-value", promise.get(1, TimeUnit.SECONDS)) + } finally { + executor.shutdown() + } + } + + @Test + @DisplayName("should obtrude value") + void testObtrudeValue() { + Promise<String> promise = SimplePromise.of() + promise.complete("first") + promise.obtrudeValue("second") + + assertEquals("second", promise.join()) + } + + @Test + @DisplayName("should obtrude exception") + void testObtrudeException() { + Promise<String> promise = SimplePromise.of() + promise.complete("value") + RuntimeException ex = new RuntimeException("obtruded") + promise.obtrudeException(ex) + + CompletionException thrown =
assertThrows(CompletionException.class, promise::join) + assertEquals(ex, thrown.getCause()) + } + } + + @Nested + @DisplayName("Transformation Methods") + class TransformationMethodsTest { + @Test + @DisplayName("should apply function with thenApply") + void testThenApply() { + Promise<Integer> promise = SimplePromise.of(CompletableFuture.completedFuture(5)) + Promise<String> result = promise.thenApply(n -> "Number: " + n) + + assertEquals("Number: 5", result.join()) + } + + @Test + @DisplayName("should apply function async with thenApplyAsync") + void testThenApplyAsync() { + Promise<Integer> promise = SimplePromise.of(CompletableFuture.completedFuture(10)) + Promise<Integer> result = promise.thenApplyAsync(n -> n * 2) + + assertEquals(20, result.join()) + } + + @Test + @DisplayName("should compose with thenCompose") + void testThenCompose() { + Promise<Integer> promise = SimplePromise.of(CompletableFuture.completedFuture(3)) + Promise<Integer> result = promise.thenCompose(n -> + SimplePromise.of(CompletableFuture.completedFuture(n * 3))) + + assertEquals(9, result.join()) + } + + @Test + @DisplayName("should handle with bifunction") + void testHandle() { + Promise<Integer> promise = SimplePromise.of(CompletableFuture.completedFuture(5)) + Promise<String> result = promise.handle((val, ex) -> + ex == null ? "Success: " + val : "Error") + + assertEquals("Success: 5", result.join()) + } + + @Test + @DisplayName("should handle exception with handle") + void testHandleWithException() { + Promise<Integer> promise = SimplePromise.of() + promise.completeExceptionally(new RuntimeException("error")) + Promise<String> result = promise.handle((val, ex) -> + ex != null ? 
"Handled: " + ex.getMessage() : "OK") + + assertEquals("Handled: error", result.join()) + } + } + + @Nested + @DisplayName("Consumer Methods") + class ConsumerMethodsTest { + @Test + @DisplayName("should accept value with thenAccept") + void testThenAccept() { + AtomicReference<String> ref = new AtomicReference<>() + Promise<String> promise = SimplePromise.of(CompletableFuture.completedFuture("test")) + Promise<Void> result = promise.thenAccept(ref::set) + + result.join() + assertEquals("test", ref.get()) + } + + @Test + @DisplayName("should run action with thenRun") + void testThenRun() { + AtomicBoolean executed = new AtomicBoolean(false) + Promise<String> promise = SimplePromise.of(CompletableFuture.completedFuture("test")) + Promise<Void> result = promise.thenRun(() -> executed.set(true)) + + result.join() + assertTrue(executed.get()) + } + + @Test + @DisplayName("should execute whenComplete") + void testWhenComplete() { + AtomicReference<String> ref = new AtomicReference<>() + Promise<String> promise = SimplePromise.of(CompletableFuture.completedFuture("value")) + Promise<String> result = promise.whenComplete((val, ex) -> ref.set(val)) + + assertEquals("value", result.join()) + assertEquals("value", ref.get()) + } + } + + @Nested + @DisplayName("Combination Methods") + class CombinationMethodsTest { + @Test + @DisplayName("should combine two promises with thenCombine") + void testThenCombine() { + Promise<Integer> p1 = SimplePromise.of(CompletableFuture.completedFuture(5)) + Promise<Integer> p2 = SimplePromise.of(CompletableFuture.completedFuture(3)) + Promise<Integer> result = p1.thenCombine(p2, Integer::sum) + + assertEquals(8, result.join()) + } + + @Test + @DisplayName("should accept both with thenAcceptBoth") + void testThenAcceptBoth() { + AtomicInteger sum = new AtomicInteger(0) + Promise<Integer> p1 = SimplePromise.of(CompletableFuture.completedFuture(5)) + Promise<Integer> p2 = SimplePromise.of(CompletableFuture.completedFuture(7)) + Promise<Void> 
result = p1.thenAcceptBoth(p2, (a, b) -> sum.set(a + b)) + + result.join() + assertEquals(12, sum.get()) + } + + @Test + @DisplayName("should run after both complete") + void testRunAfterBoth() { + AtomicBoolean executed = new AtomicBoolean(false) + Promise<String> p1 = SimplePromise.of(CompletableFuture.completedFuture("a")) + Promise<String> p2 = SimplePromise.of(CompletableFuture.completedFuture("b")) + Promise<Void> result = p1.runAfterBoth(p2, () -> executed.set(true)) + + result.join() + assertTrue(executed.get()) + } + + @Test + @DisplayName("should apply to either") + void testApplyToEither() { + Promise<Integer> p1 = SimplePromise.of(CompletableFuture.completedFuture(1)) + Promise<Integer> p2 = SimplePromise.of() + Promise<Integer> result = p1.applyToEither(p2, n -> n * 10) + + assertEquals(10, result.join()) + } + + @Test + @DisplayName("should accept either") + void testAcceptEither() { + AtomicInteger ref = new AtomicInteger(0) + Promise<Integer> p1 = SimplePromise.of(CompletableFuture.completedFuture(42)) + Promise<Integer> p2 = SimplePromise.of() + Promise<Void> result = p1.acceptEither(p2, ref::set) + + result.join() + assertEquals(42, ref.get()) + } + + @Test + @DisplayName("should run after either completes") + void testRunAfterEither() { + AtomicBoolean executed = new AtomicBoolean(false) + Promise<String> p1 = SimplePromise.of(CompletableFuture.completedFuture("fast")) + Promise<String> p2 = SimplePromise.of() + Promise<Void> result = p1.runAfterEither(p2, () -> executed.set(true)) + + result.join() + assertTrue(executed.get()) + } + } + + @Nested + @DisplayName("Exception Handling") + class ExceptionHandlingTest { + @Test + @DisplayName("should handle exception with exceptionally") + void testExceptionally() { + Promise<Integer> promise = SimplePromise.of() + promise.completeExceptionally(new RuntimeException("error")) + Promise<Integer> result = promise.exceptionally(ex -> -1) + + assertEquals(-1, result.join()) + } + + @Test + 
@DisplayName("should compose exception with exceptionallyCompose") + void testExceptionallyCompose() { + Promise<Integer> promise = SimplePromise.of() + promise.completeExceptionally(new RuntimeException("error")) + Promise<Integer> result = promise.exceptionallyCompose(ex -> + SimplePromise.of(CompletableFuture.completedFuture(99))) + + assertEquals(99, result.join()) + } + + @Test + @DisplayName("should handle exception async with exceptionallyAsync") + void testExceptionallyAsync() { + Promise<String> promise = SimplePromise.of() + promise.completeExceptionally(new RuntimeException("async-error")) + Promise<String> result = promise.exceptionallyAsync(ex -> "recovered") + + assertEquals("recovered", result.join()) + } + } + + @Nested + @DisplayName("Timeout and Cancellation") + class TimeoutAndCancellationTest { + @Test + @DisplayName("should timeout with orTimeout") + void testOrTimeout() { + Promise<String> promise = SimplePromise.of() + Promise<String> result = promise.orTimeout(100, TimeUnit.MILLISECONDS) + + assertThrows(CompletionException.class, result::join) + } + + @Test + @DisplayName("should complete on timeout with completeOnTimeout") + void testCompleteOnTimeout() { + Promise<String> promise = SimplePromise.of() + Promise<String> result = promise.completeOnTimeout("default", 100, TimeUnit.MILLISECONDS) + + assertEquals("default", result.join()) + } + + @Test + @DisplayName("should cancel promise") + void testCancel() { + Promise<String> promise = SimplePromise.of() + boolean cancelled = promise.cancel(true) + + assertTrue(cancelled) + assertTrue(promise.isCancelled()) + assertTrue(promise.isDone()) + } + } + + @Nested + @DisplayName("Retrieval Methods") + class RetrievalMethodsTest { + @Test + @DisplayName("should get value with blocking get") + void testGet() throws Exception { + Promise<String> promise = SimplePromise.of(CompletableFuture.completedFuture("value")) + assertEquals("value", promise.get()) + } + + @Test + @DisplayName("should get value 
with timeout") + void testGetWithTimeout() throws Exception { + Promise<String> promise = SimplePromise.of(CompletableFuture.completedFuture("value")) + assertEquals("value", promise.get(1, TimeUnit.SECONDS)) + } + + @Test + @DisplayName("should join and return value") + void testJoin() { + Promise<String> promise = SimplePromise.of(CompletableFuture.completedFuture("joined")) + assertEquals("joined", promise.join()) + } + + @Test + @DisplayName("should get now or default value") + void testGetNow() { + Promise<String> promise = SimplePromise.of() + assertEquals("default", promise.getNow("default")) + + promise.complete("actual") + assertEquals("actual", promise.getNow("default")) + } + + @Test + @DisplayName("should await and return value") + void testAwait() { + Promise<String> promise = SimplePromise.of(CompletableFuture.completedFuture("awaited")) + assertEquals("awaited", promise.await()) + } + + @Test + @DisplayName("should throw AwaitException on await failure") + void testAwaitException() { + Promise<String> promise = SimplePromise.of() + promise.completeExceptionally(new RuntimeException("error")) + + assertThrows(AwaitException.class, promise::await) + } + } + + @Nested + @DisplayName("Status and Conversion") + class StatusAndConversionTest { + @Test + @DisplayName("should check if done") + void testIsDone() { + Promise<String> promise = SimplePromise.of() + assertFalse(promise.isDone()) + + promise.complete("value") + assertTrue(promise.isDone()) + } + + @Test + @DisplayName("should check if cancelled") + void testIsCancelled() { + Promise<String> promise = SimplePromise.of() + assertFalse(promise.isCancelled()) + + promise.cancel(false) + assertTrue(promise.isCancelled()) + } + + @Test + @DisplayName("should check if completed exceptionally") + void testIsCompletedExceptionally() { + Promise<String> promise = SimplePromise.of() + assertFalse(promise.isCompletedExceptionally()) + + promise.completeExceptionally(new RuntimeException()) + 
assertTrue(promise.isCompletedExceptionally()) + } + + @Test + @DisplayName("should convert to CompletableFuture") + void testToCompletableFuture() { + CompletableFuture<String> cf = CompletableFuture.completedFuture("test") + Promise<String> promise = SimplePromise.of(cf) + + assertSame(cf, promise.toCompletableFuture()) + } + + @Test + @DisplayName("should convert to Promise") + void testToPromise() { + Promise<String> promise = SimplePromise.of() + assertSame(promise, promise.toPromise()) + } + + @Test + @DisplayName("should copy promise") + void testCopy() { + Promise<String> original = SimplePromise.of(CompletableFuture.completedFuture("original")) + Promise<String> copy = original.copy() + + assertNotSame(original, copy) + assertEquals("original", copy.join()) + } + + @Test + @DisplayName("should get number of dependents") + void testGetNumberOfDependents() { + Promise<String> promise = SimplePromise.of() + assertEquals(0, promise.getNumberOfDependents()) + + promise.thenApply(String::toUpperCase) + assertTrue(promise.getNumberOfDependents() > 0) + } + + @Test + @DisplayName("should get default executor") + void testDefaultExecutor() { + Promise<String> promise = SimplePromise.of() + assertNotNull(promise.defaultExecutor()) + } + } + + @Nested + @DisplayName("Additional Coverage Tests") + class AdditionalCoverageTest { + + @Test + @DisplayName("should apply function async with executor") + void testThenApplyAsyncWithExecutor() { + ExecutorService executor = Executors.newSingleThreadExecutor() + try { + Promise<Integer> promise = SimplePromise.of(CompletableFuture.completedFuture(5)) + Promise<String> result = promise.thenApplyAsync(n -> "Value: " + n, executor) + + assertEquals("Value: 5", result.join()) + } finally { + executor.shutdown() + } + } + + @Test + @DisplayName("should accept value async with executor") + void testThenAcceptAsyncWithExecutor() { + ExecutorService executor = Executors.newSingleThreadExecutor() + AtomicReference<String> ref = new 
AtomicReference<>() + try { + Promise<String> promise = SimplePromise.of(CompletableFuture.completedFuture("test")) + Promise<Void> result = promise.thenAcceptAsync(ref::set, executor) + + result.join() + assertEquals("test", ref.get()) + } finally { + executor.shutdown() + } + } + + @Test + @DisplayName("should run action async with executor") + void testThenRunAsyncWithExecutor() { + ExecutorService executor = Executors.newSingleThreadExecutor() + AtomicBoolean executed = new AtomicBoolean(false) + try { + Promise<String> promise = SimplePromise.of(CompletableFuture.completedFuture("test")) + Promise<Void> result = promise.thenRunAsync(() -> executed.set(true), executor) + + result.join() + assertTrue(executed.get()) + } finally { + executor.shutdown() + } + } + + @Test + @DisplayName("should compose async with executor") + void testThenComposeAsyncWithExecutor() { + ExecutorService executor = Executors.newSingleThreadExecutor() + try { + Promise<Integer> promise = SimplePromise.of(CompletableFuture.completedFuture(3)) + Promise<Integer> result = promise.thenComposeAsync( + n -> SimplePromise.of(CompletableFuture.completedFuture(n * 4)), executor) + + assertEquals(12, result.join()) + } finally { + executor.shutdown() + } + } + + @Test + @DisplayName("should handle async with executor") + void testHandleAsyncWithExecutor() { + ExecutorService executor = Executors.newSingleThreadExecutor() + try { + Promise<Integer> promise = SimplePromise.of(CompletableFuture.completedFuture(7)) + Promise<String> result = promise.handleAsync( + (val, ex) -> ex == null ? 
"Result: " + val : "Error", executor) + + assertEquals("Result: 7", result.join()) + } finally { + executor.shutdown() + } + } + + @Test + @DisplayName("should execute whenCompleteAsync with executor") + void testWhenCompleteAsyncWithExecutor() { + ExecutorService executor = Executors.newSingleThreadExecutor() + AtomicReference<String> ref = new AtomicReference<>() + try { + Promise<String> promise = SimplePromise.of(CompletableFuture.completedFuture("async")) + Promise<String> result = promise.whenCompleteAsync((val, ex) -> ref.set(val), executor) + + assertEquals("async", result.join()) + assertEquals("async", ref.get()) + } finally { + executor.shutdown() + } + } + + @Test + @DisplayName("should combine async with executor") + void testThenCombineAsyncWithExecutor() { + ExecutorService executor = Executors.newSingleThreadExecutor() + try { + Promise<Integer> p1 = SimplePromise.of(CompletableFuture.completedFuture(10)) + Promise<Integer> p2 = SimplePromise.of(CompletableFuture.completedFuture(20)) + Promise<Integer> result = p1.thenCombineAsync(p2, Integer::sum, executor) + + assertEquals(30, result.join()) + } finally { + executor.shutdown() + } + } + + @Test + @DisplayName("should accept both async with executor") + void testThenAcceptBothAsyncWithExecutor() { + ExecutorService executor = Executors.newSingleThreadExecutor() + AtomicInteger sum = new AtomicInteger(0) + try { + Promise<Integer> p1 = SimplePromise.of(CompletableFuture.completedFuture(15)) + Promise<Integer> p2 = SimplePromise.of(CompletableFuture.completedFuture(25)) + Promise<Void> result = p1.thenAcceptBothAsync(p2, (a, b) -> sum.set(a + b), executor) + + result.join() + assertEquals(40, sum.get()) + } finally { + executor.shutdown() + } + } + + @Test + @DisplayName("should run after both async with executor") + void testRunAfterBothAsyncWithExecutor() { + ExecutorService executor = Executors.newSingleThreadExecutor() + AtomicBoolean executed = new AtomicBoolean(false) + try { + Promise<String> 
p1 = SimplePromise.of(CompletableFuture.completedFuture("x")) + Promise<String> p2 = SimplePromise.of(CompletableFuture.completedFuture("y")) + Promise<Void> result = p1.runAfterBothAsync(p2, () -> executed.set(true), executor) + + result.join() + assertTrue(executed.get()) + } finally { + executor.shutdown() + } + } + + @Test + @DisplayName("should apply to either async with executor") + void testApplyToEitherAsyncWithExecutor() { + ExecutorService executor = Executors.newSingleThreadExecutor() + try { + Promise<Integer> p1 = SimplePromise.of(CompletableFuture.completedFuture(100)) + Promise<Integer> p2 = SimplePromise.of() + Promise<Integer> result = p1.applyToEitherAsync(p2, n -> n * 2, executor) + + assertEquals(200, result.join()) + } finally { + executor.shutdown() + } + } + + @Test + @DisplayName("should accept either async with executor") + void testAcceptEitherAsyncWithExecutor() { + ExecutorService executor = Executors.newSingleThreadExecutor() + AtomicInteger ref = new AtomicInteger(0) + try { + Promise<Integer> p1 = SimplePromise.of(CompletableFuture.completedFuture(88)) + Promise<Integer> p2 = SimplePromise.of() + Promise<Void> result = p1.acceptEitherAsync(p2, ref::set, executor) + + result.join() + assertEquals(88, ref.get()) + } finally { + executor.shutdown() + } + } + + @Test + @DisplayName("should run after either async with executor") + void testRunAfterEitherAsyncWithExecutor() { + ExecutorService executor = Executors.newSingleThreadExecutor() + AtomicBoolean executed = new AtomicBoolean(false) + try { + Promise<String> p1 = SimplePromise.of(CompletableFuture.completedFuture("first")) + Promise<String> p2 = SimplePromise.of() + Promise<Void> result = p1.runAfterEitherAsync(p2, () -> executed.set(true), executor) + + result.join() + assertTrue(executed.get()) + } finally { + executor.shutdown() + } + } + + @Test + @DisplayName("should handle exception async with exceptionallyComposeAsync with executor") + void 
testExceptionallyComposeAsyncWithExecutor() { + ExecutorService executor = Executors.newSingleThreadExecutor() + try { + Promise<Integer> promise = SimplePromise.of() + promise.completeExceptionally(new RuntimeException("error")) + Promise<Integer> result = promise.exceptionallyComposeAsync( + ex -> SimplePromise.of(CompletableFuture.completedFuture(777)), executor) + + assertEquals(777, result.join()) + } finally { + executor.shutdown() + } + } + + @Test + @DisplayName("should handle exception with exceptionallyAsync with executor") + void testExceptionallyAsyncWithExecutor() { + ExecutorService executor = Executors.newSingleThreadExecutor() + try { + Promise<String> promise = SimplePromise.of() + promise.completeExceptionally(new RuntimeException("error")) + Promise<String> result = promise.exceptionallyAsync(ex -> "handled", executor) + + assertEquals("handled", result.join()) + } finally { + executor.shutdown() + } + } + } + +}
