This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch cep-21-tcm
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 788ea747ceb161df8ec287e214d54f898b0f433e
Author: Sam Tunnicliffe <[email protected]>
AuthorDate: Fri Mar 3 16:37:47 2023 +0000

    [CEP-21] Add missing implementations to concurrent utils
    
    Adds a handful of implementations to subclasses in the
    org.apache.cassandra.utils.concurrent package
    
    Co-authored-by: Marcus Eriksson <[email protected]>
    Co-authored-by: Alex Petrov <[email protected]>
    Co-authored-by: Sam Tunnicliffe <[email protected]>
---
 .../cassandra/utils/concurrent/AbstractFuture.java | 33 ++++++++++++++++++++++
 .../cassandra/utils/concurrent/AsyncFuture.java    | 11 ++++++++
 .../apache/cassandra/utils/concurrent/Future.java  | 10 +++++++
 .../cassandra/utils/concurrent/LoadingMap.java     |  7 +++++
 .../cassandra/utils/concurrent/SyncFuture.java     | 11 ++++++++
 5 files changed, 72 insertions(+)

diff --git a/src/java/org/apache/cassandra/utils/concurrent/AbstractFuture.java b/src/java/org/apache/cassandra/utils/concurrent/AbstractFuture.java
index 83cd7d3f8b..a6c5aeda8a 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/AbstractFuture.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/AbstractFuture.java
@@ -383,6 +383,39 @@ public abstract class AbstractFuture<V> implements Future<V>
         return result;
     }
 
+    /**
+     * Support {@link com.google.common.util.concurrent.Futures#transformAsync(ListenableFuture, AsyncFunction, Executor)} natively
+     *
+     * See {@link #addListener(GenericFutureListener)} for ordering semantics.
+     */
+    @Override
+    public <T> Future<T> andThenAsync(Function<? super V, ? extends Future<T>> 
andThen)
+    {
+        return andThenAsync(andThen, null);
+    }
+
+    /**
+     * Support {@link com.google.common.util.concurrent.Futures#transformAsync(ListenableFuture, AsyncFunction, Executor)} natively
+     *
+     * See {@link #addListener(GenericFutureListener)} for ordering semantics.
+     */
+    protected <T> Future<T> andThenAsync(AbstractFuture<T> result, Function<? super V, ? extends Future<T>> andThen, @Nullable Executor executor)
+    {
+        addListener(() -> {
+            try
+            {
+                if (isSuccess()) andThen.apply(getNow()).addListener(propagate(result));
+                else result.tryFailure(cause());
+            }
+            catch (Throwable t)
+            {
+                result.tryFailure(t);
+                throw t;
+            }
+        }, executor);
+        return result;
+    }
+
     /**
      * Add a listener to be invoked once this future completes.
      * Listeners are submitted to {@link #notifyExecutor} in the order they are added (or the specified executor
diff --git a/src/java/org/apache/cassandra/utils/concurrent/AsyncFuture.java b/src/java/org/apache/cassandra/utils/concurrent/AsyncFuture.java
index 0ef35d5dde..89cea2dbe3 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/AsyncFuture.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/AsyncFuture.java
@@ -144,6 +144,17 @@ public class AsyncFuture<V> extends AbstractFuture<V>
         return flatMap(new AsyncFuture<>(), flatMapper, executor);
     }
 
+    /**
+     * Support {@link com.google.common.util.concurrent.Futures#transformAsync(ListenableFuture, AsyncFunction, Executor)} natively
+     *
+     * See {@link #addListener(GenericFutureListener)} for ordering semantics.
+     */
+    @Override
+    public <T> Future<T> andThenAsync(Function<? super V, ? extends Future<T>> andThen, @Nullable Executor executor)
+    {
+        return andThenAsync(new AsyncFuture<>(), andThen, executor);
+    }
+
     /**
      * Wait for this future to complete {@link Awaitable#await()}
      */
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Future.java b/src/java/org/apache/cassandra/utils/concurrent/Future.java
index fae5d43ef1..909d140d17 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Future.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Future.java
@@ -177,6 +177,16 @@ public interface Future<V> extends io.netty.util.concurrent.Future<V>, Listenabl
      */
     <T> Future<T> flatMap(Function<? super V, ? extends Future<T>> flatMapper, Executor executor);
 
+    /**
+     * Support {@link com.google.common.util.concurrent.Futures#transformAsync(ListenableFuture, AsyncFunction, Executor)} natively
+     */
+    <T> Future<T> andThenAsync(Function<? super V, ? extends Future<T>> andThen);
+
+    /**
+     * Support {@link com.google.common.util.concurrent.Futures#transformAsync(ListenableFuture, AsyncFunction, Executor)} natively
+     */
+    <T> Future<T> andThenAsync(Function<? super V, ? extends Future<T>> andThen, Executor executor);
+
     /**
      * Invoke {@code runnable} on completion, using {@code executor}.
      *
diff --git a/src/java/org/apache/cassandra/utils/concurrent/LoadingMap.java b/src/java/org/apache/cassandra/utils/concurrent/LoadingMap.java
index 399eb0e3b5..1d41a97db8 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/LoadingMap.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/LoadingMap.java
@@ -18,6 +18,8 @@
 
 package org.apache.cassandra.utils.concurrent;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.function.BiFunction;
@@ -199,4 +201,9 @@ public class LoadingMap<K, V>
             return (T) value;
         }
     }
+
+    public Map<K, Future<V>> copyInternal()
+    {
+        return new HashMap<K, Future<V>>(internalMap);
+    }
 }
diff --git a/src/java/org/apache/cassandra/utils/concurrent/SyncFuture.java b/src/java/org/apache/cassandra/utils/concurrent/SyncFuture.java
index 2a3598aa03..287911bd12 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/SyncFuture.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/SyncFuture.java
@@ -111,6 +111,17 @@ public class SyncFuture<V> extends AbstractFuture<V>
         return flatMap(new SyncFuture<>(), flatMapper, executor);
     }
 
+    /**
+     * Support {@link com.google.common.util.concurrent.Futures#transformAsync(ListenableFuture, AsyncFunction, Executor)} natively
+     *
+     * See {@link #addListener(GenericFutureListener)} for ordering semantics.
+     */
+    @Override
+    public <T> Future<T> andThenAsync(Function<? super V, ? extends Future<T>> andThen, @Nullable Executor executor)
+    {
+        return andThenAsync(new SyncFuture<>(), andThen, executor);
+    }
+
     /**
      * Shared implementation of various promise completion methods.
      * Updates the result if it is possible to do so, returning success/failure.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to