This is an automated email from the ASF dual-hosted git repository.

bdeggleston pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit fa10af834e750887be53c33d3eebb6d7d3606c1b
Author: Blake Eggleston <bl...@ultrablake.com>
AuthorDate: Fri Jan 27 09:41:59 2023 -0800

    add AsyncChain implementations and tests
    
    Patch by Blake Eggleston; Reviewed by David Capwell & Benedict Elliott Smith for CASSANDRA-18004
---
 .../cassandra/service/accord/AccordCommand.java    |  74 +++---
 .../service/accord/AccordCommandStore.java         |  18 +-
 .../service/accord/AccordCommandsForKey.java       |  14 +-
 .../cassandra/service/accord/AccordService.java    |   7 +-
 .../cassandra/service/accord/AccordState.java      |   6 +-
 .../cassandra/service/accord/AccordStateCache.java | 128 ++++-----
 .../cassandra/service/accord/ReadFuture.java       | 292 ---------------------
 .../service/accord/async/AsyncLoader.java          |  92 +++----
 .../service/accord/async/AsyncOperation.java       |  46 +++-
 .../service/accord/async/AsyncWriter.java          |  63 ++---
 .../cassandra/service/accord/txn/TxnNamedRead.java |   7 +-
 .../cassandra/service/accord/txn/TxnRead.java      |  57 +---
 .../cassandra/service/accord/txn/TxnWrite.java     |  21 +-
 .../service/accord/AccordCommandTest.java          |  36 +--
 .../service/accord/AccordStateCacheTest.java       |  47 ++--
 .../cassandra/service/accord/AccordTestUtils.java  |   8 +-
 .../service/accord/async/AsyncLoaderTest.java      |  21 +-
 .../service/accord/async/AsyncOperationTest.java   |  16 +-
 18 files changed, 332 insertions(+), 621 deletions(-)

diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommand.java b/src/java/org/apache/cassandra/service/accord/AccordCommand.java
index 8020b29ac7..88b39922ce 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommand.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommand.java
@@ -58,14 +58,15 @@ import accord.primitives.Writes;
 import accord.utils.DeterministicIdentitySet;
 import 
org.apache.cassandra.service.accord.AccordCommandStore.SafeAccordCommandStore;
 import org.apache.cassandra.service.accord.api.PartitionKey;
+import accord.utils.async.AsyncChain;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults;
 import org.apache.cassandra.service.accord.async.AsyncContext;
 import org.apache.cassandra.service.accord.store.StoredNavigableMap;
 import org.apache.cassandra.service.accord.store.StoredSet;
 import org.apache.cassandra.service.accord.store.StoredValue;
 import org.apache.cassandra.service.accord.txn.TxnData;
 import org.apache.cassandra.utils.ObjectSizes;
-import org.apache.cassandra.utils.concurrent.AsyncPromise;
-import org.apache.cassandra.utils.concurrent.Future;
 
 import static accord.local.Status.Durability.Local;
 import static accord.local.Status.Durability.NotDurable;
@@ -83,7 +84,7 @@ public class AccordCommand extends Command implements 
AccordState<TxnId>
 
     public static class WriteOnly extends AccordCommand implements 
AccordState.WriteOnly<TxnId, AccordCommand>
     {
-        private Future<?> future = null;
+        private AsyncResult<Void> asyncResult = null;
 
         public WriteOnly(TxnId txnId)
         {
@@ -91,16 +92,16 @@ public class AccordCommand extends Command implements 
AccordState<TxnId>
         }
 
         @Override
-        public void future(Future<?> future)
+        public void asyncResult(AsyncResult<Void> notifier)
         {
-            Preconditions.checkArgument(this.future == null);
-            this.future = future;
+            Preconditions.checkArgument(this.asyncResult == null);
+            this.asyncResult = notifier;
         }
 
         @Override
-        public Future<?> future()
+        public AsyncResult<Void> asyncResult()
         {
-            return future;
+            return asyncResult;
         }
 
         @Override
@@ -618,7 +619,7 @@ public class AccordCommand extends Command implements 
AccordState<TxnId>
     protected void postApply(SafeCommandStore safeStore)
     {
         AccordStateCache.Instance<TxnId, AccordCommand> cache = 
((SafeAccordCommandStore) safeStore).commandStore().commandCache();
-        cache.cleanupWriteFuture(txnId);
+        cache.cleanupWriteResult(txnId);
         super.postApply(safeStore);
     }
 
@@ -640,58 +641,69 @@ public class AccordCommand extends Command implements 
AccordState<TxnId>
         return true;
     }
 
-    private Future<Void> applyWithCorrectScope(CommandStore unsafeStore)
+    private AsyncResult<Void> applyWithCorrectScope(CommandStore unsafeStore)
     {
         TxnId txnId = txnId();
-        AsyncPromise<Void> promise = new AsyncPromise<>();
+        AsyncResult.Settable<Void> result = AsyncResults.settable();
         unsafeStore.execute(this, safeStore -> {
             AccordCommand command = (AccordCommand) safeStore.command(txnId);
-            command.apply(safeStore, false).addCallback((v, throwable) -> {
-                if (throwable != null)
-                    promise.tryFailure(throwable);
-                else
-                    promise.trySuccess(null);
-            });
+            command.applyChain(safeStore, 
false).begin(result.settingCallback());
+        }).begin((unused, throwable) -> {
+            if (throwable != null)
+                result.tryFailure(throwable);
         });
-        return promise;
+        return result;
     }
 
-    private Future<Void> apply(SafeCommandStore safeStore, boolean 
canReschedule)
+    private AsyncChain<Void> applyChain(SafeCommandStore safeStore, boolean 
canReschedule)
     {
         AccordStateCache.Instance<TxnId, AccordCommand> cache = 
((SafeAccordCommandStore) safeStore).commandStore().commandCache();
-        Future<Void> future = cache.getWriteFuture(txnId);
-        if (future != null)
-            return future;
+        AsyncResult<Void> writeResult = cache.getWriteResult(txnId);
+        if (writeResult != null)
+            return writeResult;
 
         // this can be called via a listener callback, in which case we won't
         // have the appropriate commandsForKey in scope, so start a new 
operation
         // with the correct scope and notify the caller when that completes
         if (!canApplyWithCurrentScope(safeStore))
         {
+            return writeResult;
+        }
+
+        if (canApplyWithCurrentScope(safeStore))
+        {
+            AsyncChain<Void> chain = super.applyChain(safeStore);
+            writeResult = AsyncResults.forChain(chain);
+        }
+        else
+        {
+            // this can be called via a listener callback, in which case we 
won't
+            // have the appropriate commandsForKey in scope, so start a new 
operation
+            // with the correct scope and notify the caller when that completes
             Preconditions.checkArgument(canReschedule);
             return applyWithCorrectScope(safeStore.commandStore());
         }
+        cache.setWriteResult(txnId, writeResult);
 
-        future = super.apply(safeStore);
-        cache.setWriteFuture(txnId, future);
-        return future;
+        return writeResult;
     }
 
     @Override
-    public Future<Void> apply(SafeCommandStore safeStore)
+    protected AsyncChain<Void> applyChain(SafeCommandStore safeStore)
     {
-        return apply(safeStore, true);
+
+        return applyChain(safeStore, true);
     }
 
     @Override
-    public Future<Data> read(SafeCommandStore safeStore)
+    public AsyncChain<Data> read(SafeCommandStore safeStore)
     {
         AccordStateCache.Instance<TxnId, AccordCommand> cache = 
((SafeAccordCommandStore) safeStore).commandStore().commandCache();
-        Future<Data> future = cache.getReadFuture(txnId);
+        AsyncResult<Data> future = cache.getReadResult(txnId);
         if (future != null)
             return future;
-        future = super.read(safeStore);
-        cache.setReadFuture(txnId, future);
+        future = AsyncResults.forChain(super.read(safeStore));
+        cache.setReadResult(txnId, future);
         return future;
     }
 
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index 87c9c752d9..b7c6e9754e 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -54,10 +54,10 @@ import accord.primitives.AbstractKeys;
 import accord.primitives.TxnId;
 import accord.utils.Invariants;
 import org.apache.cassandra.service.accord.api.PartitionKey;
+import accord.utils.async.AsyncChain;
 import org.apache.cassandra.service.accord.async.AsyncContext;
 import org.apache.cassandra.service.accord.async.AsyncOperation;
 import org.apache.cassandra.utils.Clock;
-import org.apache.cassandra.utils.concurrent.Future;
 import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
 
 import static 
org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
@@ -271,13 +271,13 @@ public class AccordCommandStore extends CommandStore
         }
 
         @Override
-        public Future<Void> execute(PreLoadContext context, Consumer<? super 
SafeCommandStore> consumer)
+        public AsyncChain<Void> execute(PreLoadContext context, Consumer<? 
super SafeCommandStore> consumer)
         {
             return AccordCommandStore.this.execute(context, consumer);
         }
 
         @Override
-        public <T> Future<T> submit(PreLoadContext context, Function<? super 
SafeCommandStore, T> function)
+        public <T> AsyncChain<T> submit(PreLoadContext context, Function<? 
super SafeCommandStore, T> function)
         {
             return AccordCommandStore.this.submit(context, function);
         }
@@ -445,11 +445,9 @@ public class AccordCommandStore extends CommandStore
     }
 
     @Override
-    public <T> Future<T> submit(PreLoadContext loadCtx, Function<? super 
SafeCommandStore, T> function)
+    public <T> AsyncChain<T> submit(PreLoadContext loadCtx, Function<? super 
SafeCommandStore, T> function)
     {
-        AsyncOperation<T> operation = AsyncOperation.create(this, loadCtx, 
function);
-        executor.execute(operation);
-        return operation;
+        return AsyncOperation.create(this, loadCtx, function);
     }
 
     @Override
@@ -459,11 +457,9 @@ public class AccordCommandStore extends CommandStore
     }
 
     @Override
-    public Future<Void> execute(PreLoadContext preLoadContext, Consumer<? 
super SafeCommandStore> consumer)
+    public AsyncChain<Void> execute(PreLoadContext preLoadContext, Consumer<? 
super SafeCommandStore> consumer)
     {
-        AsyncOperation<Void> operation = AsyncOperation.create(this, 
preLoadContext, consumer);
-        executor.execute(operation);
-        return operation;
+        return AsyncOperation.create(this, preLoadContext, consumer);
     }
 
     public void executeBlocking(Runnable runnable)
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandsForKey.java b/src/java/org/apache/cassandra/service/accord/AccordCommandsForKey.java
index b93dc27763..6b3ba9856e 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandsForKey.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandsForKey.java
@@ -43,13 +43,13 @@ import accord.local.Status;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
 import org.apache.cassandra.service.accord.api.PartitionKey;
+import accord.utils.async.AsyncResult;
 import org.apache.cassandra.service.accord.store.StoredLong;
 import org.apache.cassandra.service.accord.store.StoredNavigableMap;
 import org.apache.cassandra.service.accord.store.StoredSet;
 import org.apache.cassandra.service.accord.store.StoredValue;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.ObjectSizes;
-import org.apache.cassandra.utils.concurrent.Future;
 import org.assertj.core.util.VisibleForTesting;
 
 import static accord.local.SafeCommandStore.TestDep.ANY_DEPS;
@@ -75,7 +75,7 @@ public class AccordCommandsForKey extends CommandsForKey 
implements AccordState<
 
     public static class WriteOnly extends AccordCommandsForKey implements 
AccordState.WriteOnly<PartitionKey, AccordCommandsForKey>
     {
-        private Future<?> future = null;
+        private AsyncResult<Void> result = null;
 
         public WriteOnly(AccordCommandStore commandStore, PartitionKey key)
         {
@@ -83,17 +83,17 @@ public class AccordCommandsForKey extends CommandsForKey 
implements AccordState<
         }
 
         @Override
-        public void future(Future<?> future)
+        public void asyncResult(AsyncResult<Void> result)
         {
-            Preconditions.checkArgument(this.future == null);
-            this.future = future;
+            Preconditions.checkArgument(this.result == null);
+            this.result = result;
 
         }
 
         @Override
-        public Future<?> future()
+        public AsyncResult<Void> asyncResult()
         {
-            return future;
+            return result;
         }
 
         @Override
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java
index c308afa2de..28f2d878c6 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -37,6 +37,8 @@ import accord.messages.Request;
 import accord.primitives.Txn;
 import accord.topology.TopologyManager;
 import org.apache.cassandra.concurrent.Shutdownable;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.WriteType;
@@ -51,7 +53,6 @@ import org.apache.cassandra.service.accord.txn.TxnData;
 import org.apache.cassandra.utils.Clock;
 import org.apache.cassandra.utils.ExecutorUtils;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.concurrent.Future;
 import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
 
 import static 
org.apache.cassandra.config.DatabaseDescriptor.getConcurrentAccordOps;
@@ -185,8 +186,8 @@ public class AccordService implements IAccordService, 
Shutdownable
         try
         {
             metrics.keySize.update(txn.keys().size());
-            Future<Result> future = node.coordinate(txn);
-            Result result = 
future.get(DatabaseDescriptor.getTransactionTimeout(TimeUnit.MILLISECONDS), 
TimeUnit.MILLISECONDS);
+            AsyncResult<Result> asyncResult = node.coordinate(txn);
+            Result result = AsyncResults.getBlocking(asyncResult, 
DatabaseDescriptor.getTransactionTimeout(TimeUnit.MILLISECONDS), 
TimeUnit.MILLISECONDS);
             return (TxnData) result;
         }
         catch (ExecutionException e)
diff --git a/src/java/org/apache/cassandra/service/accord/AccordState.java b/src/java/org/apache/cassandra/service/accord/AccordState.java
index 2f5a9dc68c..0e1a224cd1 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordState.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordState.java
@@ -21,9 +21,9 @@ package org.apache.cassandra.service.accord;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
 
+import accord.utils.async.AsyncResult;
 import org.apache.cassandra.service.accord.store.StoredNavigableMap;
 import org.apache.cassandra.service.accord.store.StoredSet;
-import org.apache.cassandra.utils.concurrent.Future;
 
 public interface AccordState<K>
 {
@@ -69,9 +69,9 @@ public interface AccordState<K>
             return ReadWrite.WRITE_ONLY;
         }
 
-        void future(Future<?> future);
+        void asyncResult(AsyncResult<Void> notifier);
 
-        Future<?> future();
+        AsyncResult<Void> asyncResult();
 
         /**
          * Apply the write only changes to the full instance
diff --git a/src/java/org/apache/cassandra/service/accord/AccordStateCache.java b/src/java/org/apache/cassandra/service/accord/AccordStateCache.java
index 37e7ba17cf..5993b73d22 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordStateCache.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordStateCache.java
@@ -35,9 +35,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import accord.api.Data;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults;
 import org.apache.cassandra.utils.ObjectSizes;
-import org.apache.cassandra.utils.concurrent.Future;
-import org.apache.cassandra.utils.concurrent.FutureCombiner;
 
 /**
  * Cache for AccordCommand and AccordCommandsForKey, available memory is 
shared between the two object types.
@@ -85,7 +85,7 @@ public class AccordStateCache
                 AccordState.WriteOnly<K, V> item = items.get(0);
 
                 // we can't remove items out of order, so if we encounter a 
write is still pending, we stop
-                if (item.future() == null || !item.future().isDone())
+                if (item.asyncResult() == null || !item.asyncResult().isDone())
                     break;
 
                 items.remove(0);
@@ -154,11 +154,11 @@ public class AccordStateCache
     private final Map<Object, WriteOnlyGroup<?, ?>> pendingWriteOnly = new 
HashMap<>();
     private final Set<Instance<?, ?>> instances = new HashSet<>();
 
-    private final NamedMap<Object, Future<?>> loadFutures = new 
NamedMap<>("loadFutures");
-    private final NamedMap<Object, Future<?>> saveFutures = new 
NamedMap<>("saveFutures");
+    private final NamedMap<Object, AsyncResult<Void>> loadResults = new 
NamedMap<>("loadResults");
+    private final NamedMap<Object, AsyncResult<Void>> saveResults = new 
NamedMap<>("saveResults");
 
-    private final NamedMap<Object, Future<Data>> readFutures = new 
NamedMap<>("readFutures");
-    private final NamedMap<Object, Future<?>> writeFutures = new 
NamedMap<>("writeFutures");
+    private final NamedMap<Object, AsyncResult<Data>> readResults = new 
NamedMap<>("readResults");
+    private final NamedMap<Object, AsyncResult<Void>> writeResults = new 
NamedMap<>("writeResults");
 
     Node<?, ?> head;
     Node<?, ?> tail;
@@ -227,13 +227,13 @@ public class AccordStateCache
         bytesCached += node.estimatedSizeOnHeapDelta();
     }
 
-    // don't evict if there's an outstanding save future. If an item is 
evicted then reloaded
+    // don't evict if there's an outstanding save result. If an item is 
evicted then reloaded
     // before it's mutation is applied, out of date info will be loaded
     private boolean canEvict(Object key)
     {
-        // getFuture only returns a future if it is running, so don't need to 
check if its still running
-        Future<?> future = getFuture(saveFutures, key);
-        return future == null;
+        // getResult only returns a result if it is running, so don't need to 
check if its still running
+        AsyncResult<?> result = getAsyncResult(saveResults, key);
+        return result == null || result.isDone();
     }
 
     private void maybeEvict()
@@ -248,7 +248,7 @@ public class AccordStateCache
             current = current.prev;
 
             // if there are any dangling write only groups, apply them and
-            // move their futures into write futures so we don't evict
+            // move their results into write results so we don't evict
             applyAndRemoveWriteOnlyGroup(evict.value);
             if (!canEvict(evict.key()))
                 continue;
@@ -260,9 +260,9 @@ public class AccordStateCache
         }
     }
 
-    private static <K, F extends Future<?>> F getFuture(NamedMap<Object, F> 
futuresMap, K key)
+    private static <K, F extends AsyncResult<?>> F 
getAsyncResult(NamedMap<Object, F> resultMap, K key)
     {
-        F r = futuresMap.get(key);
+        F r = resultMap.get(key);
         if (r == null)
             return null;
 
@@ -270,36 +270,36 @@ public class AccordStateCache
             return r;
 
         if (logger.isTraceEnabled())
-            logger.trace("Clearing future for {} from {}: {}", key, 
futuresMap.name, r);
-        futuresMap.remove(key);
+            logger.trace("Clearing result for {} from {}: {}", key, 
resultMap.name, r);
+        resultMap.remove(key);
         return null;
     }
 
-    private static <K, F extends Future<?>> void setFuture(Map<Object, F> 
futuresMap, K key, F future)
+    private static <K, F extends AsyncResult<?>> void 
setAsyncResult(Map<Object, F> resultsMap, K key, F result)
     {
-        Preconditions.checkState(!futuresMap.containsKey(key));
-        futuresMap.put(key, future);
+        Preconditions.checkState(!resultsMap.containsKey(key));
+        resultsMap.put(key, result);
     }
 
-    private static <K> void mergeFuture(Map<Object, Future<?>> futuresMap, K 
key, Future<?> future)
+    private static <K> void mergeAsyncResult(Map<Object, AsyncResult<Void>> 
resultMap, K key, AsyncResult<Void> result)
     {
-        Future<?> existing = futuresMap.get(key);
+        AsyncResult<Void> existing = resultMap.get(key);
         if (existing != null && !existing.isDone())
         {
-            logger.trace("Merging future {} with existing {}", future, 
existing);
-            future = FutureCombiner.allOf(ImmutableList.of(existing, future));
+            logger.trace("Merging result {} with existing {}", result, 
existing);
+            result = AsyncResults.reduce(ImmutableList.of(existing, result), 
(a, b) -> null).beginAsResult();
         }
 
-        futuresMap.put(key, future);
+        resultMap.put(key, result);
     }
 
-    private <K> void maybeClearFuture(K key)
+    private <K> void maybeClearAsyncResult(K key)
     {
         // will clear if it's done
-        getFuture(loadFutures, key);
-        getFuture(saveFutures, key);
-        getFuture(readFutures, key);
-        getFuture(writeFutures, key);
+        getAsyncResult(loadResults, key);
+        getAsyncResult(saveResults, key);
+        getAsyncResult(readResults, key);
+        getAsyncResult(writeResults, key);
     }
 
     public <K, V extends AccordState<K>> void applyAndRemoveWriteOnlyGroup(V 
instance)
@@ -312,8 +312,8 @@ public class AccordStateCache
         for (AccordState.WriteOnly<K, V> writeOnly : group.items)
         {
             writeOnly.applyChanges(instance);
-            if (!writeOnly.future().isDone())
-                mergeFuture(saveFutures, instance.key(), writeOnly.future());
+            if (!writeOnly.asyncResult().isDone())
+                mergeAsyncResult(saveResults, instance.key(), 
writeOnly.asyncResult());
         }
     }
 
@@ -402,7 +402,7 @@ public class AccordStateCache
         {
             K key = value.key();
             logger.trace("Releasing resources for {}: {}", key, value);
-            maybeClearFuture(key);
+            maybeClearAsyncResult(key);
             Node<K, V> node = (Node<K, V>) active.get(key);
             Preconditions.checkState(node != null && node.references > 0);
             Preconditions.checkState(node.value == value);
@@ -462,12 +462,12 @@ public class AccordStateCache
         public void addWriteOnly(AccordState.WriteOnly<K, V> writeOnly)
         {
             K key = writeOnly.key();
-            Preconditions.checkArgument(writeOnly.future() != null);
+            Preconditions.checkArgument(writeOnly.asyncResult() != null);
             WriteOnlyGroup<K, V> group = (WriteOnlyGroup<K, V>) 
pendingWriteOnly.computeIfAbsent(key, k -> new WriteOnlyGroup<>());
 
-            // if a load future exists for the key we're creating a write 
group for, we need to lock
+            // if a load result exists for the key we're creating a write 
group for, we need to lock
             // the group so the loading instance gets changes applied when it 
finishes loading
-            if (getLoadFuture(key) != null)
+            if (getLoadResult(key) != null)
                 group.lock();
 
             group.add(writeOnly);
@@ -495,77 +495,77 @@ public class AccordStateCache
             return group != null ? group.items.size() : 0;
         }
 
-        public Future<?> getLoadFuture(K key)
+        public AsyncResult<Void> getLoadResult(K key)
         {
-            return getFuture(loadFutures, key);
+            return getAsyncResult(loadResults, key);
         }
 
-        public void cleanupLoadFuture(K key)
+        public void cleanupLoadResult(K key)
         {
-            getLoadFuture(key);
+            getLoadResult(key);
         }
 
         @VisibleForTesting
-        public boolean hasLoadFuture(K key)
+        public boolean hasLoadResult(K key)
         {
-            return loadFutures.get(key) != null;
+            return loadResults.get(key) != null;
         }
 
-        public void setLoadFuture(K key, Future<?> future)
+        public void setLoadResult(K key, AsyncResult<Void> result)
         {
-            setFuture(loadFutures, key, future);
+            setAsyncResult(loadResults, key, result);
         }
 
-        public Future<?> getSaveFuture(K key)
+        public AsyncResult<?> getSaveResult(K key)
         {
-            return getFuture(saveFutures, key);
+            return getAsyncResult(saveResults, key);
         }
 
-        public void addSaveFuture(K key, Future<?> future)
+        public void addSaveResult(K key, AsyncResult<Void> result)
         {
-            logger.trace("Adding save future for {}: {}", key, future);
-            mergeFuture(saveFutures, key, future);
+            logger.trace("Adding save result for {}: {}", key, result);
+            mergeAsyncResult(saveResults, key, result);
         }
 
-        public void cleanupSaveFuture(K key)
+        public void cleanupSaveResult(K key)
         {
-            getSaveFuture(key);
+            getSaveResult(key);
         }
 
         @VisibleForTesting
-        public boolean hasSaveFuture(K key)
+        public boolean hasSaveResult(K key)
         {
-            return saveFutures.get(key) != null;
+            return saveResults.get(key) != null;
         }
 
-        public Future<Data> getReadFuture(K key)
+        public AsyncResult<Data> getReadResult(K key)
         {
-            return getFuture(readFutures, key);
+            return getAsyncResult(readResults, key);
         }
 
-        public void setReadFuture(K key, Future<Data> future)
+        public void setReadResult(K key, AsyncResult<Data> result)
         {
-            setFuture(readFutures, key, future);
+            setAsyncResult(readResults, key, result);
         }
 
-        public void cleanupReadFuture(K key)
+        public void cleanupReadResult(K key)
         {
-            getReadFuture(key);
+            getReadResult(key);
         }
 
-        public Future<Void> getWriteFuture(K key)
+        public AsyncResult<Void> getWriteResult(K key)
         {
-            return (Future<Void>) getFuture(writeFutures, key);
+            return getAsyncResult(writeResults, key);
         }
 
-        public void setWriteFuture(K key, Future<Void> future)
+        public void setWriteResult(K key, AsyncResult<Void> result)
         {
-            setFuture(writeFutures, key, future);
+            setAsyncResult(writeResults, key, result);
         }
 
-        public void cleanupWriteFuture(K key)
+        public void cleanupWriteResult(K key)
         {
-            getWriteFuture(key);
+            getWriteResult(key);
         }
 
         public long cacheQueries()
diff --git a/src/java/org/apache/cassandra/service/accord/ReadFuture.java b/src/java/org/apache/cassandra/service/accord/ReadFuture.java
deleted file mode 100644
index 11c1800383..0000000000
--- a/src/java/org/apache/cassandra/service/accord/ReadFuture.java
+++ /dev/null
@@ -1,292 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.service.accord;
-
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.function.BiConsumer;
-import java.util.function.Consumer;
-import java.util.function.Function;
-
-import com.google.common.util.concurrent.FutureCallback;
-
-import accord.api.Data;
-import io.netty.util.concurrent.GenericFutureListener;
-import org.apache.cassandra.utils.concurrent.Future;
-import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
-
-public class ReadFuture implements Future<Data>
-{
-    private final Future<Data> wrappped;
-
-    public ReadFuture(Future<Data> wrappped)
-    {
-        this.wrappped = wrappped;
-    }
-
-    @Override
-    public Future<Data> await() throws InterruptedException
-    {
-        return wrappped.await();
-    }
-
-    @Override
-    public Future<Data> awaitUninterruptibly()
-    {
-        return wrappped.awaitUninterruptibly();
-    }
-
-    @Override
-    public Future<Data> awaitThrowUncheckedOnInterrupt()
-    {
-        return wrappped.awaitThrowUncheckedOnInterrupt();
-    }
-
-    @Override
-    public void rethrowIfFailed()
-    {
-        wrappped.rethrowIfFailed();
-    }
-
-    @Override
-    public Future<Data> sync() throws InterruptedException
-    {
-        return wrappped.sync();
-    }
-
-    @Override
-    public Future<Data> syncUninterruptibly()
-    {
-        return wrappped.syncUninterruptibly();
-    }
-
-    @Override
-    public Future<Data> syncThrowUncheckedOnInterrupt()
-    {
-        return wrappped.syncThrowUncheckedOnInterrupt();
-    }
-
-    @Override
-    @Deprecated
-    public boolean await(long l) throws InterruptedException
-    {
-        return wrappped.await(l);
-    }
-
-    @Override
-    @Deprecated
-    public boolean awaitUninterruptibly(long l)
-    {
-        return wrappped.awaitUninterruptibly(l);
-    }
-
-    @Override
-    public Future<Data> addCallback(BiConsumer<? super Data, Throwable> 
callback)
-    {
-        return wrappped.addCallback(callback);
-    }
-
-    @Override
-    public Future<Data> addCallback(BiConsumer<? super Data, Throwable> 
callback, Executor executor)
-    {
-        return wrappped.addCallback(callback, executor);
-    }
-
-    @Override
-    public Future<Data> addCallback(FutureCallback<? super Data> callback)
-    {
-        return wrappped.addCallback(callback);
-    }
-
-    @Override
-    public Future<Data> addCallback(FutureCallback<? super Data> callback, 
Executor executor)
-    {
-        return wrappped.addCallback(callback, executor);
-    }
-
-    @Override
-    public Future<Data> addCallback(Consumer<? super Data> onSuccess, 
Consumer<? super Throwable> onFailure)
-    {
-        return wrappped.addCallback(onSuccess, onFailure);
-    }
-
-    @Override
-    public Future<Data> addCallback(Consumer<? super Data> onSuccess, 
Consumer<? super Throwable> onFailure, Executor executor)
-    {
-        return wrappped.addCallback(onSuccess, onFailure, executor);
-    }
-
-    @Override
-    public <T> Future<T> map(Function<? super Data, ? extends T> mapper)
-    {
-        return wrappped.map(mapper);
-    }
-
-    @Override
-    public <T> Future<T> map(Function<? super Data, ? extends T> mapper, 
Executor executor)
-    {
-        return wrappped.map(mapper, executor);
-    }
-
-    @Override
-    public <T> Future<T> flatMap(Function<? super Data, ? extends Future<T>> 
flatMapper)
-    {
-        return wrappped.flatMap(flatMapper);
-    }
-
-    @Override
-    public <T> Future<T> flatMap(Function<? super Data, ? extends Future<T>> 
flatMapper, Executor executor)
-    {
-        return wrappped.flatMap(flatMapper, executor);
-    }
-
-    @Override
-    public void addListener(Runnable runnable, Executor executor)
-    {
-        wrappped.addListener(runnable, executor);
-    }
-
-    @Override
-    public void addListener(Runnable runnable)
-    {
-        wrappped.addListener(runnable);
-    }
-
-    @Override
-    public Executor notifyExecutor()
-    {
-        return wrappped.notifyExecutor();
-    }
-
-    @Override
-    public Future<Data> addListener(GenericFutureListener<? extends 
io.netty.util.concurrent.Future<? super Data>> genericFutureListener)
-    {
-        return wrappped.addListener(genericFutureListener);
-    }
-
-    @Override
-    public Future<Data> addListeners(GenericFutureListener<? extends 
io.netty.util.concurrent.Future<? super Data>>... genericFutureListeners)
-    {
-        return wrappped.addListeners(genericFutureListeners);
-    }
-
-    @Override
-    public Future<Data> removeListener(GenericFutureListener<? extends 
io.netty.util.concurrent.Future<? super Data>> genericFutureListener)
-    {
-        return wrappped.removeListener(genericFutureListener);
-    }
-
-    @Override
-    public Future<Data> removeListeners(GenericFutureListener<? extends 
io.netty.util.concurrent.Future<? super Data>>... genericFutureListeners)
-    {
-        return wrappped.removeListeners(genericFutureListeners);
-    }
-
-    @Override
-    public boolean isSuccess()
-    {
-        return wrappped.isSuccess();
-    }
-
-    @Override
-    public boolean isCancellable()
-    {
-        return wrappped.isCancellable();
-    }
-
-    @Override
-    public Throwable cause()
-    {
-        return wrappped.cause();
-    }
-
-    @Override
-    public boolean await(long timeout, TimeUnit unit) throws 
InterruptedException
-    {
-        return wrappped.await(timeout, unit);
-    }
-
-    @Override
-    public boolean awaitUninterruptibly(long timeout, TimeUnit unit)
-    {
-        return wrappped.awaitUninterruptibly(timeout, unit);
-    }
-
-    @Override
-    public Data getNow()
-    {
-        return wrappped.getNow();
-    }
-
-    @Override
-    public boolean cancel(boolean mayInterruptIfRunning)
-    {
-        return wrappped.cancel(mayInterruptIfRunning);
-    }
-
-    @Override
-    public boolean isCancelled()
-    {
-        return wrappped.isCancelled();
-    }
-
-    @Override
-    public boolean isDone()
-    {
-        return wrappped.isDone();
-    }
-
-    @Override
-    public Data get() throws InterruptedException, ExecutionException
-    {
-        return wrappped.get();
-    }
-
-    @Override
-    public Data get(long timeout, TimeUnit unit) throws InterruptedException, 
ExecutionException, TimeoutException
-    {
-        return wrappped.get(timeout, unit);
-    }
-
-    @Override
-    public boolean awaitUntil(long nanoTimeDeadline) throws 
InterruptedException
-    {
-        return wrappped.awaitUntil(nanoTimeDeadline);
-    }
-
-    @Override
-    public boolean awaitUntilThrowUncheckedOnInterrupt(long nanoTimeDeadline) 
throws UncheckedInterruptedException
-    {
-        return wrappped.awaitUntilThrowUncheckedOnInterrupt(nanoTimeDeadline);
-    }
-
-    @Override
-    public boolean awaitUntilUninterruptibly(long nanoTimeDeadline)
-    {
-        return wrappped.awaitUntilUninterruptibly(nanoTimeDeadline);
-    }
-
-    @Override
-    public boolean awaitThrowUncheckedOnInterrupt(long time, TimeUnit units) 
throws UncheckedInterruptedException
-    {
-        return wrappped.awaitThrowUncheckedOnInterrupt(time, units);
-    }
-}
diff --git a/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java b/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java
index 3bbc9c2828..75c2d356a5 100644
--- a/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java
+++ b/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java
@@ -31,6 +31,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import accord.primitives.TxnId;
+import accord.utils.async.AsyncChain;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.service.accord.AccordCommand;
 import org.apache.cassandra.service.accord.AccordCommandStore;
@@ -39,9 +42,8 @@ import org.apache.cassandra.service.accord.AccordKeyspace;
 import org.apache.cassandra.service.accord.AccordStateCache;
 import org.apache.cassandra.service.accord.AccordState;
 import org.apache.cassandra.service.accord.api.PartitionKey;
-import org.apache.cassandra.utils.concurrent.Future;
-import org.apache.cassandra.utils.concurrent.FutureCombiner;
 
+import static accord.utils.async.AsyncResults.ofRunnable;
 
 public class AsyncLoader
 {
@@ -60,7 +62,7 @@ public class AsyncLoader
     private final Iterable<TxnId> txnIds;
     private final Iterable<PartitionKey> keys;
 
-    protected Future<?> readFuture;
+    protected AsyncResult<?> readResult;
 
     public AsyncLoader(AccordCommandStore commandStore, Iterable<TxnId> txnIds, Iterable<PartitionKey> keys)
     {
@@ -69,23 +71,23 @@ public class AsyncLoader
         this.keys = keys;
     }
 
-    private <K, V extends AccordState<K>> Future<?> referenceAndDispatch(K key,
-                                                                         
AccordStateCache.Instance<K, V> cache,
-                                                                         
Map<K, V> context,
-                                                                         
Function<V, Future<?>> readFunction,
-                                                                         
Object callback)
+    private <K, V extends AccordState<K>> AsyncResult<Void> 
referenceAndDispatch(K key,
+                                                                               
  AccordStateCache.Instance<K, V> cache,
+                                                                               
  Map<K, V> context,
+                                                                               
  Function<V, AsyncResult<Void>> readFunction,
+                                                                               
  Object callback)
     {
         V item;
-        Future<?> future = cache.getLoadFuture(key);
-        if (future != null)
+        AsyncResult<Void> result = cache.getLoadResult(key);
+        if (result != null)
         {
-            // if a load future exists for this, it must be present in the cache
+            // if a load result exists for this, it must be present in the cache
             item = cache.getOrNull(key);
             Preconditions.checkState(item != null);
             context.put(key, item);
             if (logger.isTraceEnabled())
-                logger.trace("Existing load future found for {} while loading 
for {}. ({})", item.key(), callback, item);
-            return future;
+                logger.trace("Existing load result found for {} while loading 
for {}. ({})", item.key(), callback, item);
+            return result;
         }
 
         item = cache.getOrCreate(key);
@@ -97,40 +99,40 @@ public class AsyncLoader
             return null;
         }
 
-        future = readFunction.apply(item);
-        cache.setLoadFuture(item.key(), future);
+        result = readFunction.apply(item);
+        cache.setLoadResult(item.key(), result);
         if (logger.isTraceEnabled())
             logger.trace("Loading new item for {} while loading for {}. ({})", 
item.key(), callback, item);
-        return future;
+        return result;
     }
 
 
-    private <K, V extends AccordState<K>> List<Future<?>> 
referenceAndDispatchReads(Iterable<K> keys,
+    private <K, V extends AccordState<K>> List<AsyncChain<Void>> 
referenceAndDispatchReads(Iterable<K> keys,
                                                                                
            AccordStateCache.Instance<K, V> cache,
                                                                                
            Map<K, V> context,
-                                                                               
            Function<V, Future<?>> readFunction,
-                                                                               
            List<Future<?>> futures,
+                                                                               
            Function<V, AsyncResult<Void>> readFunction,
+                                                                               
            List<AsyncChain<Void>> results,
                                                                                
            Object callback)
     {
         for (K key : keys)
         {
-            Future<?> future = referenceAndDispatch(key, cache, context, 
readFunction, callback);
-            if (future == null)
+            AsyncResult<Void> result = referenceAndDispatch(key, cache, 
context, readFunction, callback);
+            if (result == null)
                 continue;
 
-            if (futures == null)
-                futures = new ArrayList<>();
+            if (results == null)
+                results = new ArrayList<>();
 
-            futures.add(future);
+            results.add(result);
         }
 
-        return futures;
+        return results;
     }
 
     @VisibleForTesting
-    Function<AccordCommand, Future<?>> loadCommandFunction(Object callback)
+    Function<AccordCommand, AsyncResult<Void>> loadCommandFunction(Object 
callback)
     {
-        return command -> Stage.READ.submit(() -> {
+        return command -> ofRunnable(Stage.READ.executor(), () -> {
             try
             {
                 logger.trace("Starting load of {} for {}", command.txnId(), 
callback);
@@ -146,9 +148,9 @@ public class AsyncLoader
     }
 
     @VisibleForTesting
-    Function<AccordCommandsForKey, Future<?>> 
loadCommandsPerKeyFunction(Object callback)
+    Function<AccordCommandsForKey, AsyncResult<Void>> 
loadCommandsPerKeyFunction(Object callback)
     {
-        return cfk -> Stage.READ.submit(() -> {
+        return cfk -> ofRunnable(Stage.READ.executor(), () -> {
             try
             {
                 logger.trace("Starting load of {} for {}", cfk.key(), 
callback);
@@ -163,25 +165,25 @@ public class AsyncLoader
         });
     }
 
-    private Future<?> referenceAndDispatchReads(AsyncContext context, Object 
callback)
+    private AsyncResult<Void> referenceAndDispatchReads(AsyncContext context, 
Object callback)
     {
-        List<Future<?>> futures = null;
+        List<AsyncChain<Void>> results = null;
 
-        futures = referenceAndDispatchReads(txnIds,
+        results = referenceAndDispatchReads(txnIds,
                                             commandStore.commandCache(),
                                             context.commands.items,
                                             loadCommandFunction(callback),
-                                            futures,
+                                            results,
                                             callback);
 
-        futures = referenceAndDispatchReads(keys,
+        results = referenceAndDispatchReads(keys,
                                             commandStore.commandsForKeyCache(),
                                             context.commandsForKey.items,
                                             
loadCommandsPerKeyFunction(callback),
-                                            futures,
+                                            results,
                                             callback);
 
-        return futures != null ? FutureCombiner.allOf(futures) : null;
+        return results != null ? AsyncResults.reduce(results, (a, b ) -> null).beginAsResult() : null;
     }
 
     @VisibleForTesting
@@ -202,28 +204,28 @@ public class AsyncLoader
                 // notify any pending write only groups we're loading a full 
instance so the pending changes aren't removed
                 
txnIds.forEach(commandStore.commandCache()::lockWriteOnlyGroupIfExists);
                 
keys.forEach(commandStore.commandsForKeyCache()::lockWriteOnlyGroupIfExists);
-                readFuture = referenceAndDispatchReads(context, callback);
+                readResult = referenceAndDispatchReads(context, callback);
                 state(State.LOADING);
             case LOADING:
-                if (readFuture != null)
+                if (readResult != null)
                 {
-                    if (readFuture.isSuccess())
+                    if (readResult.isSuccess())
                     {
-                        logger.trace("Read future succeeded for {}", callback);
+                        logger.trace("Read result succeeded for {}", callback);
                         context.verifyLoaded();
-                        readFuture = null;
+                        readResult = null;
                     }
                     else
                     {
-                        logger.trace("Adding callback for read future: {}", 
callback);
-                        readFuture.addCallback(callback, 
commandStore.executor());
+                        logger.trace("Adding callback for read result: {}", 
callback);
+                        readResult.addCallback(callback, 
commandStore.executor());
                         break;
                     }
                 }
                 // apply any pending write only changes that may not have made 
it to disk in time to be loaded
-                
context.commands.items.keySet().forEach(commandStore.commandCache()::cleanupLoadFuture);
+                
context.commands.items.keySet().forEach(commandStore.commandCache()::cleanupLoadResult);
                 
context.commands.items.values().forEach(commandStore.commandCache()::applyAndRemoveWriteOnlyGroup);
-                
context.commandsForKey.items.keySet().forEach(commandStore.commandsForKeyCache()::cleanupLoadFuture);
+                
context.commandsForKey.items.keySet().forEach(commandStore.commandsForKeyCache()::cleanupLoadResult);
                 
context.commandsForKey.items.values().forEach(commandStore.commandsForKeyCache()::applyAndRemoveWriteOnlyGroup);
                 // apply blindly reported timestamps
                 
context.commandsForKey.items.values().forEach(AccordCommandsForKey::applyBlindWitnessedTimestamps);
diff --git a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
index e302b42489..b4b490fcdd 100644
--- a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
+++ b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
@@ -23,6 +23,7 @@ import java.util.function.Consumer;
 import java.util.function.Function;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
@@ -32,12 +33,12 @@ import accord.local.PreLoadContext;
 import accord.local.SafeCommandStore;
 import accord.primitives.Seekables;
 import accord.primitives.TxnId;
+import accord.utils.async.AsyncChains;
 import org.apache.cassandra.service.accord.AccordCommandStore;
 import org.apache.cassandra.service.accord.AccordCommandStore.SafeAccordCommandStore;
 import org.apache.cassandra.service.accord.api.PartitionKey;
-import org.apache.cassandra.utils.concurrent.AsyncPromise;
 
-public abstract class AsyncOperation<R> extends AsyncPromise<R> implements Runnable, Function<SafeCommandStore, R>, BiConsumer<Object, Throwable>
+public abstract class AsyncOperation<R> extends AsyncChains.Head<R> implements Runnable, Function<SafeCommandStore, R>
 {
     private static final Logger logger = LoggerFactory.getLogger(AsyncOperation.class);
 
@@ -50,6 +51,7 @@ public abstract class AsyncOperation<R> extends AsyncPromise<R> implements Runna
     enum State
     {
         INITIALIZED,
+        SUBMITTED,
         LOADING,
         RUNNING,
         SAVING,
@@ -71,6 +73,7 @@ public abstract class AsyncOperation<R> extends AsyncPromise<R> implements Runna
     private final AsyncContext context = new AsyncContext();
     private R result;
     private final String loggingId;
+    private BiConsumer<? super R, Throwable> callback;
 
     private void setLoggingIds()
     {
@@ -123,22 +126,32 @@ public abstract class AsyncOperation<R> extends AsyncPromise<R> implements Runna
         this.state = state;
     }
 
-    /**
-     * callback for loader and writer
-     */
-    @Override
-    public void accept(Object o, Throwable throwable)
+    private void callback(Object o, Throwable throwable)
     {
         if (throwable != null)
         {
             logger.error(String.format("Operation %s failed", this), throwable);
             state = State.FAILED;
-            tryFailure(throwable);
+            fail(throwable);
         }
         else
             run();
     }
 
+    private void finish(R result)
+    {
+        Preconditions.checkArgument(state == State.COMPLETING);
+        callback.accept(result, null);
+        state = State.FINISHED;
+    }
+
+    private void fail(Throwable throwable)
+    {
+        Preconditions.checkArgument(state != State.FINISHED && state != State.FAILED);
+        callback.accept(null, throwable);
+        state = State.FAILED;
+    }
+
     protected void runInternal()
     {
         SafeAccordCommandStore safeStore = commandStore.safeStore(context);
@@ -147,7 +160,7 @@ public abstract class AsyncOperation<R> extends AsyncPromise<R> implements Runna
             case INITIALIZED:
                 state = State.LOADING;
             case LOADING:
-                if (!loader.load(context, this))
+                if (!loader.load(context, this::callback))
                     return;
 
                 state = State.RUNNING;
@@ -156,7 +169,7 @@ public abstract class AsyncOperation<R> extends 
AsyncPromise<R> implements Runna
                 state = State.SAVING;
             case SAVING:
             case AWAITING_SAVE:
-                boolean updatesPersisted = writer.save(context, this);
+                boolean updatesPersisted = writer.save(context, this::callback);
 
                 if (state != State.AWAITING_SAVE)
                 {
@@ -170,8 +183,7 @@ public abstract class AsyncOperation<R> extends AsyncPromise<R> implements Runna
                     return;
 
                 state = State.COMPLETING;
-                setSuccess(result);
-                state = State.FINISHED;
+                finish(result);
             case FINISHED:
                 break;
             default:
@@ -196,7 +208,7 @@ public abstract class AsyncOperation<R> extends AsyncPromise<R> implements Runna
             catch (Throwable t)
             {
                 logger.error(String.format("Operation %s failed", this), t);
-                tryFailure(t);
+                fail(t);
             }
             finally
             {
@@ -210,6 +222,14 @@ public abstract class AsyncOperation<R> extends AsyncPromise<R> implements Runna
         }
     }
 
+    @Override
+    public void begin(BiConsumer<? super R, Throwable> callback)
+    {
+        Preconditions.checkArgument(this.callback == null);
+        this.callback = callback;
+        commandStore.executor().submit(this);
+    }
+
     private static Iterable<PartitionKey> toPartitionKeys(Seekables<?, ?> keys)
     {
         switch (keys.domain())
diff --git a/src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java b/src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java
index c920a0f7bb..fb7f38687c 100644
--- a/src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java
+++ b/src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java
@@ -35,6 +35,9 @@ import org.slf4j.LoggerFactory;
 import accord.primitives.Seekable;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
+import accord.utils.async.AsyncChain;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.service.accord.AccordCommand;
@@ -46,8 +49,8 @@ import org.apache.cassandra.service.accord.AccordStateCache;
 import org.apache.cassandra.service.accord.AccordState;
 import org.apache.cassandra.service.accord.api.PartitionKey;
 import org.apache.cassandra.service.accord.store.StoredSet;
-import org.apache.cassandra.utils.concurrent.Future;
-import org.apache.cassandra.utils.concurrent.FutureCombiner;
+
+import static accord.utils.async.AsyncResults.ofRunnable;
 
 import static accord.primitives.Routable.Domain.Range;
 
@@ -64,7 +67,7 @@ public class AsyncWriter
     }
 
     private State state = State.INITIALIZED;
-    protected Future<?> writeFuture;
+    protected AsyncResult<Void> writeResult;
     private final AccordCommandStore commandStore;
     final AccordStateCache.Instance<TxnId, AccordCommand> commandCache;
     final AccordStateCache.Instance<PartitionKey, AccordCommandsForKey> 
cfkCache;
@@ -81,12 +84,12 @@ public class AsyncWriter
         Mutation apply(AccordCommandStore commandStore, V state, long 
timestamp);
     }
 
-    private static <K, V extends AccordState<K>> List<Future<?>> 
dispatchWrites(AsyncContext.Group<K, V> ctxGroup,
+    private static <K, V extends AccordState<K>> List<AsyncChain<Void>> 
dispatchWrites(AsyncContext.Group<K, V> ctxGroup,
                                                                                
 AccordStateCache.Instance<K, V> cache,
                                                                                
 StateMutationFunction<K, V> mutationFunction,
                                                                                
 long timestamp,
                                                                                
 AccordCommandStore commandStore,
-                                                                               
 List<Future<?>> futures,
+                                                                               
 List<AsyncChain<Void>> results,
                                                                                
 Object callback)
     {
         for (V item : ctxGroup.items.values())
@@ -98,13 +101,13 @@ public class AsyncWriter
                 continue;
             }
 
-            if (futures == null)
-                futures = new ArrayList<>();
+            if (results == null)
+                results = new ArrayList<>();
             K key = item.key();
             Mutation mutation = mutationFunction.apply(commandStore, item, 
timestamp);
             if (logger.isTraceEnabled())
                 logger.trace("Dispatching mutation for {} for {}, {} -> {}", 
key, callback, item, mutation);
-            Future<?> future = Stage.MUTATION.submit(() -> {
+            AsyncResult<Void> result = ofRunnable(Stage.MUTATION.executor(), 
() -> {
                 try
                 {
                     if (logger.isTraceEnabled())
@@ -119,46 +122,46 @@ public class AsyncWriter
                     throw t;
                 }
             });
-            cache.addSaveFuture(item.key(), future);
-            futures.add(future);
+            cache.addSaveResult(item.key(), result);
+            results.add(result);
         }
 
         for (AccordState.WriteOnly<K, V> item : ctxGroup.writeOnly.values())
         {
             Preconditions.checkState(item.hasModifications());
-            if (futures == null) futures = new ArrayList<>();
+            if (results == null) results = new ArrayList<>();
             Mutation mutation = mutationFunction.apply(commandStore, (V) item, 
timestamp);
-            Future<?> future = Stage.MUTATION.submit((Runnable) 
mutation::apply);
-            future.addListener(() -> cache.purgeWriteOnly(item.key()), 
commandStore.executor());
-            item.future(future);
-            futures.add(future);
+            AsyncResult<Void> result = 
AsyncResults.ofRunnable(Stage.MUTATION.executor(), mutation::apply);
+            result.addListener(() -> cache.purgeWriteOnly(item.key()), 
commandStore.executor());
+            item.asyncResult(result);
+            results.add(result);
         }
 
-        return futures;
+        return results;
     }
 
-    private Future<?> maybeDispatchWrites(AsyncContext context, Object 
callback) throws IOException
+    private AsyncResult<Void> maybeDispatchWrites(AsyncContext context, Object 
callback) throws IOException
     {
-        List<Future<?>> futures = null;
+        List<AsyncChain<Void>> results = null;
 
         long timestamp = commandStore.nextSystemTimestampMicros();
-        futures = dispatchWrites(context.commands,
+        results = dispatchWrites(context.commands,
                                  commandStore.commandCache(),
                                  AccordKeyspace::getCommandMutation,
                                  timestamp,
                                  commandStore,
-                                 futures,
+                                 results,
                                  callback);
 
-        futures = dispatchWrites(context.commandsForKey,
+        results = dispatchWrites(context.commandsForKey,
                                  commandStore.commandsForKeyCache(),
                                  AccordKeyspace::getCommandsForKeyMutation,
                                  timestamp,
                                  commandStore,
-                                 futures,
+                                 results,
                                  callback);
 
-        return futures != null ? FutureCombiner.allOf(futures) : null;
+        return results != null ? AsyncResults.reduce(results, (a, b) -> null).beginAsResult() : null;
     }
 
     private void denormalizeBlockedOn(AccordCommand command,
@@ -201,7 +204,7 @@ public class AsyncWriter
             return item;
 
         item = cache.getOrNull(key);
-        if (item != null && !cache.hasLoadFuture(key))
+        if (item != null && !cache.hasLoadResult(key))
         {
             ctxGroup.items.put(key, item);
             return item;
@@ -303,18 +306,18 @@ public class AsyncWriter
                     setState(State.SETUP);
                 case SETUP:
                     denormalize(context, callback);
-                    writeFuture = maybeDispatchWrites(context, callback);
+                    writeResult = maybeDispatchWrites(context, callback);
 
                     setState(State.SAVING);
                 case SAVING:
-                    if (writeFuture != null && !writeFuture.isSuccess())
+                    if (writeResult != null && !writeResult.isSuccess())
                     {
-                        logger.trace("Adding callback for write future: {}", 
callback);
-                        writeFuture.addCallback(callback, 
commandStore.executor());
+                        logger.trace("Adding callback for write result: {}", 
callback);
+                        writeResult.addCallback(callback, 
commandStore.executor());
                         break;
                     }
-                    
context.commands.items.keySet().forEach(commandStore.commandCache()::cleanupSaveFuture);
-                    
context.commandsForKey.items.keySet().forEach(commandStore.commandsForKeyCache()::cleanupSaveFuture);
+                    
context.commands.items.keySet().forEach(commandStore.commandCache()::cleanupSaveResult);
+                    
context.commandsForKey.items.keySet().forEach(commandStore.commandsForKeyCache()::cleanupSaveResult);
                     setState(State.FINISHED);
                 case FINISHED:
                     break;
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java b/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
index 534f4aa262..acab7c89f7 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
@@ -26,6 +26,8 @@ import java.util.concurrent.TimeUnit;
 import accord.api.Data;
 import accord.local.SafeCommandStore;
 import accord.primitives.Timestamp;
+import accord.utils.async.AsyncChain;
+import accord.utils.async.AsyncChains;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.ReadExecutionController;
@@ -40,7 +42,6 @@ import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.service.accord.api.PartitionKey;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.ObjectSizes;
-import org.apache.cassandra.utils.concurrent.Future;
 
 import static org.apache.cassandra.utils.ByteBufferUtil.readWithVIntLength;
 import static 
org.apache.cassandra.utils.ByteBufferUtil.serializedSizeWithVIntLength;
@@ -110,7 +111,7 @@ public class TxnNamedRead extends AbstractSerialized<ReadCommand>
         return key;
     }
 
-    public Future<Data> read(boolean isForWriteTxn, SafeCommandStore 
safeStore, Timestamp executeAt)
+    public AsyncChain<Data> read(boolean isForWriteTxn, SafeCommandStore 
safeStore, Timestamp executeAt)
     {
         SinglePartitionReadCommand command = (SinglePartitionReadCommand) 
get();
         // TODO (required, safety): before release, double check reasoning 
that this is safe
@@ -121,7 +122,7 @@ public class TxnNamedRead extends AbstractSerialized<ReadCommand>
         // immediately after the transaction executed, and this simplifies 
things a great deal
         int nowInSeconds = (int) 
TimeUnit.MICROSECONDS.toSeconds(executeAt.hlc());
 
-        return Stage.READ.submit(() ->
+        return AsyncChains.ofCallable(Stage.READ.executor(), () ->
         {
             SinglePartitionReadCommand read = 
command.withNowInSec(nowInSeconds);
 
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java b/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java
index eeb57f13f5..5b5812aed8 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.function.BiConsumer;
 
 import com.google.common.collect.ImmutableList;
 
@@ -36,21 +35,18 @@ import accord.primitives.Seekable;
 import accord.primitives.Timestamp;
 import accord.primitives.Txn;
 import org.apache.cassandra.db.SinglePartitionReadCommand;
+import accord.utils.async.AsyncChain;
+import accord.utils.async.AsyncChains;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.service.accord.api.PartitionKey;
 import org.apache.cassandra.service.accord.serializers.KeySerializers;
 import org.apache.cassandra.utils.ObjectSizes;
-import org.apache.cassandra.utils.Simulate;
-import org.apache.cassandra.utils.concurrent.AsyncPromise;
-import org.apache.cassandra.utils.concurrent.Future;
-import org.apache.cassandra.utils.concurrent.ImmediateFuture;
 
 import static org.apache.cassandra.utils.ArraySerializers.deserializeArray;
 import static org.apache.cassandra.utils.ArraySerializers.serializeArray;
 import static org.apache.cassandra.utils.ArraySerializers.serializedArraySize;
-import static org.apache.cassandra.utils.Simulate.With.MONITORS;
 
 public class TxnRead extends AbstractKeySorted<TxnNamedRead> implements Read
 {
@@ -143,51 +139,18 @@ public class TxnRead extends AbstractKeySorted<TxnNamedRead> implements Read
     }
 
     @Override
-    public Future<Data> read(Seekable key, Txn.Kind kind, SafeCommandStore 
safeStore, Timestamp executeAt, DataStore store)
+    public AsyncChain<Data> read(Seekable key, Txn.Kind kind, SafeCommandStore 
safeStore, Timestamp executeAt, DataStore store)
     {
-        List<Future<Data>> futures = new ArrayList<>();
-        forEachWithKey((PartitionKey) key, read -> 
futures.add(read.read(kind.isWrite(), safeStore, executeAt)));
+        List<AsyncChain<Data>> results = new ArrayList<>();
+        forEachWithKey((PartitionKey) key, read -> 
results.add(read.read(kind.isWrite(), safeStore, executeAt)));
 
-        if (futures.isEmpty())
-            return ImmediateFuture.success(new TxnData());
+        if (results.isEmpty())
+            return AsyncChains.success(new TxnData());
 
-        if (futures.size() == 1)
-            return futures.get(0);
+        if (results.size() == 1)
+            return results.get(0);
 
-        return new MultiReadFuture(futures);
-    }
-
-    @Simulate(with = MONITORS)
-    private static class MultiReadFuture extends AsyncPromise<Data> implements 
BiConsumer<Data, Throwable>
-    {
-        private Data result = null;
-        private int pending;
-
-        public MultiReadFuture(List<Future<Data>> futures)
-        {
-            pending = futures.size();
-            listen(futures);
-        }
-
-        private synchronized void listen(List<Future<Data>> futures)
-        {
-            for (int i=0, mi=futures.size(); i<mi; i++)
-                futures.get(i).addCallback(this);
-        }
-
-        @Override
-        public synchronized void accept(Data data, Throwable throwable)
-        {
-            if (isDone())
-                return;
-
-            if (throwable != null)
-                tryFailure(throwable);
-
-            result = result != null ? result.merge(data) : data;
-            if (--pending == 0)
-                trySuccess(result);
-        }
+        return AsyncChains.reduce(results, Data::merge);
     }
 
     public static final IVersionedSerializer<TxnRead> serializer = new 
IVersionedSerializer<TxnRead>()
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java b/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
index 94593ffc52..d6f3a71c8b 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
@@ -37,6 +37,8 @@ import accord.local.SafeCommandStore;
 import accord.primitives.Seekable;
 import accord.primitives.Timestamp;
 import accord.primitives.Writes;
+import accord.utils.async.AsyncChain;
+import accord.utils.async.AsyncChains;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.cql3.UpdateParameters;
 import org.apache.cassandra.db.Clustering;
@@ -56,7 +58,6 @@ import org.apache.cassandra.service.accord.AccordCommandsForKey;
 import org.apache.cassandra.service.accord.api.PartitionKey;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.ObjectSizes;
-import org.apache.cassandra.utils.concurrent.*;
 
 import static org.apache.cassandra.utils.ArraySerializers.deserializeArray;
 import static 
org.apache.cassandra.service.accord.AccordSerializers.partitionUpdateSerializer;
@@ -122,11 +123,11 @@ public class TxnWrite extends AbstractKeySorted<TxnWrite.Update> implements Writ
                    '}';
         }
 
-        public Future<?> write(long timestamp, int nowInSeconds)
+        public AsyncChain<Void> write(long timestamp, int nowInSeconds)
         {
             PartitionUpdate update = new PartitionUpdate.Builder(get(), 
0).updateAllTimestampAndLocalDeletionTime(timestamp, nowInSeconds).build();
             Mutation mutation = new Mutation(update);
-            return Stage.MUTATION.submit((Runnable) mutation::apply);
+            return AsyncChains.ofRunnable(Stage.MUTATION.executor(), 
mutation::apply);
         }
 
         @Override
@@ -342,7 +343,7 @@ public class TxnWrite extends AbstractKeySorted<TxnWrite.Update> implements Writ
     }
 
     @Override
-    public Future<Void> apply(Seekable key, SafeCommandStore safeStore, 
Timestamp executeAt, DataStore store)
+    public AsyncChain<Void> apply(Seekable key, SafeCommandStore safeStore, 
Timestamp executeAt, DataStore store)
     {
         AccordCommandsForKey cfk = ((SafeAccordCommandStore) 
safeStore).commandsForKey((Key)key);
         // TODO (expected, efficiency): 99.9999% of the time we can just use 
executeAt.hlc(), so can avoid bringing
@@ -352,16 +353,16 @@ public class TxnWrite extends AbstractKeySorted<TxnWrite.Update> implements Writ
         // TODO (low priority - do we need to compute nowInSeconds, or can we 
just use executeAt?)
         int nowInSeconds = cfk.nowInSecondsFor(executeAt, true);
 
-        List<Future<?>> futures = new ArrayList<>();
-        forEachWithKey((PartitionKey) key, write -> 
futures.add(write.write(timestamp, nowInSeconds)));
+        List<AsyncChain<Void>> results = new ArrayList<>();
+        forEachWithKey((PartitionKey) key, write -> 
results.add(write.write(timestamp, nowInSeconds)));
 
-        if (futures.isEmpty())
+        if (results.isEmpty())
             return Writes.SUCCESS;
 
-        if (futures.size() == 1)
-            return futures.get(0).flatMap(o -> Writes.SUCCESS);
+        if (results.size() == 1)
+            return results.get(0).flatMap(o -> Writes.SUCCESS);
 
-        return FutureCombiner.allOf(futures).flatMap(objects -> 
Writes.SUCCESS);
+        return AsyncChains.all(results).flatMap(objects -> Writes.SUCCESS);
     }
 
     public long estimatedSizeOnHeap()
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
index 6d89246593..02ccdc44ae 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.service.accord;
 
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.junit.Assert;
@@ -52,6 +51,7 @@ import org.apache.cassandra.service.accord.AccordCommandStore.SafeAccordCommandS
 import org.apache.cassandra.service.accord.api.PartitionKey;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
+import static accord.utils.async.AsyncChains.awaitUninterruptibly;
 import static 
org.apache.cassandra.cql3.statements.schema.CreateTableStatement.parse;
 import static org.apache.cassandra.service.accord.AccordTestUtils.*;
 
@@ -82,10 +82,10 @@ public class AccordCommandTest
      * disable cache and make sure correct values are coming in and out of the 
accord table
      */
     @Test
-    public void basicCycleTest() throws ExecutionException, 
InterruptedException
+    public void basicCycleTest()
     {
         AccordCommandStore commandStore = 
createAccordCommandStore(clock::incrementAndGet, "ks", "tbl");
-        commandStore.execute(PreLoadContext.empty(), instance -> { 
((SafeAccordCommandStore) instance).commandStore().setCacheSize(0); }).get();
+        awaitUninterruptibly(commandStore.execute(PreLoadContext.empty(), 
instance -> { ((SafeAccordCommandStore) 
instance).commandStore().setCacheSize(0); }));
 
 
         TxnId txnId = txnId(1, clock.incrementAndGet(), 1);
@@ -98,15 +98,15 @@ public class AccordCommandTest
         PreAccept preAccept = PreAccept.SerializerSupport.create(txnId, route, 
1, 1, false, 1, partialTxn, fullRoute);
 
         // Check preaccept
-        commandStore.execute(preAccept, instance -> {
+        awaitUninterruptibly(commandStore.execute(preAccept, instance -> {
             PreAccept.PreAcceptReply reply = preAccept.apply(instance);
             Assert.assertTrue(reply.isOk());
             PreAccept.PreAcceptOk ok = (PreAccept.PreAcceptOk) reply;
             Assert.assertEquals(txnId, ok.witnessedAt);
             Assert.assertTrue(ok.deps.isEmpty());
-        }).get();
+        }));
 
-        commandStore.execute(preAccept, instance -> {
+        awaitUninterruptibly(commandStore.execute(preAccept, instance -> {
             Command command = instance.command(txnId);
             Assert.assertEquals(txnId, command.executeAt());
             Assert.assertEquals(Status.PreAccepted, command.status());
@@ -116,7 +116,7 @@ public class AccordCommandTest
             Assert.assertEquals(txnId, cfk.max());
             Assert.assertNotNull((cfk.byId()).get(txnId));
             Assert.assertNotNull((cfk.byExecuteAt()).get(txnId));
-        }).get();
+        }));
 
         // check accept
         TxnId txnId2 = txnId(1, clock.incrementAndGet(), 1);
@@ -129,13 +129,13 @@ public class AccordCommandTest
         }
         Accept accept = Accept.SerializerSupport.create(txnId, route, 1, 1, 
false, Ballot.ZERO, executeAt, partialTxn.keys(), deps);
 
-        commandStore.execute(accept, instance -> {
+        awaitUninterruptibly(commandStore.execute(accept, instance -> {
             Accept.AcceptReply reply = accept.apply(instance);
             Assert.assertTrue(reply.isOk());
             Assert.assertTrue(reply.deps.isEmpty());
-        }).get();
+        }));
 
-        commandStore.execute(accept, instance -> {
+        awaitUninterruptibly(commandStore.execute(accept, instance -> {
             Command command = instance.command(txnId);
             Assert.assertEquals(executeAt, command.executeAt());
             Assert.assertEquals(Status.Accepted, command.status());
@@ -145,13 +145,13 @@ public class AccordCommandTest
             Assert.assertEquals(executeAt, cfk.max());
             Assert.assertNotNull((cfk.byId()).get(txnId));
             Assert.assertNotNull((cfk.byExecuteAt()).get(txnId));
-        }).get();
+        }));
 
         // check commit
         Commit commit = Commit.SerializerSupport.create(txnId, route, 1, 
executeAt, partialTxn, deps, fullRoute, null);
-        commandStore.execute(commit, commit::apply).get();
+        awaitUninterruptibly(commandStore.execute(commit, commit::apply));
 
-        commandStore.execute(PreLoadContext.contextFor(txnId, Keys.of(key)), 
instance -> {
+        
awaitUninterruptibly(commandStore.execute(PreLoadContext.contextFor(txnId, 
Keys.of(key)), instance -> {
             Command command = instance.command(txnId);
             Assert.assertEquals(commit.executeAt, command.executeAt());
             Assert.assertTrue(command.hasBeen(Status.Committed));
@@ -160,14 +160,14 @@ public class AccordCommandTest
             AccordCommandsForKey cfk = 
((SafeAccordCommandStore)instance).commandsForKey(key(1));
             Assert.assertNotNull((cfk.byId()).get(txnId));
             Assert.assertNotNull((cfk.byExecuteAt()).get(commit.executeAt));
-        }).get();
+        }));
     }
 
     @Test
     public void computeDeps() throws Throwable
     {
         AccordCommandStore commandStore = 
createAccordCommandStore(clock::incrementAndGet, "ks", "tbl");
-        commandStore.execute(PreLoadContext.empty(), instance -> { 
((SafeAccordCommandStore) instance).commandStore().setCacheSize(0); }).get();
+        awaitUninterruptibly(commandStore.execute(PreLoadContext.empty(), 
instance -> { ((SafeAccordCommandStore) 
instance).commandStore().setCacheSize(0); }));
 
         TxnId txnId1 = txnId(1, clock.incrementAndGet(), 1);
         Txn txn = createTxn(2);
@@ -178,16 +178,16 @@ public class AccordCommandTest
         PartialTxn partialTxn = txn.slice(route.covering(), true);
         PreAccept preAccept1 = PreAccept.SerializerSupport.create(txnId1, 
route, 1, 1, false, 1, partialTxn, fullRoute);
 
-        commandStore.execute(preAccept1, preAccept1::apply).get();
+        awaitUninterruptibly(commandStore.execute(preAccept1, 
preAccept1::apply));
 
         // second preaccept should identify txnId1 as a dependency
         TxnId txnId2 = txnId(1, clock.incrementAndGet(), 1);
         PreAccept preAccept2 = PreAccept.SerializerSupport.create(txnId2, 
route, 1, 1, false, 1, partialTxn, fullRoute);
-        commandStore.execute(preAccept2, instance -> {
+        awaitUninterruptibly(commandStore.execute(preAccept2, instance -> {
             PreAccept.PreAcceptReply reply = preAccept2.apply(instance);
             Assert.assertTrue(reply.isOk());
             PreAccept.PreAcceptOk ok = (PreAccept.PreAcceptOk) reply;
             Assert.assertTrue(ok.deps.contains(txnId1));
-        }).get();
+        }));
     }
 }
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordStateCacheTest.java b/test/unit/org/apache/cassandra/service/accord/AccordStateCacheTest.java
index 2cc2b3de18..a686afad7c 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordStateCacheTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordStateCacheTest.java
@@ -21,12 +21,13 @@ package org.apache.cassandra.service.accord;
 import java.util.HashSet;
 import java.util.Set;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
 import org.junit.Assert;
 import org.junit.Test;
 
-import org.apache.cassandra.utils.concurrent.AsyncPromise;
-import org.apache.cassandra.utils.concurrent.Future;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults;
 
 public class AccordStateCacheTest
 {
@@ -248,8 +249,8 @@ public class AccordStateCacheTest
 
         assertCacheState(cache, 0, 4, DEFAULT_NODE_SIZE * 4);
 
-        AsyncPromise<Void> saveFuture = new AsyncPromise<>();
-        instance.addSaveFuture(0, saveFuture);
+        AsyncResult<Void> saveFuture = AsyncResults.settable();
+        instance.addSaveResult(0, saveFuture);
         cache.setMaxSize(0);
 
         // all should have been evicted except 0
@@ -269,7 +270,7 @@ public class AccordStateCacheTest
 
         static class WriteOnly extends SetItem implements 
AccordState.WriteOnly<Integer, SetItem>
         {
-            AsyncPromise<?> promise = null;
+            AsyncResult.Settable<Void> promise = null;
             final Set<Integer> added = new HashSet<>();
             final Set<Integer> remove = new HashSet<>();
 
@@ -279,14 +280,14 @@ public class AccordStateCacheTest
             }
 
             @Override
-            public void future(Future<?> future)
+            public void asyncResult(AsyncResult<Void> notifier)
             {
-                Assert.assertTrue(future instanceof AsyncPromise);
-                this.promise = (AsyncPromise<?>) future;
+                Preconditions.checkArgument(notifier instanceof 
AsyncResult.Settable);
+                this.promise = (AsyncResult.Settable<Void>) notifier;
             }
 
             @Override
-            public Future<?> future()
+            public AsyncResult<Void> asyncResult()
             {
                 return promise;
             }
@@ -353,17 +354,17 @@ public class AccordStateCacheTest
 
         SetItem.WriteOnly writeOnly1 = new SetItem.WriteOnly(5);
         writeOnly1.added.addAll(ImmutableSet.of(4, 5));
-        writeOnly1.future(new AsyncPromise<>());
+        writeOnly1.asyncResult(AsyncResults.settable());
         instance.addWriteOnly(writeOnly1);
         Assert.assertEquals(1, instance.pendingWriteOnlyOperations(5));
 
         SetItem.WriteOnly writeOnly2 = new SetItem.WriteOnly(5);
         writeOnly2.remove.addAll(ImmutableSet.of(2, 4));
-        writeOnly2.future(new AsyncPromise<>());
+        writeOnly2.asyncResult(AsyncResults.settable());
         instance.addWriteOnly(writeOnly2);
         Assert.assertEquals(2, instance.pendingWriteOnlyOperations(5));
 
-        Assert.assertNull(instance.getSaveFuture(5));
+        Assert.assertNull(instance.getSaveResult(5));
         Assert.assertFalse(instance.writeOnlyGroupIsLocked(5));
 
         instance.lockWriteOnlyGroupIfExists(5);
@@ -377,7 +378,7 @@ public class AccordStateCacheTest
 
         // write only futures should have been merged and promoted to normal 
save futures, which would
         // prevent the cached object from being purged until they were 
completed
-        Future<?> saveFuture = instance.getSaveFuture(5);
+        AsyncResult<?> saveFuture = instance.getSaveResult(5);
         Assert.assertNotNull(saveFuture);
         Assert.assertFalse(saveFuture.isDone());
         Assert.assertFalse(instance.canEvict(5));
@@ -402,7 +403,7 @@ public class AccordStateCacheTest
         {
             SetItem.WriteOnly item = new SetItem.WriteOnly(5);
             item.added.add(i);
-            item.future(new AsyncPromise<>());
+            item.asyncResult(AsyncResults.settable());
             instance.addWriteOnly(item);
             writeOnly[i] = item;
         }
@@ -433,7 +434,7 @@ public class AccordStateCacheTest
 
         SetItem.WriteOnly item = new SetItem.WriteOnly(5);
         item.added.add(0);
-        item.future(new AsyncPromise<>());
+        item.asyncResult(AsyncResults.settable());
         instance.addWriteOnly(item);
 
         instance.lockWriteOnlyGroupIfExists(5);
@@ -452,8 +453,8 @@ public class AccordStateCacheTest
         AccordStateCache cache = new AccordStateCache(500);
         AccordStateCache.Instance<Integer, SetItem> instance = 
cache.instance(Integer.class, SetItem.class, SetItem::new);
 
-        AsyncPromise<?> loadfuture = new AsyncPromise<>();
-        instance.setLoadFuture(5, loadfuture);
+        AsyncResult<Void> loadfuture = AsyncResults.settable();
+        instance.setLoadResult(5, loadfuture);
 
         Assert.assertFalse(instance.writeOnlyGroupIsLocked(5));
         Assert.assertEquals(0, instance.pendingWriteOnlyOperations(5));
@@ -461,7 +462,7 @@ public class AccordStateCacheTest
         // adding a write only object should immediately lock the group, since 
there's an existing load future
         SetItem.WriteOnly item = new SetItem.WriteOnly(5);
         item.added.add(0);
-        item.future(new AsyncPromise<>());
+        item.asyncResult(AsyncResults.settable());
         instance.addWriteOnly(item);
 
         Assert.assertTrue(instance.writeOnlyGroupIsLocked(5));
@@ -474,12 +475,12 @@ public class AccordStateCacheTest
     {
         AccordStateCache cache = new AccordStateCache(500);
         AccordStateCache.Instance<Integer, SetItem> instance = 
cache.instance(Integer.class, SetItem.class, SetItem::new);
-        AsyncPromise<?> promise1 = new AsyncPromise<>();
-        AsyncPromise<?> promise2 = new AsyncPromise<>();
-        instance.addSaveFuture(5, promise1);
-        instance.addSaveFuture(5, promise2);
+        AsyncResult.Settable<Void> promise1 = AsyncResults.settable();
+        AsyncResult.Settable<Void> promise2 = AsyncResults.settable();
+        instance.addSaveResult(5, promise1);
+        instance.addSaveResult(5, promise2);
 
-        Future<?> future = instance.getSaveFuture(5);
+        AsyncResult<?> future = instance.getSaveResult(5);
         Assert.assertNotSame(future, promise1);
         Assert.assertNotSame(future, promise2);
 
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
index 531b513fa6..0c457ae067 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
@@ -54,6 +54,7 @@ import accord.primitives.Unseekables;
 import accord.primitives.Writes;
 import accord.topology.Shard;
 import accord.topology.Topology;
+import accord.utils.async.AsyncChains;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.statements.TransactionStatement;
@@ -68,6 +69,7 @@ import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
 
 import static accord.primitives.Routable.Domain.Key;
+import static accord.utils.async.AsyncChains.awaitUninterruptibly;
 import static java.lang.String.format;
 
 public class AccordTestUtils
@@ -113,7 +115,7 @@ public class AccordTestUtils
     public static void processCommandResult(AccordCommandStore commandStore, 
Command command) throws Throwable
     {
 
-        
commandStore.execute(PreLoadContext.contextFor(Collections.emptyList(), 
command.partialTxn().keys()),
+        
awaitUninterruptibly(commandStore.execute(PreLoadContext.contextFor(Collections.emptyList(),
 command.partialTxn().keys()),
                                        instance -> {
             PartialTxn txn = command.partialTxn();
             TxnRead read = (TxnRead) txn.read();
@@ -121,7 +123,7 @@ public class AccordTestUtils
                                 .map(key -> {
                                     try
                                     {
-                                        return read.read(key, 
command.txnId().rw(), instance, command.executeAt(), null).get();
+                                        return 
AsyncChains.getBlocking(read.read(key, command.txnId().rw(), instance, 
command.executeAt(), null));
                                     }
                                     catch (InterruptedException e)
                                     {
@@ -136,7 +138,7 @@ public class AccordTestUtils
             Write write = txn.update().apply(readData);
             ((AccordCommand)command).setWrites(new Writes(command.executeAt(), 
(Keys)txn.keys(), write));
             
((AccordCommand)command).setResult(txn.query().compute(command.txnId(), 
readData, txn.read(), txn.update()));
-        }).get();
+        }));
     }
 
     public static Txn createTxn(String query)
diff --git a/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java b/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java
index c5bc97984d..3cdfceda18 100644
--- a/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java
@@ -33,6 +33,8 @@ import org.junit.Test;
 import accord.local.Status;
 import accord.primitives.PartialTxn;
 import accord.primitives.TxnId;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.service.StorageService;
@@ -43,7 +45,6 @@ import org.apache.cassandra.service.accord.AccordKeyspace;
 import org.apache.cassandra.service.accord.AccordStateCache;
 import org.apache.cassandra.service.accord.api.PartitionKey;
 import org.apache.cassandra.utils.concurrent.AsyncPromise;
-import org.apache.cassandra.utils.concurrent.Future;
 
 import static com.google.common.collect.Iterables.getOnlyElement;
 import static java.util.Collections.singleton;
@@ -207,8 +208,8 @@ public class AsyncLoaderTest
         AsyncLoader loader = new AsyncLoader(commandStore, singleton(txnId), 
singleton(key));
 
         // since there's a read future associated with the txnId, we'll wait 
for it to load
-        AsyncPromise<Void> readFuture = new AsyncPromise<>();
-        commandCache.setLoadFuture(command.txnId(), readFuture);
+        AsyncResult.Settable<Void> readFuture = AsyncResults.settable();
+        commandCache.setLoadResult(command.txnId(), readFuture);
 
         AsyncPromise<Void> cbFired = new AsyncPromise<>();
         commandStore.executeBlocking(() -> {
@@ -254,12 +255,12 @@ public class AsyncLoaderTest
             AccordStateCache.Instance<TxnId, AccordCommand> cache = 
commandStore.commandCache();
             AccordCommand.WriteOnly writeOnly1 = new 
AccordCommand.WriteOnly(txnId);
             writeOnly1.blockingApplyOn.blindAdd(blockApply);
-            writeOnly1.future(new AsyncPromise<>());
+            writeOnly1.asyncResult(AsyncResults.settable());
             cache.addWriteOnly(writeOnly1);
 
             AccordCommand.WriteOnly writeOnly2 = new 
AccordCommand.WriteOnly(txnId);
             writeOnly2.blockingCommitOn.blindAdd(blockCommit);
-            writeOnly2.future(new AsyncPromise<>());
+            writeOnly2.asyncResult(AsyncResults.settable());
             cache.addWriteOnly(writeOnly2);
 
             AsyncContext context = new AsyncContext();
@@ -286,9 +287,9 @@ public class AsyncLoaderTest
         TxnId txnId1 = txnId(1, clock.incrementAndGet(), 1);
         TxnId txnId2 = txnId(1, clock.incrementAndGet(), 1);
 
-        AsyncPromise<Void> promise1 = new AsyncPromise<>();
-        AsyncPromise<Void> promise2 = new AsyncPromise<>();
-        AsyncPromise<Void> callback = new AsyncPromise<>();
+        AsyncResult.Settable<Void> promise1 = AsyncResults.settable();
+        AsyncResult.Settable<Void> promise2 = AsyncResults.settable();
+        AsyncResult.Settable<Void> callback = AsyncResults.settable();
         RuntimeException failure = new RuntimeException();
 
         execute(commandStore, () -> {
@@ -296,7 +297,7 @@ public class AsyncLoaderTest
             AtomicInteger loadCalls = new AtomicInteger();
             AsyncLoader loader = new AsyncLoader(commandStore, 
ImmutableList.of(txnId1, txnId2), Collections.emptyList()){
                 @Override
-                Function<AccordCommand, Future<?>> loadCommandFunction(Object 
callback)
+                Function<AccordCommand, AsyncResult<Void>> 
loadCommandFunction(Object callback)
                 {
                     return cmd -> {
                         TxnId txnId = cmd.txnId();
@@ -321,6 +322,6 @@ public class AsyncLoaderTest
         });
 
         promise1.tryFailure(failure);
-        callback.get();
+        AsyncResults.awaitUninterruptibly(callback);
     }
 }
diff --git a/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java b/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java
index 2a53d08ae3..2f3011128a 100644
--- a/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java
@@ -23,7 +23,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 
 import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.Futures;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -54,6 +53,7 @@ import org.apache.cassandra.service.accord.api.PartitionKey;
 import org.apache.cassandra.utils.FBUtilities;
 
 import static accord.local.PreLoadContext.contextFor;
+import static accord.utils.async.AsyncChains.awaitUninterruptibly;
 import static java.util.Collections.emptyList;
 import static java.util.Collections.singleton;
 import static 
org.apache.cassandra.cql3.statements.schema.CreateTableStatement.parse;
@@ -94,10 +94,10 @@ public class AsyncOperationTest
         Txn txn = createTxn((int)clock.incrementAndGet());
         PartitionKey key = (PartitionKey) Iterables.getOnlyElement(txn.keys());
 
-        commandStore.execute(contextFor(txnId), instance -> {
+        awaitUninterruptibly(commandStore.execute(contextFor(txnId), instance 
-> {
             Command command = instance.ifPresent(txnId);
             Assert.assertNull(command);
-        }).get();
+        }));
 
         UntypedResultSet result = AccordKeyspace.loadCommandRow(commandStore, 
txnId);
         Assert.assertTrue(result.isEmpty());
@@ -110,10 +110,10 @@ public class AsyncOperationTest
         Txn txn = createTxn((int)clock.incrementAndGet());
         PartitionKey key = (PartitionKey) Iterables.getOnlyElement(txn.keys());
 
-        commandStore.execute(contextFor(Collections.emptyList(), 
Keys.of(key)),instance -> {
+        
awaitUninterruptibly(commandStore.execute(contextFor(Collections.emptyList(), 
Keys.of(key)),instance -> {
             AccordCommandsForKey cfk = 
((SafeAccordCommandStore)instance).maybeCommandsForKey(key);
             Assert.assertNull(cfk);
-        }).get();
+        }));
 
         int nowInSeconds = FBUtilities.nowInSeconds();
         SinglePartitionReadCommand command = 
AccordKeyspace.getCommandsForKeyRead(commandStore, key, nowInSeconds);
@@ -142,10 +142,10 @@ public class AsyncOperationTest
 
     private static void assertFutureState(AccordStateCache.Instance<TxnId, 
AccordCommand> cache, TxnId txnId, boolean expectLoadFuture, boolean 
expectSaveFuture)
     {
-        if (cache.hasLoadFuture(txnId) != expectLoadFuture)
+        if (cache.hasLoadResult(txnId) != expectLoadFuture)
             throw new AssertionError(expectLoadFuture ? "Load future 
unexpectedly not found for " + txnId
                                                       : "Unexpectedly found 
load future for " + txnId);
-        if (cache.hasSaveFuture(txnId) != expectSaveFuture)
+        if (cache.hasSaveResult(txnId) != expectSaveFuture)
             throw new AssertionError(expectSaveFuture ? "Save future 
unexpectedly not found for " + txnId
                                                       : "Unexpectedly found 
save future for " + txnId);
 
@@ -220,6 +220,6 @@ public class AsyncOperationTest
 
         commandStore.executor().submit(operation);
 
-        Futures.getUnchecked(operation);
+        awaitUninterruptibly(operation);
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org


Reply via email to