This is an automated email from the ASF dual-hosted git repository.
maedhroz pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push:
new 778c45cd Baseline Diagnostic vtables for Accord
778c45cd is described below
commit 778c45cd977576a901abf24a9759872d36fde056
Author: Caleb Rackliffe <[email protected]>
AuthorDate: Tue May 14 17:55:58 2024 -0500
Baseline Diagnostic vtables for Accord
patch by Caleb Rackliffe; reviewed by David Capwell and Ariel Weisberg for
CASSANDRA-18732
---
.../src/main/java/accord/local/CommandStores.java | 17 +++++++++++++++++
accord-core/src/main/java/accord/local/Node.java | 16 +++++++++++-----
.../src/main/java/accord/utils/async/AsyncChains.java | 13 ++++++++++---
3 files changed, 38 insertions(+), 8 deletions(-)
diff --git a/accord-core/src/main/java/accord/local/CommandStores.java
b/accord-core/src/main/java/accord/local/CommandStores.java
index 87ff2582..20f08a8e 100644
--- a/accord-core/src/main/java/accord/local/CommandStores.java
+++ b/accord-core/src/main/java/accord/local/CommandStores.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Consumer;
+import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -590,6 +591,22 @@ public abstract class CommandStores
return chain == null ? AsyncChains.success(null) : chain;
}
+ public <O> AsyncChain<List<O>> map(Function<? super SafeCommandStore, O>
mapper)
+ {
+ return map(PreLoadContext.empty(), mapper);
+ }
+
+ public <O> AsyncChain<List<O>> map(PreLoadContext context, Function<?
super SafeCommandStore, O> mapper)
+ {
+ ShardHolder[] shards = current.shards;
+ List<AsyncChain<O>> results = new ArrayList<>(shards.length);
+
+ for (ShardHolder shard : shards)
+ results.add(shard.store.submit(context, mapper));
+
+ return AsyncChains.all(results);
+ }
+
protected <O> AsyncChain<O> mapReduce(PreLoadContext context, IntStream
commandStoreIds, MapReduce<? super SafeCommandStore, O> mapReduce)
{
// TODO (low priority, efficiency): avoid using an array, or use a
scratch buffer
diff --git a/accord-core/src/main/java/accord/local/Node.java
b/accord-core/src/main/java/accord/local/Node.java
index 1605f435..ced53157 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -37,12 +37,8 @@ import java.util.function.ToLongFunction;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
-import accord.coordinate.CoordinateEphemeralRead;
-import accord.coordinate.CoordinationAdapter;
-import accord.coordinate.CoordinationAdapter.Factory.Step;
-import accord.utils.DeterministicSet;
-import accord.utils.Invariants;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,7 +54,10 @@ import accord.api.RoutingKey;
import accord.api.Scheduler;
import accord.api.TopologySorter;
import accord.config.LocalConfig;
+import accord.coordinate.CoordinateEphemeralRead;
import accord.coordinate.CoordinateTransaction;
+import accord.coordinate.CoordinationAdapter;
+import accord.coordinate.CoordinationAdapter.Factory.Step;
import accord.coordinate.MaybeRecover;
import accord.coordinate.Outcome;
import accord.coordinate.RecoverWithRoute;
@@ -85,6 +84,8 @@ import accord.primitives.Unseekables;
import accord.topology.Shard;
import accord.topology.Topology;
import accord.topology.TopologyManager;
+import accord.utils.DeterministicSet;
+import accord.utils.Invariants;
import accord.utils.MapReduceConsume;
import accord.utils.RandomSource;
import accord.utils.async.AsyncChain;
@@ -196,6 +197,11 @@ public class Node implements
ConfigurationService.Listener, NodeTimeService
return localConfig;
}
+ public Map<TxnId, AsyncResult<? extends Outcome>> coordinating()
+ {
+ return ImmutableMap.copyOf(coordinating);
+ }
+
/**
* This starts the node for tests and makes sure that the provided
topology is acknowledged correctly. This method is not
* safe for production systems as it doesn't handle restarts and partially
acknowledged histories
diff --git a/accord-core/src/main/java/accord/utils/async/AsyncChains.java
b/accord-core/src/main/java/accord/utils/async/AsyncChains.java
index a18f7828..5e488fe7 100644
--- a/accord-core/src/main/java/accord/utils/async/AsyncChains.java
+++ b/accord-core/src/main/java/accord/utils/async/AsyncChains.java
@@ -589,14 +589,21 @@ public abstract class AsyncChains<V> implements
AsyncChain<V>
public static <V> AsyncChain<V> ofCallable(Executor executor, Callable<V>
callable)
{
- return new Head<V>()
+ return ofCallable(executor, callable, AsyncChains::encapsulate);
+ }
+
+ public static <V> AsyncChain<V> ofCallable(Executor executor,
+ Callable<V> callable,
+ BiFunction<Callable<V>,
BiConsumer<? super V, Throwable>, Runnable> encapsulator)
+ {
+ return new Head<>()
{
@Override
protected void start(BiConsumer<? super V, Throwable> callback)
{
try
{
- executor.execute(encapsulate(callable, callback));
+ executor.execute(encapsulator.apply(callable, callback));
}
catch (Throwable t)
{
@@ -615,7 +622,7 @@ public abstract class AsyncChains<V> implements
AsyncChain<V>
{
try
{
- executor.execute(AsyncChains.encapsulate(runnable,
callback));
+ executor.execute(encapsulate(runnable, callback));
}
catch (Throwable t)
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]