This is an automated email from the ASF dual-hosted git repository. samt pushed a commit to branch cep-21-tcm in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 788ea747ceb161df8ec287e214d54f898b0f433e Author: Sam Tunnicliffe <[email protected]> AuthorDate: Fri Mar 3 16:37:47 2023 +0000 [CEP-21] Add missing implementations to concurrent utils Adds a handful of implementations to subclasses in the org.apache.cassandra.utils.concurrent package Co-authored-by: Marcus Eriksson <[email protected]> Co-authored-by: Alex Petrov <[email protected]> Co-authored-by: Sam Tunnicliffe <[email protected]> --- .../cassandra/utils/concurrent/AbstractFuture.java | 33 ++++++++++++++++++++++ .../cassandra/utils/concurrent/AsyncFuture.java | 11 ++++++++ .../apache/cassandra/utils/concurrent/Future.java | 10 +++++++ .../cassandra/utils/concurrent/LoadingMap.java | 7 +++++ .../cassandra/utils/concurrent/SyncFuture.java | 11 ++++++++ 5 files changed, 72 insertions(+) diff --git a/src/java/org/apache/cassandra/utils/concurrent/AbstractFuture.java b/src/java/org/apache/cassandra/utils/concurrent/AbstractFuture.java index 83cd7d3f8b..a6c5aeda8a 100644 --- a/src/java/org/apache/cassandra/utils/concurrent/AbstractFuture.java +++ b/src/java/org/apache/cassandra/utils/concurrent/AbstractFuture.java @@ -383,6 +383,39 @@ public abstract class AbstractFuture<V> implements Future<V> return result; } + /** + * Support {@link com.google.common.util.concurrent.Futures#transformAsync(ListenableFuture, AsyncFunction, Executor)} natively + * + * See {@link #addListener(GenericFutureListener)} for ordering semantics. + */ + @Override + public <T> Future<T> andThenAsync(Function<? super V, ? extends Future<T>> andThen) + { + return andThenAsync(andThen, null); + } + + /** + * Support {@link com.google.common.util.concurrent.Futures#transformAsync(ListenableFuture, AsyncFunction, Executor)} natively + * + * See {@link #addListener(GenericFutureListener)} for ordering semantics. + */ + protected <T> Future<T> andThenAsync(AbstractFuture<T> result, Function<? super V, ? extends Future<T>> andThen, @Nullable Executor executor) + { + addListener(() -> { + try + { + if (isSuccess()) andThen.apply(getNow()).addListener(propagate(result)); + else result.tryFailure(cause()); + } + catch (Throwable t) + { + result.tryFailure(t); + throw t; + } + }, executor); + return result; + } + /** * Add a listener to be invoked once this future completes. * Listeners are submitted to {@link #notifyExecutor} in the order they are added (or the specified executor diff --git a/src/java/org/apache/cassandra/utils/concurrent/AsyncFuture.java b/src/java/org/apache/cassandra/utils/concurrent/AsyncFuture.java index 0ef35d5dde..89cea2dbe3 100644 --- a/src/java/org/apache/cassandra/utils/concurrent/AsyncFuture.java +++ b/src/java/org/apache/cassandra/utils/concurrent/AsyncFuture.java @@ -144,6 +144,17 @@ public class AsyncFuture<V> extends AbstractFuture<V> return flatMap(new AsyncFuture<>(), flatMapper, executor); } + /** + * Support {@link com.google.common.util.concurrent.Futures#transformAsync(ListenableFuture, AsyncFunction, Executor)} natively + * + * See {@link #addListener(GenericFutureListener)} for ordering semantics. + */ + @Override + public <T> Future<T> andThenAsync(Function<? super V, ? extends Future<T>> andThen, @Nullable Executor executor) + { + return andThenAsync(new AsyncFuture<>(), andThen, executor); + } + /** * Wait for this future to complete {@link Awaitable#await()} */ diff --git a/src/java/org/apache/cassandra/utils/concurrent/Future.java b/src/java/org/apache/cassandra/utils/concurrent/Future.java index fae5d43ef1..909d140d17 100644 --- a/src/java/org/apache/cassandra/utils/concurrent/Future.java +++ b/src/java/org/apache/cassandra/utils/concurrent/Future.java @@ -177,6 +177,16 @@ public interface Future<V> extends io.netty.util.concurrent.Future<V>, Listenabl */ <T> Future<T> flatMap(Function<? super V, ? extends Future<T>> flatMapper, Executor executor); + /** + * Support {@link com.google.common.util.concurrent.Futures#transformAsync(ListenableFuture, AsyncFunction, Executor)} natively + */ + <T> Future<T> andThenAsync(Function<? super V, ? extends Future<T>> andThen); + + /** + * Support {@link com.google.common.util.concurrent.Futures#transformAsync(ListenableFuture, AsyncFunction, Executor)} natively + */ + <T> Future<T> andThenAsync(Function<? super V, ? extends Future<T>> andThen, Executor executor); + /** * Invoke {@code runnable} on completion, using {@code executor}. * diff --git a/src/java/org/apache/cassandra/utils/concurrent/LoadingMap.java b/src/java/org/apache/cassandra/utils/concurrent/LoadingMap.java index 399eb0e3b5..1d41a97db8 100644 --- a/src/java/org/apache/cassandra/utils/concurrent/LoadingMap.java +++ b/src/java/org/apache/cassandra/utils/concurrent/LoadingMap.java @@ -18,6 +18,8 @@ package org.apache.cassandra.utils.concurrent; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.function.BiFunction; @@ -199,4 +201,9 @@ public class LoadingMap<K, V> return (T) value; } } + + public Map<K, Future<V>> copyInternal() + { + return new HashMap<K, Future<V>>(internalMap); + } } diff --git a/src/java/org/apache/cassandra/utils/concurrent/SyncFuture.java b/src/java/org/apache/cassandra/utils/concurrent/SyncFuture.java index 2a3598aa03..287911bd12 100644 --- a/src/java/org/apache/cassandra/utils/concurrent/SyncFuture.java +++ b/src/java/org/apache/cassandra/utils/concurrent/SyncFuture.java @@ -111,6 +111,17 @@ public class SyncFuture<V> extends AbstractFuture<V> return flatMap(new SyncFuture<>(), flatMapper, executor); } + /** + * Support {@link com.google.common.util.concurrent.Futures#transformAsync(ListenableFuture, AsyncFunction, Executor)} natively + * + * See {@link #addListener(GenericFutureListener)} for ordering semantics. + */ + @Override + public <T> Future<T> andThenAsync(Function<? super V, ? extends Future<T>> andThen, @Nullable Executor executor) + { + return andThenAsync(new SyncFuture<>(), andThen, executor); + } + /** * Shared implementation of various promise completion methods. * Updates the result if it is possible to do so, returning success/failure. --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
