This is an automated email from the ASF dual-hosted git repository. bdeggleston pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit fa10af834e750887be53c33d3eebb6d7d3606c1b Author: Blake Eggleston <bl...@ultrablake.com> AuthorDate: Fri Jan 27 09:41:59 2023 -0800 add AsyncChain implementations and tests Patch by Blake Eggleston; Reviewed by David Capwell & Benedict Elliott Smith for CASSANDRA-18004 --- .../cassandra/service/accord/AccordCommand.java | 74 +++--- .../service/accord/AccordCommandStore.java | 18 +- .../service/accord/AccordCommandsForKey.java | 14 +- .../cassandra/service/accord/AccordService.java | 7 +- .../cassandra/service/accord/AccordState.java | 6 +- .../cassandra/service/accord/AccordStateCache.java | 128 ++++----- .../cassandra/service/accord/ReadFuture.java | 292 --------------------- .../service/accord/async/AsyncLoader.java | 92 +++---- .../service/accord/async/AsyncOperation.java | 46 +++- .../service/accord/async/AsyncWriter.java | 63 ++--- .../cassandra/service/accord/txn/TxnNamedRead.java | 7 +- .../cassandra/service/accord/txn/TxnRead.java | 57 +--- .../cassandra/service/accord/txn/TxnWrite.java | 21 +- .../service/accord/AccordCommandTest.java | 36 +-- .../service/accord/AccordStateCacheTest.java | 47 ++-- .../cassandra/service/accord/AccordTestUtils.java | 8 +- .../service/accord/async/AsyncLoaderTest.java | 21 +- .../service/accord/async/AsyncOperationTest.java | 16 +- 18 files changed, 332 insertions(+), 621 deletions(-) diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommand.java b/src/java/org/apache/cassandra/service/accord/AccordCommand.java index 8020b29ac7..88b39922ce 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordCommand.java +++ b/src/java/org/apache/cassandra/service/accord/AccordCommand.java @@ -58,14 +58,15 @@ import accord.primitives.Writes; import accord.utils.DeterministicIdentitySet; import org.apache.cassandra.service.accord.AccordCommandStore.SafeAccordCommandStore; import org.apache.cassandra.service.accord.api.PartitionKey; +import accord.utils.async.AsyncChain; +import accord.utils.async.AsyncResult; +import accord.utils.async.AsyncResults; import org.apache.cassandra.service.accord.async.AsyncContext; import org.apache.cassandra.service.accord.store.StoredNavigableMap; import org.apache.cassandra.service.accord.store.StoredSet; import org.apache.cassandra.service.accord.store.StoredValue; import org.apache.cassandra.service.accord.txn.TxnData; import org.apache.cassandra.utils.ObjectSizes; -import org.apache.cassandra.utils.concurrent.AsyncPromise; -import org.apache.cassandra.utils.concurrent.Future; import static accord.local.Status.Durability.Local; import static accord.local.Status.Durability.NotDurable; @@ -83,7 +84,7 @@ public class AccordCommand extends Command implements AccordState<TxnId> public static class WriteOnly extends AccordCommand implements AccordState.WriteOnly<TxnId, AccordCommand> { - private Future<?> future = null; + private AsyncResult<Void> asyncResult = null; public WriteOnly(TxnId txnId) { @@ -91,16 +92,16 @@ public class AccordCommand extends Command implements AccordState<TxnId> } @Override - public void future(Future<?> future) + public void asyncResult(AsyncResult<Void> notifier) { - Preconditions.checkArgument(this.future == null); - this.future = future; + Preconditions.checkArgument(this.asyncResult == null); + this.asyncResult = notifier; } @Override - public Future<?> future() + public AsyncResult<Void> asyncResult() { - return future; + return asyncResult; } @Override @@ -618,7 +619,7 @@ public class AccordCommand extends Command implements AccordState<TxnId> protected void postApply(SafeCommandStore safeStore) { AccordStateCache.Instance<TxnId, AccordCommand> cache = ((SafeAccordCommandStore) safeStore).commandStore().commandCache(); - cache.cleanupWriteFuture(txnId); + cache.cleanupWriteResult(txnId); super.postApply(safeStore); } @@ -640,58 +641,69 @@ public class AccordCommand extends Command implements AccordState<TxnId> return true; } - private Future<Void> applyWithCorrectScope(CommandStore unsafeStore) + private AsyncResult<Void> applyWithCorrectScope(CommandStore unsafeStore) { TxnId txnId = txnId(); - AsyncPromise<Void> promise = new AsyncPromise<>(); + AsyncResult.Settable<Void> result = AsyncResults.settable(); unsafeStore.execute(this, safeStore -> { AccordCommand command = (AccordCommand) safeStore.command(txnId); - command.apply(safeStore, false).addCallback((v, throwable) -> { - if (throwable != null) - promise.tryFailure(throwable); - else - promise.trySuccess(null); - }); + command.applyChain(safeStore, false).begin(result.settingCallback()); + }).begin((unused, throwable) -> { + if (throwable != null) + result.tryFailure(throwable); }); - return promise; + return result; } - private Future<Void> apply(SafeCommandStore safeStore, boolean canReschedule) + private AsyncChain<Void> applyChain(SafeCommandStore safeStore, boolean canReschedule) { AccordStateCache.Instance<TxnId, AccordCommand> cache = ((SafeAccordCommandStore) safeStore).commandStore().commandCache(); - Future<Void> future = cache.getWriteFuture(txnId); - if (future != null) - return future; + AsyncResult<Void> writeResult = cache.getWriteResult(txnId); + if (writeResult != null) + return writeResult; // this can be called via a listener callback, in which case we won't // have the appropriate commandsForKey in scope, so start a new operation // with the correct scope and notify the caller when that completes if (!canApplyWithCurrentScope(safeStore)) { + return writeResult; + } + + if (canApplyWithCurrentScope(safeStore)) + { + AsyncChain<Void> chain = super.applyChain(safeStore); + writeResult = AsyncResults.forChain(chain); + } + else + { + // this can be called via a listener callback, in which case we won't + // have the appropriate commandsForKey in scope, so start a new operation + // with the correct scope and notify the caller when that completes Preconditions.checkArgument(canReschedule); return applyWithCorrectScope(safeStore.commandStore()); } + cache.setWriteResult(txnId, writeResult); - future = super.apply(safeStore); - cache.setWriteFuture(txnId, future); - return future; + return writeResult; } @Override - public Future<Void> apply(SafeCommandStore safeStore) + protected AsyncChain<Void> applyChain(SafeCommandStore safeStore) { - return apply(safeStore, true); + + return applyChain(safeStore, true); } @Override - public Future<Data> read(SafeCommandStore safeStore) + public AsyncChain<Data> read(SafeCommandStore safeStore) { AccordStateCache.Instance<TxnId, AccordCommand> cache = ((SafeAccordCommandStore) safeStore).commandStore().commandCache(); - Future<Data> future = cache.getReadFuture(txnId); + AsyncResult<Data> future = cache.getReadResult(txnId); if (future != null) return future; - future = super.read(safeStore); - cache.setReadFuture(txnId, future); + future = AsyncResults.forChain(super.read(safeStore)); + cache.setReadResult(txnId, future); return future; } diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java index 87c9c752d9..b7c6e9754e 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java +++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java @@ -54,10 +54,10 @@ import accord.primitives.AbstractKeys; import accord.primitives.TxnId; import accord.utils.Invariants; import org.apache.cassandra.service.accord.api.PartitionKey; +import accord.utils.async.AsyncChain; import org.apache.cassandra.service.accord.async.AsyncContext; import org.apache.cassandra.service.accord.async.AsyncOperation; import org.apache.cassandra.utils.Clock; -import org.apache.cassandra.utils.concurrent.Future; import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory; @@ -271,13 +271,13 @@ public class AccordCommandStore extends CommandStore } @Override - public Future<Void> execute(PreLoadContext context, Consumer<? super SafeCommandStore> consumer) + public AsyncChain<Void> execute(PreLoadContext context, Consumer<? super SafeCommandStore> consumer) { return AccordCommandStore.this.execute(context, consumer); } @Override - public <T> Future<T> submit(PreLoadContext context, Function<? super SafeCommandStore, T> function) + public <T> AsyncChain<T> submit(PreLoadContext context, Function<? super SafeCommandStore, T> function) { return AccordCommandStore.this.submit(context, function); } @@ -445,11 +445,9 @@ public class AccordCommandStore extends CommandStore } @Override - public <T> Future<T> submit(PreLoadContext loadCtx, Function<? super SafeCommandStore, T> function) + public <T> AsyncChain<T> submit(PreLoadContext loadCtx, Function<? super SafeCommandStore, T> function) { - AsyncOperation<T> operation = AsyncOperation.create(this, loadCtx, function); - executor.execute(operation); - return operation; + return AsyncOperation.create(this, loadCtx, function); } @Override @@ -459,11 +457,9 @@ public class AccordCommandStore extends CommandStore } @Override - public Future<Void> execute(PreLoadContext preLoadContext, Consumer<? super SafeCommandStore> consumer) + public AsyncChain<Void> execute(PreLoadContext preLoadContext, Consumer<? super SafeCommandStore> consumer) { - AsyncOperation<Void> operation = AsyncOperation.create(this, preLoadContext, consumer); - executor.execute(operation); - return operation; + return AsyncOperation.create(this, preLoadContext, consumer); } public void executeBlocking(Runnable runnable) diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandsForKey.java b/src/java/org/apache/cassandra/service/accord/AccordCommandsForKey.java index b93dc27763..6b3ba9856e 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordCommandsForKey.java +++ b/src/java/org/apache/cassandra/service/accord/AccordCommandsForKey.java @@ -43,13 +43,13 @@ import accord.local.Status; import accord.primitives.Timestamp; import accord.primitives.TxnId; import org.apache.cassandra.service.accord.api.PartitionKey; +import accord.utils.async.AsyncResult; import org.apache.cassandra.service.accord.store.StoredLong; import org.apache.cassandra.service.accord.store.StoredNavigableMap; import org.apache.cassandra.service.accord.store.StoredSet; import org.apache.cassandra.service.accord.store.StoredValue; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.ObjectSizes; -import org.apache.cassandra.utils.concurrent.Future; import org.assertj.core.util.VisibleForTesting; import static accord.local.SafeCommandStore.TestDep.ANY_DEPS; @@ -75,7 +75,7 @@ public class AccordCommandsForKey extends CommandsForKey implements AccordState< public static class WriteOnly extends AccordCommandsForKey implements AccordState.WriteOnly<PartitionKey, AccordCommandsForKey> { - private Future<?> future = null; + private AsyncResult<Void> result = null; public WriteOnly(AccordCommandStore commandStore, PartitionKey key) { @@ -83,17 +83,17 @@ public class AccordCommandsForKey extends CommandsForKey implements AccordState< } @Override - public void future(Future<?> future) + public void asyncResult(AsyncResult<Void> result) { - Preconditions.checkArgument(this.future == null); - this.future = future; + Preconditions.checkArgument(this.result == null); + this.result = result; } @Override - public Future<?> future() + public AsyncResult<Void> asyncResult() { - return future; + return result; } @Override diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java index c308afa2de..28f2d878c6 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordService.java +++ b/src/java/org/apache/cassandra/service/accord/AccordService.java @@ -37,6 +37,8 @@ import accord.messages.Request; import accord.primitives.Txn; import accord.topology.TopologyManager; import org.apache.cassandra.concurrent.Shutdownable; +import accord.utils.async.AsyncResult; +import accord.utils.async.AsyncResults; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.WriteType; @@ -51,7 +53,6 @@ import org.apache.cassandra.service.accord.txn.TxnData; import org.apache.cassandra.utils.Clock; import org.apache.cassandra.utils.ExecutorUtils; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.concurrent.Future; import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; import static org.apache.cassandra.config.DatabaseDescriptor.getConcurrentAccordOps; @@ -185,8 +186,8 @@ public class AccordService implements IAccordService, Shutdownable try { metrics.keySize.update(txn.keys().size()); - Future<Result> future = node.coordinate(txn); - Result result = future.get(DatabaseDescriptor.getTransactionTimeout(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS); + AsyncResult<Result> asyncResult = node.coordinate(txn); + Result result = AsyncResults.getBlocking(asyncResult, DatabaseDescriptor.getTransactionTimeout(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS); return (TxnData) result; } catch (ExecutionException e) diff --git a/src/java/org/apache/cassandra/service/accord/AccordState.java b/src/java/org/apache/cassandra/service/accord/AccordState.java index 2f5a9dc68c..0e1a224cd1 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordState.java +++ b/src/java/org/apache/cassandra/service/accord/AccordState.java @@ -21,9 +21,9 @@ package org.apache.cassandra.service.accord; import java.util.function.BiConsumer; import java.util.function.Function; +import accord.utils.async.AsyncResult; import org.apache.cassandra.service.accord.store.StoredNavigableMap; import org.apache.cassandra.service.accord.store.StoredSet; -import org.apache.cassandra.utils.concurrent.Future; public interface AccordState<K> { @@ -69,9 +69,9 @@ public interface AccordState<K> return ReadWrite.WRITE_ONLY; } - void future(Future<?> future); + void asyncResult(AsyncResult<Void> notifier); - Future<?> future(); + AsyncResult<Void> asyncResult(); /** * Apply the write only changes to the full instance diff --git a/src/java/org/apache/cassandra/service/accord/AccordStateCache.java b/src/java/org/apache/cassandra/service/accord/AccordStateCache.java index 37e7ba17cf..5993b73d22 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordStateCache.java +++ b/src/java/org/apache/cassandra/service/accord/AccordStateCache.java @@ -35,9 +35,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import accord.api.Data; +import accord.utils.async.AsyncResult; +import accord.utils.async.AsyncResults; import org.apache.cassandra.utils.ObjectSizes; -import org.apache.cassandra.utils.concurrent.Future; -import org.apache.cassandra.utils.concurrent.FutureCombiner; /** * Cache for AccordCommand and AccordCommandsForKey, available memory is shared between the two object types. @@ -85,7 +85,7 @@ public class AccordStateCache AccordState.WriteOnly<K, V> item = items.get(0); // we can't remove items out of order, so if we encounter a write is still pending, we stop - if (item.future() == null || !item.future().isDone()) + if (item.asyncResult() == null || !item.asyncResult().isDone()) break; items.remove(0); @@ -154,11 +154,11 @@ public class AccordStateCache private final Map<Object, WriteOnlyGroup<?, ?>> pendingWriteOnly = new HashMap<>(); private final Set<Instance<?, ?>> instances = new HashSet<>(); - private final NamedMap<Object, Future<?>> loadFutures = new NamedMap<>("loadFutures"); - private final NamedMap<Object, Future<?>> saveFutures = new NamedMap<>("saveFutures"); + private final NamedMap<Object, AsyncResult<Void>> loadResults = new NamedMap<>("loadResults"); + private final NamedMap<Object, AsyncResult<Void>> saveResults = new NamedMap<>("saveResults"); - private final NamedMap<Object, Future<Data>> readFutures = new NamedMap<>("readFutures"); - private final NamedMap<Object, Future<?>> writeFutures = new NamedMap<>("writeFutures"); + private final NamedMap<Object, AsyncResult<Data>> readResults = new NamedMap<>("readResults"); + private final NamedMap<Object, AsyncResult<Void>> writeResults = new NamedMap<>("writeResults"); Node<?, ?> head; Node<?, ?> tail; @@ -227,13 +227,13 @@ public class AccordStateCache bytesCached += node.estimatedSizeOnHeapDelta(); } - // don't evict if there's an outstanding save future. If an item is evicted then reloaded + // don't evict if there's an outstanding save result. If an item is evicted then reloaded // before it's mutation is applied, out of date info will be loaded private boolean canEvict(Object key) { - // getFuture only returns a future if it is running, so don't need to check if its still running - Future<?> future = getFuture(saveFutures, key); - return future == null; + // getResult only returns a result if it is running, so don't need to check if its still running + AsyncResult<?> result = getAsyncResult(saveResults, key); + return result == null || result.isDone(); } private void maybeEvict() @@ -248,7 +248,7 @@ public class AccordStateCache current = current.prev; // if there are any dangling write only groups, apply them and - // move their futures into write futures so we don't evict + // move their results into write results so we don't evict applyAndRemoveWriteOnlyGroup(evict.value); if (!canEvict(evict.key())) continue; @@ -260,9 +260,9 @@ public class AccordStateCache } } - private static <K, F extends Future<?>> F getFuture(NamedMap<Object, F> futuresMap, K key) + private static <K, F extends AsyncResult<?>> F getAsyncResult(NamedMap<Object, F> resultMap, K key) { - F r = futuresMap.get(key); + F r = resultMap.get(key); if (r == null) return null; @@ -270,36 +270,36 @@ public class AccordStateCache return r; if (logger.isTraceEnabled()) - logger.trace("Clearing future for {} from {}: {}", key, futuresMap.name, r); - futuresMap.remove(key); + logger.trace("Clearing result for {} from {}: {}", key, resultMap.name, r); + resultMap.remove(key); return null; } - private static <K, F extends Future<?>> void setFuture(Map<Object, F> futuresMap, K key, F future) + private static <K, F extends AsyncResult<?>> void setAsyncResult(Map<Object, F> resultsMap, K key, F result) { - Preconditions.checkState(!futuresMap.containsKey(key)); - futuresMap.put(key, future); + Preconditions.checkState(!resultsMap.containsKey(key)); + resultsMap.put(key, result); } - private static <K> void mergeFuture(Map<Object, Future<?>> futuresMap, K key, Future<?> future) + private static <K> void mergeAsyncResult(Map<Object, AsyncResult<Void>> resultMap, K key, AsyncResult<Void> result) { - Future<?> existing = futuresMap.get(key); + AsyncResult<Void> existing = resultMap.get(key); if (existing != null && !existing.isDone()) { - logger.trace("Merging future {} with existing {}", future, existing); - future = FutureCombiner.allOf(ImmutableList.of(existing, future)); + logger.trace("Merging result {} with existing {}", result, existing); + result = AsyncResults.reduce(ImmutableList.of(existing, result), (a, b) -> null).beginAsResult(); } - futuresMap.put(key, future); + resultMap.put(key, result); } - private <K> void maybeClearFuture(K key) + private <K> void maybeClearAsyncResult(K key) { // will clear if it's done - getFuture(loadFutures, key); - getFuture(saveFutures, key); - getFuture(readFutures, key); - getFuture(writeFutures, key); + getAsyncResult(loadResults, key); + getAsyncResult(saveResults, key); + getAsyncResult(readResults, key); + getAsyncResult(writeResults, key); } public <K, V extends AccordState<K>> void applyAndRemoveWriteOnlyGroup(V instance) @@ -312,8 +312,8 @@ public class AccordStateCache for (AccordState.WriteOnly<K, V> writeOnly : group.items) { writeOnly.applyChanges(instance); - if (!writeOnly.future().isDone()) - mergeFuture(saveFutures, instance.key(), writeOnly.future()); + if (!writeOnly.asyncResult().isDone()) + mergeAsyncResult(saveResults, instance.key(), writeOnly.asyncResult()); } } @@ -402,7 +402,7 @@ public class AccordStateCache { K key = value.key(); logger.trace("Releasing resources for {}: {}", key, value); - maybeClearFuture(key); + maybeClearAsyncResult(key); Node<K, V> node = (Node<K, V>) active.get(key); Preconditions.checkState(node != null && node.references > 0); Preconditions.checkState(node.value == value); @@ -462,12 +462,12 @@ public class AccordStateCache public void addWriteOnly(AccordState.WriteOnly<K, V> writeOnly) { K key = writeOnly.key(); - Preconditions.checkArgument(writeOnly.future() != null); + Preconditions.checkArgument(writeOnly.asyncResult() != null); WriteOnlyGroup<K, V> group = (WriteOnlyGroup<K, V>) pendingWriteOnly.computeIfAbsent(key, k -> new WriteOnlyGroup<>()); - // if a load future exists for the key we're creating a write group for, we need to lock + // if a load result exists for the key we're creating a write group for, we need to lock // the group so the loading instance gets changes applied when it finishes loading - if (getLoadFuture(key) != null) + if (getLoadResult(key) != null) group.lock(); group.add(writeOnly); @@ -495,77 +495,77 @@ public class AccordStateCache return group != null ? group.items.size() : 0; } - public Future<?> getLoadFuture(K key) + public AsyncResult<Void> getLoadResult(K key) { - return getFuture(loadFutures, key); + return getAsyncResult(loadResults, key); } - public void cleanupLoadFuture(K key) + public void cleanupLoadResult(K key) { - getLoadFuture(key); + getLoadResult(key); } @VisibleForTesting - public boolean hasLoadFuture(K key) + public boolean hasLoadResult(K key) { - return loadFutures.get(key) != null; + return loadResults.get(key) != null; } - public void setLoadFuture(K key, Future<?> future) + public void setLoadResult(K key, AsyncResult<Void> result) { - setFuture(loadFutures, key, future); + setAsyncResult(loadResults, key, result); } - public Future<?> getSaveFuture(K key) + public AsyncResult<?> getSaveResult(K key) { - return getFuture(saveFutures, key); + return getAsyncResult(saveResults, key); } - public void addSaveFuture(K key, Future<?> future) + public void addSaveResult(K key, AsyncResult<Void> result) { - logger.trace("Adding save future for {}: {}", key, future); - mergeFuture(saveFutures, key, future); + logger.trace("Adding save result for {}: {}", key, result); + mergeAsyncResult(saveResults, key, result); } - public void cleanupSaveFuture(K key) + public void cleanupSaveResult(K key) { - getSaveFuture(key); + getSaveResult(key); } @VisibleForTesting - public boolean hasSaveFuture(K key) + public boolean hasSaveResult(K key) { - return saveFutures.get(key) != null; + return saveResults.get(key) != null; } - public Future<Data> getReadFuture(K key) + public AsyncResult<Data> getReadResult(K key) { - return getFuture(readFutures, key); + return getAsyncResult(readResults, key); } - public void setReadFuture(K key, Future<Data> future) + public void setReadResult(K key, AsyncResult<Data> result) { - setFuture(readFutures, key, future); + setAsyncResult(readResults, key, result); } - public void cleanupReadFuture(K key) + public void cleanupReadResult(K key) { - getReadFuture(key); + getReadResult(key); } - public Future<Void> getWriteFuture(K key) + public AsyncResult<Void> getWriteResult(K key) { - return (Future<Void>) getFuture(writeFutures, key); + return getAsyncResult(writeResults, key); } - public void setWriteFuture(K key, Future<Void> future) + public void setWriteResult(K key, AsyncResult<Void> result) { - setFuture(writeFutures, key, future); + setAsyncResult(writeResults, key, result); } - public void cleanupWriteFuture(K key) + public void cleanupWriteResult(K key) { - getWriteFuture(key); + getWriteResult(key); } public long cacheQueries() diff --git a/src/java/org/apache/cassandra/service/accord/ReadFuture.java b/src/java/org/apache/cassandra/service/accord/ReadFuture.java deleted file mode 100644 index 11c1800383..0000000000 --- a/src/java/org/apache/cassandra/service/accord/ReadFuture.java +++ /dev/null @@ -1,292 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.service.accord; - -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.function.BiConsumer; -import java.util.function.Consumer; -import java.util.function.Function; - -import com.google.common.util.concurrent.FutureCallback; - -import accord.api.Data; -import io.netty.util.concurrent.GenericFutureListener; -import org.apache.cassandra.utils.concurrent.Future; -import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; - -public class ReadFuture implements Future<Data> -{ - private final Future<Data> wrappped; - - public ReadFuture(Future<Data> wrappped) - { - this.wrappped = wrappped; - } - - @Override - public Future<Data> await() throws InterruptedException - { - return wrappped.await(); - } - - @Override - public Future<Data> awaitUninterruptibly() - { - return wrappped.awaitUninterruptibly(); - } - - @Override - public Future<Data> awaitThrowUncheckedOnInterrupt() - { - return wrappped.awaitThrowUncheckedOnInterrupt(); - } - - @Override - public void rethrowIfFailed() - { - wrappped.rethrowIfFailed(); - } - - @Override - public Future<Data> sync() throws InterruptedException - { - return wrappped.sync(); - } - - @Override - public Future<Data> syncUninterruptibly() - { - return wrappped.syncUninterruptibly(); - } - - @Override - public Future<Data> syncThrowUncheckedOnInterrupt() - { - return wrappped.syncThrowUncheckedOnInterrupt(); - } - - @Override - @Deprecated - public boolean await(long l) throws InterruptedException - { - return wrappped.await(l); - } - - @Override - @Deprecated - public boolean awaitUninterruptibly(long l) - { - return wrappped.awaitUninterruptibly(l); - } - - @Override - public Future<Data> addCallback(BiConsumer<? super Data, Throwable> callback) - { - return wrappped.addCallback(callback); - } - - @Override - public Future<Data> addCallback(BiConsumer<? super Data, Throwable> callback, Executor executor) - { - return wrappped.addCallback(callback, executor); - } - - @Override - public Future<Data> addCallback(FutureCallback<? super Data> callback) - { - return wrappped.addCallback(callback); - } - - @Override - public Future<Data> addCallback(FutureCallback<? super Data> callback, Executor executor) - { - return wrappped.addCallback(callback, executor); - } - - @Override - public Future<Data> addCallback(Consumer<? super Data> onSuccess, Consumer<? super Throwable> onFailure) - { - return wrappped.addCallback(onSuccess, onFailure); - } - - @Override - public Future<Data> addCallback(Consumer<? super Data> onSuccess, Consumer<? super Throwable> onFailure, Executor executor) - { - return wrappped.addCallback(onSuccess, onFailure, executor); - } - - @Override - public <T> Future<T> map(Function<? super Data, ? extends T> mapper) - { - return wrappped.map(mapper); - } - - @Override - public <T> Future<T> map(Function<? super Data, ? extends T> mapper, Executor executor) - { - return wrappped.map(mapper, executor); - } - - @Override - public <T> Future<T> flatMap(Function<? super Data, ? extends Future<T>> flatMapper) - { - return wrappped.flatMap(flatMapper); - } - - @Override - public <T> Future<T> flatMap(Function<? super Data, ? extends Future<T>> flatMapper, Executor executor) - { - return wrappped.flatMap(flatMapper, executor); - } - - @Override - public void addListener(Runnable runnable, Executor executor) - { - wrappped.addListener(runnable, executor); - } - - @Override - public void addListener(Runnable runnable) - { - wrappped.addListener(runnable); - } - - @Override - public Executor notifyExecutor() - { - return wrappped.notifyExecutor(); - } - - @Override - public Future<Data> addListener(GenericFutureListener<? extends io.netty.util.concurrent.Future<? super Data>> genericFutureListener) - { - return wrappped.addListener(genericFutureListener); - } - - @Override - public Future<Data> addListeners(GenericFutureListener<? extends io.netty.util.concurrent.Future<? super Data>>... genericFutureListeners) - { - return wrappped.addListeners(genericFutureListeners); - } - - @Override - public Future<Data> removeListener(GenericFutureListener<? extends io.netty.util.concurrent.Future<? super Data>> genericFutureListener) - { - return wrappped.removeListener(genericFutureListener); - } - - @Override - public Future<Data> removeListeners(GenericFutureListener<? extends io.netty.util.concurrent.Future<? super Data>>... genericFutureListeners) - { - return wrappped.removeListeners(genericFutureListeners); - } - - @Override - public boolean isSuccess() - { - return wrappped.isSuccess(); - } - - @Override - public boolean isCancellable() - { - return wrappped.isCancellable(); - } - - @Override - public Throwable cause() - { - return wrappped.cause(); - } - - @Override - public boolean await(long timeout, TimeUnit unit) throws InterruptedException - { - return wrappped.await(timeout, unit); - } - - @Override - public boolean awaitUninterruptibly(long timeout, TimeUnit unit) - { - return wrappped.awaitUninterruptibly(timeout, unit); - } - - @Override - public Data getNow() - { - return wrappped.getNow(); - } - - @Override - public boolean cancel(boolean mayInterruptIfRunning) - { - return wrappped.cancel(mayInterruptIfRunning); - } - - @Override - public boolean isCancelled() - { - return wrappped.isCancelled(); - } - - @Override - public boolean isDone() - { - return wrappped.isDone(); - } - - @Override - public Data get() throws InterruptedException, ExecutionException - { - return wrappped.get(); - } - - @Override - public Data get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException - { - return wrappped.get(timeout, unit); - } - - @Override - public boolean awaitUntil(long nanoTimeDeadline) throws InterruptedException - { - return wrappped.awaitUntil(nanoTimeDeadline); - } - - @Override - public boolean awaitUntilThrowUncheckedOnInterrupt(long nanoTimeDeadline) throws UncheckedInterruptedException - { - return wrappped.awaitUntilThrowUncheckedOnInterrupt(nanoTimeDeadline); - } - - @Override - public boolean awaitUntilUninterruptibly(long nanoTimeDeadline) - { - return wrappped.awaitUntilUninterruptibly(nanoTimeDeadline); - } - - @Override - public boolean awaitThrowUncheckedOnInterrupt(long time, TimeUnit units) throws UncheckedInterruptedException - { - return wrappped.awaitThrowUncheckedOnInterrupt(time, units); - } -} diff --git a/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java b/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java index 3bbc9c2828..75c2d356a5 100644 --- a/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java +++ b/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java @@ -31,6 +31,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import accord.primitives.TxnId; +import accord.utils.async.AsyncChain; +import accord.utils.async.AsyncResult; +import accord.utils.async.AsyncResults; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.service.accord.AccordCommand; import org.apache.cassandra.service.accord.AccordCommandStore; @@ -39,9 +42,8 @@ import org.apache.cassandra.service.accord.AccordKeyspace; import org.apache.cassandra.service.accord.AccordStateCache; import org.apache.cassandra.service.accord.AccordState; import org.apache.cassandra.service.accord.api.PartitionKey; -import org.apache.cassandra.utils.concurrent.Future; -import org.apache.cassandra.utils.concurrent.FutureCombiner; +import static accord.utils.async.AsyncResults.ofRunnable; public class AsyncLoader { @@ -60,7 +62,7 @@ public class AsyncLoader private final Iterable<TxnId> txnIds; private final Iterable<PartitionKey> keys; - protected Future<?> readFuture; + protected AsyncResult<?> readResult; public AsyncLoader(AccordCommandStore commandStore, Iterable<TxnId> txnIds, Iterable<PartitionKey> keys) { @@ -69,23 +71,23 @@ public class AsyncLoader this.keys = keys; } - private <K, V extends AccordState<K>> Future<?> referenceAndDispatch(K key, - AccordStateCache.Instance<K, V> cache, - Map<K, V> context, - Function<V, Future<?>> readFunction, - Object callback) + private <K, V extends AccordState<K>> AsyncResult<Void> referenceAndDispatch(K key, + AccordStateCache.Instance<K, V> cache, + Map<K, V> context, + Function<V, AsyncResult<Void>> readFunction, + Object callback) { V item; - Future<?> future = cache.getLoadFuture(key); - if (future != null) + AsyncResult<Void> result = cache.getLoadResult(key); + if (result != null) { - // if a load future exists for this, it must be present in the cache + // if a load result exists for this, it must be present in the cache item = cache.getOrNull(key); Preconditions.checkState(item != null); context.put(key, item); if (logger.isTraceEnabled()) - logger.trace("Existing load future found for {} while loading for {}. ({})", item.key(), callback, item); - return future; + logger.trace("Existing load result found for {} while loading for {}. ({})", item.key(), callback, item); + return result; } item = cache.getOrCreate(key); @@ -97,40 +99,40 @@ public class AsyncLoader return null; } - future = readFunction.apply(item); - cache.setLoadFuture(item.key(), future); + result = readFunction.apply(item); + cache.setLoadResult(item.key(), result); if (logger.isTraceEnabled()) logger.trace("Loading new item for {} while loading for {}. ({})", item.key(), callback, item); - return future; + return result; } - private <K, V extends AccordState<K>> List<Future<?>> referenceAndDispatchReads(Iterable<K> keys, + private <K, V extends AccordState<K>> List<AsyncChain<Void>> referenceAndDispatchReads(Iterable<K> keys, AccordStateCache.Instance<K, V> cache, Map<K, V> context, - Function<V, Future<?>> readFunction, - List<Future<?>> futures, + Function<V, AsyncResult<Void>> readFunction, + List<AsyncChain<Void>> results, Object callback) { for (K key : keys) { - Future<?> future = referenceAndDispatch(key, cache, context, readFunction, callback); - if (future == null) + AsyncResult<Void> result = referenceAndDispatch(key, cache, context, readFunction, callback); + if (result == null) continue; - if (futures == null) - futures = new ArrayList<>(); + if (results == null) + results = new ArrayList<>(); - futures.add(future); + results.add(result); } - return futures; + return results; } @VisibleForTesting - Function<AccordCommand, Future<?>> loadCommandFunction(Object callback) + Function<AccordCommand, AsyncResult<Void>> loadCommandFunction(Object callback) { - return command -> Stage.READ.submit(() -> { + return command -> ofRunnable(Stage.READ.executor(), () -> { try { logger.trace("Starting load of {} for {}", command.txnId(), callback); @@ -146,9 +148,9 @@ public class AsyncLoader } @VisibleForTesting - Function<AccordCommandsForKey, Future<?>> loadCommandsPerKeyFunction(Object callback) + Function<AccordCommandsForKey, AsyncResult<Void>> loadCommandsPerKeyFunction(Object callback) { - return cfk -> Stage.READ.submit(() -> { + return cfk -> ofRunnable(Stage.READ.executor(), () -> { try { logger.trace("Starting load of {} for {}", cfk.key(), callback); @@ -163,25 +165,25 @@ public class AsyncLoader }); } - private Future<?> referenceAndDispatchReads(AsyncContext context, Object callback) + private AsyncResult<Void> referenceAndDispatchReads(AsyncContext context, Object callback) { - List<Future<?>> futures = null; + List<AsyncChain<Void>> results = null; - futures = referenceAndDispatchReads(txnIds, + results = referenceAndDispatchReads(txnIds, commandStore.commandCache(), context.commands.items, loadCommandFunction(callback), - futures, + results, callback); - futures = referenceAndDispatchReads(keys, + results = referenceAndDispatchReads(keys, commandStore.commandsForKeyCache(), context.commandsForKey.items, loadCommandsPerKeyFunction(callback), - futures, + results, callback); - return futures != null ? FutureCombiner.allOf(futures) : null; + return results != null ? AsyncResults.reduce(results, (a, b ) -> null).beginAsResult() : null; } @VisibleForTesting @@ -202,28 +204,28 @@ public class AsyncLoader // notify any pending write only groups we're loading a full instance so the pending changes aren't removed txnIds.forEach(commandStore.commandCache()::lockWriteOnlyGroupIfExists); keys.forEach(commandStore.commandsForKeyCache()::lockWriteOnlyGroupIfExists); - readFuture = referenceAndDispatchReads(context, callback); + readResult = referenceAndDispatchReads(context, callback); state(State.LOADING); case LOADING: - if (readFuture != null) + if (readResult != null) { - if (readFuture.isSuccess()) + if (readResult.isSuccess()) { - logger.trace("Read future succeeded for {}", callback); + logger.trace("Read result succeeded for {}", callback); context.verifyLoaded(); - readFuture = null; + readResult = null; } else { - logger.trace("Adding callback for read future: {}", callback); - readFuture.addCallback(callback, commandStore.executor()); + logger.trace("Adding callback for read result: {}", callback); + readResult.addCallback(callback, commandStore.executor()); break; } } // apply any pending write only changes that may not have made it to disk in time to be loaded - context.commands.items.keySet().forEach(commandStore.commandCache()::cleanupLoadFuture); + context.commands.items.keySet().forEach(commandStore.commandCache()::cleanupLoadResult); context.commands.items.values().forEach(commandStore.commandCache()::applyAndRemoveWriteOnlyGroup); - context.commandsForKey.items.keySet().forEach(commandStore.commandsForKeyCache()::cleanupLoadFuture); + context.commandsForKey.items.keySet().forEach(commandStore.commandsForKeyCache()::cleanupLoadResult); context.commandsForKey.items.values().forEach(commandStore.commandsForKeyCache()::applyAndRemoveWriteOnlyGroup); // apply blindly reported timestamps context.commandsForKey.items.values().forEach(AccordCommandsForKey::applyBlindWitnessedTimestamps); diff --git a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java index e302b42489..b4b490fcdd 100644 --- a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java +++ b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java @@ -23,6 +23,7 @@ import java.util.function.Consumer; import java.util.function.Function; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; @@ -32,12 +33,12 @@ import accord.local.PreLoadContext; import accord.local.SafeCommandStore; import accord.primitives.Seekables; import accord.primitives.TxnId; +import accord.utils.async.AsyncChains; import org.apache.cassandra.service.accord.AccordCommandStore; import org.apache.cassandra.service.accord.AccordCommandStore.SafeAccordCommandStore; import org.apache.cassandra.service.accord.api.PartitionKey; -import org.apache.cassandra.utils.concurrent.AsyncPromise; -public abstract class AsyncOperation<R> extends AsyncPromise<R> implements Runnable, Function<SafeCommandStore, R>, BiConsumer<Object, Throwable> +public abstract class AsyncOperation<R> extends AsyncChains.Head<R> implements Runnable, Function<SafeCommandStore, R> { private static final Logger logger = LoggerFactory.getLogger(AsyncOperation.class); @@ -50,6 +51,7 @@ public abstract class AsyncOperation<R> extends AsyncPromise<R> implements Runna enum State { INITIALIZED, + SUBMITTED, LOADING, RUNNING, SAVING, @@ -71,6 +73,7 @@ public abstract class AsyncOperation<R> extends AsyncPromise<R> implements Runna private final AsyncContext context = new AsyncContext(); private R result; private final String loggingId; + private BiConsumer<? super R, Throwable> callback; private void setLoggingIds() { @@ -123,22 +126,32 @@ public abstract class AsyncOperation<R> extends AsyncPromise<R> implements Runna this.state = state; } - /** - * callback for loader and writer - */ - @Override - public void accept(Object o, Throwable throwable) + private void callback(Object o, Throwable throwable) { if (throwable != null) { logger.error(String.format("Operation %s failed", this), throwable); state = State.FAILED; - tryFailure(throwable); + fail(throwable); } else run(); } + private void finish(R result) + { + Preconditions.checkArgument(state == State.COMPLETING); + callback.accept(result, null); + state = State.FINISHED; + } + + private void fail(Throwable throwable) + { + Preconditions.checkArgument(state != State.FINISHED && state != State.FAILED); + callback.accept(null, throwable); + state = State.FAILED; + } + protected void runInternal() { SafeAccordCommandStore safeStore = commandStore.safeStore(context); @@ -147,7 +160,7 @@ public abstract class AsyncOperation<R> extends AsyncPromise<R> implements Runna case INITIALIZED: state = State.LOADING; case LOADING: - if (!loader.load(context, this)) + if (!loader.load(context, this::callback)) return; state = State.RUNNING; @@ -156,7 +169,7 @@ public abstract class AsyncOperation<R> extends AsyncPromise<R> implements Runna state = State.SAVING; case SAVING: case AWAITING_SAVE: - boolean updatesPersisted = writer.save(context, this); + boolean updatesPersisted = writer.save(context, this::callback); if (state != State.AWAITING_SAVE) { @@ -170,8 +183,7 @@ public abstract class AsyncOperation<R> extends AsyncPromise<R> implements Runna return; state = State.COMPLETING; - setSuccess(result); - state = State.FINISHED; + finish(result); case FINISHED: break; default: @@ -196,7 +208,7 @@ public abstract class AsyncOperation<R> extends AsyncPromise<R> implements Runna catch (Throwable t) { logger.error(String.format("Operation %s failed", this), t); - tryFailure(t); + fail(t); } finally { @@ -210,6 +222,14 @@ public abstract class AsyncOperation<R> extends AsyncPromise<R> implements Runna } } + @Override + public void begin(BiConsumer<? super R, Throwable> callback) + { + Preconditions.checkArgument(this.callback == null); + this.callback = callback; + commandStore.executor().submit(this); + } + private static Iterable<PartitionKey> toPartitionKeys(Seekables<?, ?> keys) { switch (keys.domain()) diff --git a/src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java b/src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java index c920a0f7bb..fb7f38687c 100644 --- a/src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java +++ b/src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java @@ -35,6 +35,9 @@ import org.slf4j.LoggerFactory; import accord.primitives.Seekable; import accord.primitives.Timestamp; import accord.primitives.TxnId; +import accord.utils.async.AsyncChain; +import accord.utils.async.AsyncResult; +import accord.utils.async.AsyncResults; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.service.accord.AccordCommand; @@ -46,8 +49,8 @@ import org.apache.cassandra.service.accord.AccordStateCache; import org.apache.cassandra.service.accord.AccordState; import org.apache.cassandra.service.accord.api.PartitionKey; import org.apache.cassandra.service.accord.store.StoredSet; -import org.apache.cassandra.utils.concurrent.Future; -import org.apache.cassandra.utils.concurrent.FutureCombiner; + +import static accord.utils.async.AsyncResults.ofRunnable; import static accord.primitives.Routable.Domain.Range; @@ -64,7 +67,7 @@ public class AsyncWriter } private State state = State.INITIALIZED; - protected Future<?> writeFuture; + protected AsyncResult<Void> writeResult; private final AccordCommandStore commandStore; final AccordStateCache.Instance<TxnId, AccordCommand> commandCache; final AccordStateCache.Instance<PartitionKey, AccordCommandsForKey> cfkCache; @@ -81,12 +84,12 @@ public class AsyncWriter Mutation apply(AccordCommandStore commandStore, V state, long timestamp); } - private static <K, V extends AccordState<K>> List<Future<?>> dispatchWrites(AsyncContext.Group<K, V> ctxGroup, + private static <K, V extends AccordState<K>> List<AsyncChain<Void>> dispatchWrites(AsyncContext.Group<K, V> ctxGroup, AccordStateCache.Instance<K, V> cache, StateMutationFunction<K, V> mutationFunction, long timestamp, AccordCommandStore commandStore, - List<Future<?>> futures, + List<AsyncChain<Void>> results, Object callback) { for (V item : ctxGroup.items.values()) @@ -98,13 +101,13 @@ public class AsyncWriter continue; } - if (futures == null) - futures = new ArrayList<>(); + if (results == null) + results = new ArrayList<>(); K key = item.key(); Mutation mutation = mutationFunction.apply(commandStore, item, timestamp); if (logger.isTraceEnabled()) logger.trace("Dispatching mutation for {} for {}, {} -> {}", key, callback, item, mutation); - Future<?> future = Stage.MUTATION.submit(() -> { + AsyncResult<Void> result = ofRunnable(Stage.MUTATION.executor(), () -> { try { if (logger.isTraceEnabled()) @@ -119,46 +122,46 @@ public class AsyncWriter throw t; } }); - cache.addSaveFuture(item.key(), future); - futures.add(future); + cache.addSaveResult(item.key(), result); + results.add(result); } for (AccordState.WriteOnly<K, V> item : ctxGroup.writeOnly.values()) { Preconditions.checkState(item.hasModifications()); - if (futures == null) futures = new ArrayList<>(); + if (results == null) results = new ArrayList<>(); Mutation mutation = mutationFunction.apply(commandStore, (V) item, timestamp); - Future<?> future = Stage.MUTATION.submit((Runnable) mutation::apply); - future.addListener(() -> cache.purgeWriteOnly(item.key()), commandStore.executor()); - item.future(future); - futures.add(future); + AsyncResult<Void> result = AsyncResults.ofRunnable(Stage.MUTATION.executor(), mutation::apply); + result.addListener(() -> cache.purgeWriteOnly(item.key()), commandStore.executor()); + item.asyncResult(result); + results.add(result); } - return futures; + return results; } - private Future<?> maybeDispatchWrites(AsyncContext context, Object callback) throws IOException + private AsyncResult<Void> maybeDispatchWrites(AsyncContext context, Object callback) throws IOException { - List<Future<?>> futures = null; + List<AsyncChain<Void>> results = null; long timestamp = commandStore.nextSystemTimestampMicros(); - futures = dispatchWrites(context.commands, + results = dispatchWrites(context.commands, commandStore.commandCache(), AccordKeyspace::getCommandMutation, timestamp, commandStore, - futures, + results, callback); - futures = dispatchWrites(context.commandsForKey, + results = dispatchWrites(context.commandsForKey, commandStore.commandsForKeyCache(), AccordKeyspace::getCommandsForKeyMutation, timestamp, commandStore, - futures, + results, callback); - return futures != null ? FutureCombiner.allOf(futures) : null; + return results != null ? AsyncResults.reduce(results, (a, b) -> null).beginAsResult() : null; } private void denormalizeBlockedOn(AccordCommand command, @@ -201,7 +204,7 @@ public class AsyncWriter return item; item = cache.getOrNull(key); - if (item != null && !cache.hasLoadFuture(key)) + if (item != null && !cache.hasLoadResult(key)) { ctxGroup.items.put(key, item); return item; @@ -303,18 +306,18 @@ public class AsyncWriter setState(State.SETUP); case SETUP: denormalize(context, callback); - writeFuture = maybeDispatchWrites(context, callback); + writeResult = maybeDispatchWrites(context, callback); setState(State.SAVING); case SAVING: - if (writeFuture != null && !writeFuture.isSuccess()) + if (writeResult != null && !writeResult.isSuccess()) { - logger.trace("Adding callback for write future: {}", callback); - writeFuture.addCallback(callback, commandStore.executor()); + logger.trace("Adding callback for write result: {}", callback); + writeResult.addCallback(callback, commandStore.executor()); break; } - context.commands.items.keySet().forEach(commandStore.commandCache()::cleanupSaveFuture); - context.commandsForKey.items.keySet().forEach(commandStore.commandsForKeyCache()::cleanupSaveFuture); + context.commands.items.keySet().forEach(commandStore.commandCache()::cleanupSaveResult); + context.commandsForKey.items.keySet().forEach(commandStore.commandsForKeyCache()::cleanupSaveResult); setState(State.FINISHED); case FINISHED: break; diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java b/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java index 534f4aa262..acab7c89f7 100644 --- a/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java +++ b/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java @@ -26,6 +26,8 @@ import java.util.concurrent.TimeUnit; import accord.api.Data; import accord.local.SafeCommandStore; import accord.primitives.Timestamp; +import accord.utils.async.AsyncChain; +import accord.utils.async.AsyncChains; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.db.ReadCommand; import org.apache.cassandra.db.ReadExecutionController; @@ -40,7 +42,6 @@ import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.service.accord.api.PartitionKey; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.ObjectSizes; -import org.apache.cassandra.utils.concurrent.Future; import static org.apache.cassandra.utils.ByteBufferUtil.readWithVIntLength; import static org.apache.cassandra.utils.ByteBufferUtil.serializedSizeWithVIntLength; @@ -110,7 +111,7 @@ public class TxnNamedRead extends AbstractSerialized<ReadCommand> return key; } - public Future<Data> read(boolean isForWriteTxn, SafeCommandStore safeStore, Timestamp executeAt) + public AsyncChain<Data> read(boolean isForWriteTxn, SafeCommandStore safeStore, Timestamp executeAt) { SinglePartitionReadCommand command = (SinglePartitionReadCommand) get(); // TODO (required, safety): before release, double check reasoning that this is safe @@ -121,7 +122,7 @@ public class TxnNamedRead extends AbstractSerialized<ReadCommand> // immediately after the transaction executed, and this simplifies things a great deal int nowInSeconds = (int) TimeUnit.MICROSECONDS.toSeconds(executeAt.hlc()); - return Stage.READ.submit(() -> + return AsyncChains.ofCallable(Stage.READ.executor(), () -> { SinglePartitionReadCommand read = command.withNowInSec(nowInSeconds); diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java b/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java index eeb57f13f5..5b5812aed8 100644 --- a/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java +++ b/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.function.BiConsumer; import com.google.common.collect.ImmutableList; @@ -36,21 +35,18 @@ import accord.primitives.Seekable; import accord.primitives.Timestamp; import accord.primitives.Txn; import org.apache.cassandra.db.SinglePartitionReadCommand; +import accord.utils.async.AsyncChain; +import accord.utils.async.AsyncChains; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.service.accord.api.PartitionKey; import org.apache.cassandra.service.accord.serializers.KeySerializers; import org.apache.cassandra.utils.ObjectSizes; -import org.apache.cassandra.utils.Simulate; -import org.apache.cassandra.utils.concurrent.AsyncPromise; -import org.apache.cassandra.utils.concurrent.Future; -import org.apache.cassandra.utils.concurrent.ImmediateFuture; import static org.apache.cassandra.utils.ArraySerializers.deserializeArray; import static org.apache.cassandra.utils.ArraySerializers.serializeArray; import static org.apache.cassandra.utils.ArraySerializers.serializedArraySize; -import static org.apache.cassandra.utils.Simulate.With.MONITORS; public class TxnRead extends AbstractKeySorted<TxnNamedRead> implements Read { @@ -143,51 +139,18 @@ public class TxnRead extends AbstractKeySorted<TxnNamedRead> implements Read } @Override - public Future<Data> read(Seekable key, Txn.Kind kind, SafeCommandStore safeStore, Timestamp executeAt, DataStore store) + public AsyncChain<Data> read(Seekable key, Txn.Kind kind, SafeCommandStore safeStore, Timestamp executeAt, DataStore store) { - List<Future<Data>> futures = new ArrayList<>(); - forEachWithKey((PartitionKey) key, read -> futures.add(read.read(kind.isWrite(), safeStore, executeAt))); + List<AsyncChain<Data>> results = new ArrayList<>(); + forEachWithKey((PartitionKey) key, read -> results.add(read.read(kind.isWrite(), safeStore, executeAt))); - if (futures.isEmpty()) - return ImmediateFuture.success(new TxnData()); + if (results.isEmpty()) + return AsyncChains.success(new TxnData()); - if (futures.size() == 1) - return futures.get(0); + if (results.size() == 1) + return results.get(0); - return new MultiReadFuture(futures); - } - - @Simulate(with = MONITORS) - private static class MultiReadFuture extends AsyncPromise<Data> implements BiConsumer<Data, Throwable> - { - private Data result = null; - private int pending; - - public MultiReadFuture(List<Future<Data>> futures) - { - pending = futures.size(); - listen(futures); - } - - private synchronized void listen(List<Future<Data>> futures) - { - for (int i=0, mi=futures.size(); i<mi; i++) - futures.get(i).addCallback(this); - } - - @Override - public synchronized void accept(Data data, Throwable throwable) - { - if (isDone()) - return; - - if (throwable != null) - tryFailure(throwable); - - result = result != null ? result.merge(data) : data; - if (--pending == 0) - trySuccess(result); - } + return AsyncChains.reduce(results, Data::merge); } public static final IVersionedSerializer<TxnRead> serializer = new IVersionedSerializer<TxnRead>() diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java b/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java index 94593ffc52..d6f3a71c8b 100644 --- a/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java +++ b/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java @@ -37,6 +37,8 @@ import accord.local.SafeCommandStore; import accord.primitives.Seekable; import accord.primitives.Timestamp; import accord.primitives.Writes; +import accord.utils.async.AsyncChain; +import accord.utils.async.AsyncChains; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.cql3.UpdateParameters; import org.apache.cassandra.db.Clustering; @@ -56,7 +58,6 @@ import org.apache.cassandra.service.accord.AccordCommandsForKey; import org.apache.cassandra.service.accord.api.PartitionKey; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.ObjectSizes; -import org.apache.cassandra.utils.concurrent.*; import static org.apache.cassandra.utils.ArraySerializers.deserializeArray; import static org.apache.cassandra.service.accord.AccordSerializers.partitionUpdateSerializer; @@ -122,11 +123,11 @@ public class TxnWrite extends AbstractKeySorted<TxnWrite.Update> implements Writ '}'; } - public Future<?> write(long timestamp, int nowInSeconds) + public AsyncChain<Void> write(long timestamp, int nowInSeconds) { PartitionUpdate update = new PartitionUpdate.Builder(get(), 0).updateAllTimestampAndLocalDeletionTime(timestamp, nowInSeconds).build(); Mutation mutation = new Mutation(update); - return Stage.MUTATION.submit((Runnable) mutation::apply); + return AsyncChains.ofRunnable(Stage.MUTATION.executor(), mutation::apply); } @Override @@ -342,7 +343,7 @@ public class TxnWrite extends AbstractKeySorted<TxnWrite.Update> implements Writ } @Override - public Future<Void> apply(Seekable key, SafeCommandStore safeStore, Timestamp executeAt, DataStore store) + public AsyncChain<Void> apply(Seekable key, SafeCommandStore safeStore, Timestamp executeAt, DataStore store) { AccordCommandsForKey cfk = ((SafeAccordCommandStore) safeStore).commandsForKey((Key)key); // TODO (expected, efficiency): 99.9999% of the time we can just use executeAt.hlc(), so can avoid bringing @@ -352,16 +353,16 @@ public class TxnWrite extends AbstractKeySorted<TxnWrite.Update> implements Writ // TODO (low priority - do we need to compute nowInSeconds, or can we just use executeAt?) int nowInSeconds = cfk.nowInSecondsFor(executeAt, true); - List<Future<?>> futures = new ArrayList<>(); - forEachWithKey((PartitionKey) key, write -> futures.add(write.write(timestamp, nowInSeconds))); + List<AsyncChain<Void>> results = new ArrayList<>(); + forEachWithKey((PartitionKey) key, write -> results.add(write.write(timestamp, nowInSeconds))); - if (futures.isEmpty()) + if (results.isEmpty()) return Writes.SUCCESS; - if (futures.size() == 1) - return futures.get(0).flatMap(o -> Writes.SUCCESS); + if (results.size() == 1) + return results.get(0).flatMap(o -> Writes.SUCCESS); - return FutureCombiner.allOf(futures).flatMap(objects -> Writes.SUCCESS); + return AsyncChains.all(results).flatMap(objects -> Writes.SUCCESS); } public long estimatedSizeOnHeap() diff --git a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java index 6d89246593..02ccdc44ae 100644 --- a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java @@ -18,7 +18,6 @@ package org.apache.cassandra.service.accord; -import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicLong; import org.junit.Assert; @@ -52,6 +51,7 @@ import org.apache.cassandra.service.accord.AccordCommandStore.SafeAccordCommandS import org.apache.cassandra.service.accord.api.PartitionKey; import org.apache.cassandra.utils.ByteBufferUtil; +import static accord.utils.async.AsyncChains.awaitUninterruptibly; import static org.apache.cassandra.cql3.statements.schema.CreateTableStatement.parse; import static org.apache.cassandra.service.accord.AccordTestUtils.*; @@ -82,10 +82,10 @@ public class AccordCommandTest * disable cache and make sure correct values are coming in and out of the accord table */ @Test - public void basicCycleTest() throws ExecutionException, InterruptedException + public void basicCycleTest() { AccordCommandStore commandStore = createAccordCommandStore(clock::incrementAndGet, "ks", "tbl"); - commandStore.execute(PreLoadContext.empty(), instance -> { ((SafeAccordCommandStore) instance).commandStore().setCacheSize(0); }).get(); + awaitUninterruptibly(commandStore.execute(PreLoadContext.empty(), instance -> { ((SafeAccordCommandStore) instance).commandStore().setCacheSize(0); })); TxnId txnId = txnId(1, clock.incrementAndGet(), 1); @@ -98,15 +98,15 @@ public class AccordCommandTest PreAccept preAccept = PreAccept.SerializerSupport.create(txnId, route, 1, 1, false, 1, partialTxn, fullRoute); // Check preaccept - commandStore.execute(preAccept, instance -> { + awaitUninterruptibly(commandStore.execute(preAccept, instance -> { PreAccept.PreAcceptReply reply = preAccept.apply(instance); Assert.assertTrue(reply.isOk()); PreAccept.PreAcceptOk ok = (PreAccept.PreAcceptOk) reply; Assert.assertEquals(txnId, ok.witnessedAt); Assert.assertTrue(ok.deps.isEmpty()); - }).get(); + })); - commandStore.execute(preAccept, instance -> { + awaitUninterruptibly(commandStore.execute(preAccept, instance -> { Command command = instance.command(txnId); Assert.assertEquals(txnId, command.executeAt()); Assert.assertEquals(Status.PreAccepted, command.status()); @@ -116,7 +116,7 @@ public class AccordCommandTest Assert.assertEquals(txnId, cfk.max()); Assert.assertNotNull((cfk.byId()).get(txnId)); Assert.assertNotNull((cfk.byExecuteAt()).get(txnId)); - }).get(); + })); // check accept TxnId txnId2 = txnId(1, clock.incrementAndGet(), 1); @@ -129,13 +129,13 @@ public class AccordCommandTest } Accept accept = Accept.SerializerSupport.create(txnId, route, 1, 1, false, Ballot.ZERO, executeAt, partialTxn.keys(), deps); - commandStore.execute(accept, instance -> { + awaitUninterruptibly(commandStore.execute(accept, instance -> { Accept.AcceptReply reply = accept.apply(instance); Assert.assertTrue(reply.isOk()); Assert.assertTrue(reply.deps.isEmpty()); - }).get(); + })); - commandStore.execute(accept, instance -> { + awaitUninterruptibly(commandStore.execute(accept, instance -> { Command command = instance.command(txnId); Assert.assertEquals(executeAt, command.executeAt()); Assert.assertEquals(Status.Accepted, command.status()); @@ -145,13 +145,13 @@ public class AccordCommandTest Assert.assertEquals(executeAt, cfk.max()); Assert.assertNotNull((cfk.byId()).get(txnId)); Assert.assertNotNull((cfk.byExecuteAt()).get(txnId)); - }).get(); + })); // check commit Commit commit = Commit.SerializerSupport.create(txnId, route, 1, executeAt, partialTxn, deps, fullRoute, null); - commandStore.execute(commit, commit::apply).get(); + awaitUninterruptibly(commandStore.execute(commit, commit::apply)); - commandStore.execute(PreLoadContext.contextFor(txnId, Keys.of(key)), instance -> { + awaitUninterruptibly(commandStore.execute(PreLoadContext.contextFor(txnId, Keys.of(key)), instance -> { Command command = instance.command(txnId); Assert.assertEquals(commit.executeAt, command.executeAt()); Assert.assertTrue(command.hasBeen(Status.Committed)); @@ -160,14 +160,14 @@ public class AccordCommandTest AccordCommandsForKey cfk = ((SafeAccordCommandStore)instance).commandsForKey(key(1)); Assert.assertNotNull((cfk.byId()).get(txnId)); Assert.assertNotNull((cfk.byExecuteAt()).get(commit.executeAt)); - }).get(); + })); } @Test public void computeDeps() throws Throwable { AccordCommandStore commandStore = createAccordCommandStore(clock::incrementAndGet, "ks", "tbl"); - commandStore.execute(PreLoadContext.empty(), instance -> { ((SafeAccordCommandStore) instance).commandStore().setCacheSize(0); }).get(); + awaitUninterruptibly(commandStore.execute(PreLoadContext.empty(), instance -> { ((SafeAccordCommandStore) instance).commandStore().setCacheSize(0); })); TxnId txnId1 = txnId(1, clock.incrementAndGet(), 1); Txn txn = createTxn(2); @@ -178,16 +178,16 @@ public class AccordCommandTest PartialTxn partialTxn = txn.slice(route.covering(), true); PreAccept preAccept1 = PreAccept.SerializerSupport.create(txnId1, route, 1, 1, false, 1, partialTxn, fullRoute); - commandStore.execute(preAccept1, preAccept1::apply).get(); + awaitUninterruptibly(commandStore.execute(preAccept1, preAccept1::apply)); // second preaccept should identify txnId1 as a dependency TxnId txnId2 = txnId(1, clock.incrementAndGet(), 1); PreAccept preAccept2 = PreAccept.SerializerSupport.create(txnId2, route, 1, 1, false, 1, partialTxn, fullRoute); - commandStore.execute(preAccept2, instance -> { + awaitUninterruptibly(commandStore.execute(preAccept2, instance -> { PreAccept.PreAcceptReply reply = preAccept2.apply(instance); Assert.assertTrue(reply.isOk()); PreAccept.PreAcceptOk ok = (PreAccept.PreAcceptOk) reply; Assert.assertTrue(ok.deps.contains(txnId1)); - }).get(); + })); } } diff --git a/test/unit/org/apache/cassandra/service/accord/AccordStateCacheTest.java b/test/unit/org/apache/cassandra/service/accord/AccordStateCacheTest.java index 2cc2b3de18..a686afad7c 100644 --- a/test/unit/org/apache/cassandra/service/accord/AccordStateCacheTest.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordStateCacheTest.java @@ -21,12 +21,13 @@ package org.apache.cassandra.service.accord; import java.util.HashSet; import java.util.Set; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; import org.junit.Assert; import org.junit.Test; -import org.apache.cassandra.utils.concurrent.AsyncPromise; -import org.apache.cassandra.utils.concurrent.Future; +import accord.utils.async.AsyncResult; +import accord.utils.async.AsyncResults; public class AccordStateCacheTest { @@ -248,8 +249,8 @@ public class AccordStateCacheTest assertCacheState(cache, 0, 4, DEFAULT_NODE_SIZE * 4); - AsyncPromise<Void> saveFuture = new AsyncPromise<>(); - instance.addSaveFuture(0, saveFuture); + AsyncResult<Void> saveFuture = AsyncResults.settable(); + instance.addSaveResult(0, saveFuture); cache.setMaxSize(0); // all should have been evicted except 0 @@ -269,7 +270,7 @@ public class AccordStateCacheTest static class WriteOnly extends SetItem implements AccordState.WriteOnly<Integer, SetItem> { - AsyncPromise<?> promise = null; + AsyncResult.Settable<Void> promise = null; final Set<Integer> added = new HashSet<>(); final Set<Integer> remove = new HashSet<>(); @@ -279,14 +280,14 @@ public class AccordStateCacheTest } @Override - public void future(Future<?> future) + public void asyncResult(AsyncResult<Void> notifier) { - Assert.assertTrue(future instanceof AsyncPromise); - this.promise = (AsyncPromise<?>) future; + Preconditions.checkArgument(notifier instanceof AsyncResult.Settable); + this.promise = (AsyncResult.Settable<Void>) notifier; } @Override - public Future<?> future() + public AsyncResult<Void> asyncResult() { return promise; } @@ -353,17 +354,17 @@ public class AccordStateCacheTest SetItem.WriteOnly writeOnly1 = new SetItem.WriteOnly(5); writeOnly1.added.addAll(ImmutableSet.of(4, 5)); - writeOnly1.future(new AsyncPromise<>()); + writeOnly1.asyncResult(AsyncResults.settable()); instance.addWriteOnly(writeOnly1); Assert.assertEquals(1, instance.pendingWriteOnlyOperations(5)); SetItem.WriteOnly writeOnly2 = new SetItem.WriteOnly(5); writeOnly2.remove.addAll(ImmutableSet.of(2, 4)); - writeOnly2.future(new AsyncPromise<>()); + writeOnly2.asyncResult(AsyncResults.settable()); instance.addWriteOnly(writeOnly2); Assert.assertEquals(2, instance.pendingWriteOnlyOperations(5)); - Assert.assertNull(instance.getSaveFuture(5)); + Assert.assertNull(instance.getSaveResult(5)); Assert.assertFalse(instance.writeOnlyGroupIsLocked(5)); instance.lockWriteOnlyGroupIfExists(5); @@ -377,7 +378,7 @@ public class AccordStateCacheTest // write only futures should have been merged and promoted to normal save futures, which would // prevent the cached object from being purged until they were completed - Future<?> saveFuture = instance.getSaveFuture(5); + AsyncResult<?> saveFuture = instance.getSaveResult(5); Assert.assertNotNull(saveFuture); Assert.assertFalse(saveFuture.isDone()); Assert.assertFalse(instance.canEvict(5)); @@ -402,7 +403,7 @@ public class AccordStateCacheTest { SetItem.WriteOnly item = new SetItem.WriteOnly(5); item.added.add(i); - item.future(new AsyncPromise<>()); + item.asyncResult(AsyncResults.settable()); instance.addWriteOnly(item); writeOnly[i] = item; } @@ -433,7 +434,7 @@ public class AccordStateCacheTest SetItem.WriteOnly item = new SetItem.WriteOnly(5); item.added.add(0); - item.future(new AsyncPromise<>()); + item.asyncResult(AsyncResults.settable()); instance.addWriteOnly(item); instance.lockWriteOnlyGroupIfExists(5); @@ -452,8 +453,8 @@ public class AccordStateCacheTest AccordStateCache cache = new AccordStateCache(500); AccordStateCache.Instance<Integer, SetItem> instance = cache.instance(Integer.class, SetItem.class, SetItem::new); - AsyncPromise<?> loadfuture = new AsyncPromise<>(); - instance.setLoadFuture(5, loadfuture); + AsyncResult<Void> loadfuture = AsyncResults.settable(); + instance.setLoadResult(5, loadfuture); Assert.assertFalse(instance.writeOnlyGroupIsLocked(5)); Assert.assertEquals(0, instance.pendingWriteOnlyOperations(5)); @@ -461,7 +462,7 @@ public class AccordStateCacheTest // adding a write only object should immediately lock the group, since there's an existing load future SetItem.WriteOnly item = new SetItem.WriteOnly(5); item.added.add(0); - item.future(new AsyncPromise<>()); + item.asyncResult(AsyncResults.settable()); instance.addWriteOnly(item); Assert.assertTrue(instance.writeOnlyGroupIsLocked(5)); @@ -474,12 +475,12 @@ public class AccordStateCacheTest { AccordStateCache cache = new AccordStateCache(500); AccordStateCache.Instance<Integer, SetItem> instance = cache.instance(Integer.class, SetItem.class, SetItem::new); - AsyncPromise<?> promise1 = new AsyncPromise<>(); - AsyncPromise<?> promise2 = new AsyncPromise<>(); - instance.addSaveFuture(5, promise1); - instance.addSaveFuture(5, promise2); + AsyncResult.Settable<Void> promise1 = AsyncResults.settable(); + AsyncResult.Settable<Void> promise2 = AsyncResults.settable(); + instance.addSaveResult(5, promise1); + instance.addSaveResult(5, promise2); - Future<?> future = instance.getSaveFuture(5); + AsyncResult<?> future = instance.getSaveResult(5); Assert.assertNotSame(future, promise1); Assert.assertNotSame(future, promise2); diff --git a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java index 531b513fa6..0c457ae067 100644 --- a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java @@ -54,6 +54,7 @@ import accord.primitives.Unseekables; import accord.primitives.Writes; import accord.topology.Shard; import accord.topology.Topology; +import accord.utils.async.AsyncChains; import org.apache.cassandra.cql3.QueryOptions; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.statements.TransactionStatement; @@ -68,6 +69,7 @@ import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; import static accord.primitives.Routable.Domain.Key; +import static accord.utils.async.AsyncChains.awaitUninterruptibly; import static java.lang.String.format; public class AccordTestUtils @@ -113,7 +115,7 @@ public class AccordTestUtils public static void processCommandResult(AccordCommandStore commandStore, Command command) throws Throwable { - commandStore.execute(PreLoadContext.contextFor(Collections.emptyList(), command.partialTxn().keys()), + awaitUninterruptibly(commandStore.execute(PreLoadContext.contextFor(Collections.emptyList(), command.partialTxn().keys()), instance -> { PartialTxn txn = command.partialTxn(); TxnRead read = (TxnRead) txn.read(); @@ -121,7 +123,7 @@ public class AccordTestUtils .map(key -> { try { - return read.read(key, command.txnId().rw(), instance, command.executeAt(), null).get(); + return AsyncChains.getBlocking(read.read(key, command.txnId().rw(), instance, command.executeAt(), null)); } catch (InterruptedException e) { @@ -136,7 +138,7 @@ public class AccordTestUtils Write write = txn.update().apply(readData); ((AccordCommand)command).setWrites(new Writes(command.executeAt(), (Keys)txn.keys(), write)); ((AccordCommand)command).setResult(txn.query().compute(command.txnId(), readData, txn.read(), txn.update())); - }).get(); + })); } public static Txn createTxn(String query) diff --git a/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java b/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java index c5bc97984d..3cdfceda18 100644 --- a/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java +++ b/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java @@ -33,6 +33,8 @@ import org.junit.Test; import accord.local.Status; import accord.primitives.PartialTxn; import accord.primitives.TxnId; +import accord.utils.async.AsyncResult; +import accord.utils.async.AsyncResults; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.service.StorageService; @@ -43,7 +45,6 @@ import org.apache.cassandra.service.accord.AccordKeyspace; import org.apache.cassandra.service.accord.AccordStateCache; import org.apache.cassandra.service.accord.api.PartitionKey; import org.apache.cassandra.utils.concurrent.AsyncPromise; -import org.apache.cassandra.utils.concurrent.Future; import static com.google.common.collect.Iterables.getOnlyElement; import static java.util.Collections.singleton; @@ -207,8 +208,8 @@ public class AsyncLoaderTest AsyncLoader loader = new AsyncLoader(commandStore, singleton(txnId), singleton(key)); // since there's a read future associated with the txnId, we'll wait for it to load - AsyncPromise<Void> readFuture = new AsyncPromise<>(); - commandCache.setLoadFuture(command.txnId(), readFuture); + AsyncResult.Settable<Void> readFuture = AsyncResults.settable(); + commandCache.setLoadResult(command.txnId(), readFuture); AsyncPromise<Void> cbFired = new AsyncPromise<>(); commandStore.executeBlocking(() -> { @@ -254,12 +255,12 @@ public class AsyncLoaderTest AccordStateCache.Instance<TxnId, AccordCommand> cache = commandStore.commandCache(); AccordCommand.WriteOnly writeOnly1 = new AccordCommand.WriteOnly(txnId); writeOnly1.blockingApplyOn.blindAdd(blockApply); - writeOnly1.future(new AsyncPromise<>()); + writeOnly1.asyncResult(AsyncResults.settable()); cache.addWriteOnly(writeOnly1); AccordCommand.WriteOnly writeOnly2 = new AccordCommand.WriteOnly(txnId); writeOnly2.blockingCommitOn.blindAdd(blockCommit); - writeOnly2.future(new AsyncPromise<>()); + writeOnly2.asyncResult(AsyncResults.settable()); cache.addWriteOnly(writeOnly2); AsyncContext context = new AsyncContext(); @@ -286,9 +287,9 @@ public class AsyncLoaderTest TxnId txnId1 = txnId(1, clock.incrementAndGet(), 1); TxnId txnId2 = txnId(1, clock.incrementAndGet(), 1); - AsyncPromise<Void> promise1 = new AsyncPromise<>(); - AsyncPromise<Void> promise2 = new AsyncPromise<>(); - AsyncPromise<Void> callback = new AsyncPromise<>(); + AsyncResult.Settable<Void> promise1 = AsyncResults.settable(); + AsyncResult.Settable<Void> promise2 = AsyncResults.settable(); + AsyncResult.Settable<Void> callback = AsyncResults.settable(); RuntimeException failure = new RuntimeException(); execute(commandStore, () -> { @@ -296,7 +297,7 @@ public class AsyncLoaderTest AtomicInteger loadCalls = new AtomicInteger(); AsyncLoader loader = new AsyncLoader(commandStore, ImmutableList.of(txnId1, txnId2), Collections.emptyList()){ @Override - Function<AccordCommand, Future<?>> loadCommandFunction(Object callback) + Function<AccordCommand, AsyncResult<Void>> loadCommandFunction(Object callback) { return cmd -> { TxnId txnId = cmd.txnId(); @@ -321,6 +322,6 @@ public class AsyncLoaderTest }); promise1.tryFailure(failure); - callback.get(); + AsyncResults.awaitUninterruptibly(callback); } } diff --git a/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java b/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java index 2a53d08ae3..2f3011128a 100644 --- a/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java +++ b/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java @@ -23,7 +23,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import com.google.common.collect.Iterables; -import com.google.common.util.concurrent.Futures; import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; @@ -54,6 +53,7 @@ import org.apache.cassandra.service.accord.api.PartitionKey; import org.apache.cassandra.utils.FBUtilities; import static accord.local.PreLoadContext.contextFor; +import static accord.utils.async.AsyncChains.awaitUninterruptibly; import static java.util.Collections.emptyList; import static java.util.Collections.singleton; import static org.apache.cassandra.cql3.statements.schema.CreateTableStatement.parse; @@ -94,10 +94,10 @@ public class AsyncOperationTest Txn txn = createTxn((int)clock.incrementAndGet()); PartitionKey key = (PartitionKey) Iterables.getOnlyElement(txn.keys()); - commandStore.execute(contextFor(txnId), instance -> { + awaitUninterruptibly(commandStore.execute(contextFor(txnId), instance -> { Command command = instance.ifPresent(txnId); Assert.assertNull(command); - }).get(); + })); UntypedResultSet result = AccordKeyspace.loadCommandRow(commandStore, txnId); Assert.assertTrue(result.isEmpty()); @@ -110,10 +110,10 @@ public class AsyncOperationTest Txn txn = createTxn((int)clock.incrementAndGet()); PartitionKey key = (PartitionKey) Iterables.getOnlyElement(txn.keys()); - commandStore.execute(contextFor(Collections.emptyList(), Keys.of(key)),instance -> { + awaitUninterruptibly(commandStore.execute(contextFor(Collections.emptyList(), Keys.of(key)),instance -> { AccordCommandsForKey cfk = ((SafeAccordCommandStore)instance).maybeCommandsForKey(key); Assert.assertNull(cfk); - }).get(); + })); int nowInSeconds = FBUtilities.nowInSeconds(); SinglePartitionReadCommand command = AccordKeyspace.getCommandsForKeyRead(commandStore, key, nowInSeconds); @@ -142,10 +142,10 @@ public class AsyncOperationTest private static void assertFutureState(AccordStateCache.Instance<TxnId, AccordCommand> cache, TxnId txnId, boolean expectLoadFuture, boolean expectSaveFuture) { - if (cache.hasLoadFuture(txnId) != expectLoadFuture) + if (cache.hasLoadResult(txnId) != expectLoadFuture) throw new AssertionError(expectLoadFuture ? "Load future unexpectedly not found for " + txnId : "Unexpectedly found load future for " + txnId); - if (cache.hasSaveFuture(txnId) != expectSaveFuture) + if (cache.hasSaveResult(txnId) != expectSaveFuture) throw new AssertionError(expectSaveFuture ? "Save future unexpectedly not found for " + txnId : "Unexpectedly found save future for " + txnId); @@ -220,6 +220,6 @@ public class AsyncOperationTest commandStore.executor().submit(operation); - Futures.getUnchecked(operation); + awaitUninterruptibly(operation); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org