This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 778c45cd Baseline Diagnostic vtables for Accord
778c45cd is described below

commit 778c45cd977576a901abf24a9759872d36fde056
Author: Caleb Rackliffe <[email protected]>
AuthorDate: Tue May 14 17:55:58 2024 -0500

    Baseline Diagnostic vtables for Accord
    
    patch by Caleb Rackliffe; reviewed by David Capwell and Ariel Weisberg for 
CASSANDRA-18732
---
 .../src/main/java/accord/local/CommandStores.java       | 17 +++++++++++++++++
 accord-core/src/main/java/accord/local/Node.java        | 16 +++++++++++-----
 .../src/main/java/accord/utils/async/AsyncChains.java   | 13 ++++++++++---
 3 files changed, 38 insertions(+), 8 deletions(-)

diff --git a/accord-core/src/main/java/accord/local/CommandStores.java 
b/accord-core/src/main/java/accord/local/CommandStores.java
index 87ff2582..20f08a8e 100644
--- a/accord-core/src/main/java/accord/local/CommandStores.java
+++ b/accord-core/src/main/java/accord/local/CommandStores.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -590,6 +591,22 @@ public abstract class CommandStores
         return chain == null ? AsyncChains.success(null) : chain;
     }
 
+    public <O> AsyncChain<List<O>> map(Function<? super SafeCommandStore, O> 
mapper)
+    {
+        return map(PreLoadContext.empty(), mapper);
+    }
+
+    public <O> AsyncChain<List<O>> map(PreLoadContext context, Function<? 
super SafeCommandStore, O> mapper)
+    {
+        ShardHolder[] shards = current.shards;
+        List<AsyncChain<O>> results = new ArrayList<>(shards.length);
+
+        for (ShardHolder shard : shards)
+            results.add(shard.store.submit(context, mapper));
+
+        return AsyncChains.all(results);
+    }
+
     protected <O> AsyncChain<O> mapReduce(PreLoadContext context, IntStream 
commandStoreIds, MapReduce<? super SafeCommandStore, O> mapReduce)
     {
         // TODO (low priority, efficiency): avoid using an array, or use a 
scratch buffer
diff --git a/accord-core/src/main/java/accord/local/Node.java 
b/accord-core/src/main/java/accord/local/Node.java
index 1605f435..ced53157 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -37,12 +37,8 @@ import java.util.function.ToLongFunction;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import accord.coordinate.CoordinateEphemeralRead;
-import accord.coordinate.CoordinationAdapter;
-import accord.coordinate.CoordinationAdapter.Factory.Step;
-import accord.utils.DeterministicSet;
-import accord.utils.Invariants;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,7 +54,10 @@ import accord.api.RoutingKey;
 import accord.api.Scheduler;
 import accord.api.TopologySorter;
 import accord.config.LocalConfig;
+import accord.coordinate.CoordinateEphemeralRead;
 import accord.coordinate.CoordinateTransaction;
+import accord.coordinate.CoordinationAdapter;
+import accord.coordinate.CoordinationAdapter.Factory.Step;
 import accord.coordinate.MaybeRecover;
 import accord.coordinate.Outcome;
 import accord.coordinate.RecoverWithRoute;
@@ -85,6 +84,8 @@ import accord.primitives.Unseekables;
 import accord.topology.Shard;
 import accord.topology.Topology;
 import accord.topology.TopologyManager;
+import accord.utils.DeterministicSet;
+import accord.utils.Invariants;
 import accord.utils.MapReduceConsume;
 import accord.utils.RandomSource;
 import accord.utils.async.AsyncChain;
@@ -196,6 +197,11 @@ public class Node implements 
ConfigurationService.Listener, NodeTimeService
         return localConfig;
     }
 
+    public Map<TxnId, AsyncResult<? extends Outcome>> coordinating()
+    {
+        return ImmutableMap.copyOf(coordinating);
+    }
+
     /**
      * This starts the node for tests and makes sure that the provided 
topology is acknowledged correctly.  This method is not
      * safe for production systems as it doesn't handle restarts and partially 
acknowledged histories
diff --git a/accord-core/src/main/java/accord/utils/async/AsyncChains.java 
b/accord-core/src/main/java/accord/utils/async/AsyncChains.java
index a18f7828..5e488fe7 100644
--- a/accord-core/src/main/java/accord/utils/async/AsyncChains.java
+++ b/accord-core/src/main/java/accord/utils/async/AsyncChains.java
@@ -589,14 +589,21 @@ public abstract class AsyncChains<V> implements 
AsyncChain<V>
 
     public static <V> AsyncChain<V> ofCallable(Executor executor, Callable<V> 
callable)
     {
-        return new Head<V>()
+        return ofCallable(executor, callable, AsyncChains::encapsulate);
+    }
+
+    public static <V> AsyncChain<V> ofCallable(Executor executor,
+                                               Callable<V> callable,
+                                               BiFunction<Callable<V>, 
BiConsumer<? super V, Throwable>, Runnable> encapsulator)
+    {
+        return new Head<>()
         {
             @Override
             protected void start(BiConsumer<? super V, Throwable> callback)
             {
                 try
                 {
-                    executor.execute(encapsulate(callable, callback));
+                    executor.execute(encapsulator.apply(callable, callback));
                 }
                 catch (Throwable t)
                 {
@@ -615,7 +622,7 @@ public abstract class AsyncChains<V> implements 
AsyncChain<V>
             {
                 try
                 {
-                    executor.execute(AsyncChains.encapsulate(runnable, 
callback));
+                    executor.execute(encapsulate(runnable, callback));
                 }
                 catch (Throwable t)
                 {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to