This is an automated email from the ASF dual-hosted git repository.

jlewandowski pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0419858b CEP-15: Accord metrics
0419858b is described below

commit 0419858bd1f6761f08fd1369477f7c142f5bbb4f
Author: Jacek Lewandowski <[email protected]>
AuthorDate: Thu Aug 3 16:39:31 2023 +0200

    CEP-15: Accord metrics
    
    Patch by Jacek Lewandowski, reviewed by Caleb Rackliffe and Henrik Ingo for CASSANDRA-18580
---
 accord-core/src/main/java/accord/api/Agent.java    |  5 ++
 .../src/main/java/accord/api/EventsListener.java   | 71 ++++++++++++++++++++++
 .../accord/coordinate/CoordinatePreAccept.java     | 16 +++++
 .../accord/coordinate/CoordinateTransaction.java   |  3 +
 .../src/main/java/accord/coordinate/Recover.java   | 24 ++++++--
 .../java/accord/coordinate/RecoverWithRoute.java   |  4 +-
 .../main/java/accord/impl/SimpleProgressLog.java   |  5 +-
 .../src/main/java/accord/local/CommandStores.java  |  7 +++
 .../src/main/java/accord/local/Commands.java       |  6 ++
 .../src/main/java/accord/primitives/Timestamp.java |  4 +-
 10 files changed, 135 insertions(+), 10 deletions(-)

diff --git a/accord-core/src/main/java/accord/api/Agent.java b/accord-core/src/main/java/accord/api/Agent.java
index 717ae87a..ff8ab047 100644
--- a/accord-core/src/main/java/accord/api/Agent.java
+++ b/accord-core/src/main/java/accord/api/Agent.java
@@ -60,4 +60,9 @@ public interface Agent extends UncaughtExceptionListener
     boolean isExpired(TxnId initiated, long now);
 
     Txn emptyTxn(Txn.Kind kind, Seekables<?, ?> keysOrRanges);
+
+    default EventsListener metricsEventsListener()
+    {
+        return EventsListener.NOOP;
+    }
 }
diff --git a/accord-core/src/main/java/accord/api/EventsListener.java b/accord-core/src/main/java/accord/api/EventsListener.java
new file mode 100644
index 00000000..67a32942
--- /dev/null
+++ b/accord-core/src/main/java/accord/api/EventsListener.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.api;
+
+import accord.local.Command;
+import accord.primitives.Deps;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+
+public interface EventsListener
+{
+    default void onCommitted(Command cmd)
+    {
+    }
+
+    default void onExecuted(Command cmd)
+    {
+    }
+
+    default void onApplied(Command cmd, long applyStartTimestamp)
+    {
+    }
+
+    default void onFastPathTaken(TxnId txnId, Deps deps)
+    {
+    }
+
+    default void onSlowPathTaken(TxnId txnId, Deps deps)
+    {
+    }
+
+    default void onRecover(TxnId txnId, Timestamp recoveryTimestamp)
+    {
+    }
+
+    default void onPreempted(TxnId txnId)
+    {
+    }
+
+    default void onTimeout(TxnId txnId)
+    {
+    }
+
+    default void onInvalidated(TxnId txnId)
+    {
+    }
+
+    default void onProgressLogSizeChange(TxnId txnId, int delta)
+    {
+    }
+
+    EventsListener NOOP = new EventsListener()
+    {
+    };
+}
diff --git a/accord-core/src/main/java/accord/coordinate/CoordinatePreAccept.java b/accord-core/src/main/java/accord/coordinate/CoordinatePreAccept.java
index 7eb71ced..c1c3de9c 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinatePreAccept.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinatePreAccept.java
@@ -216,7 +216,15 @@ abstract class CoordinatePreAccept<T> extends SettableResult<T> implements Callb
         if (extraPreAccept != null)
             extraPreAccept.extraPreAcceptIsDone = true;
         if (failure instanceof CoordinationFailed)
+        {
             ((CoordinationFailed) failure).set(txnId, route.homeKey());
+            if (failure instanceof Timeout)
+                node.agent().metricsEventsListener().onTimeout(txnId);
+            else if (failure instanceof Preempted)
+                node.agent().metricsEventsListener().onPreempted(txnId);
+            else if (failure instanceof Invalidated)
+                node.agent().metricsEventsListener().onInvalidated(txnId);
+        }
         super.setFailure(failure);
     }
 
@@ -265,7 +273,15 @@ abstract class CoordinatePreAccept<T> extends SettableResult<T> implements Callb
     public void accept(T success, Throwable failure)
     {
         if (failure instanceof CoordinationFailed)
+        {
             ((CoordinationFailed) failure).set(txnId, route.homeKey());
+            if (failure instanceof Preempted)
+                node.agent().metricsEventsListener().onPreempted(txnId);
+            else if (failure instanceof Timeout)
+                node.agent().metricsEventsListener().onTimeout(txnId);
+            else if (failure instanceof Invalidated)
+                node.agent().metricsEventsListener().onInvalidated(txnId);
+        }
 
         if (success != null) trySuccess(success);
         else tryFailure(failure);
diff --git a/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java b/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java
index ccdd5871..c7bd3627 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java
@@ -63,6 +63,7 @@ public class CoordinateTransaction extends CoordinatePreAccept<Result>
         {
             Deps deps = Deps.merge(successes, ok -> 
ok.witnessedAt.equals(txnId) ? ok.deps : null);
             Execute.execute(node, txnId, txn, route, txnId, deps, this);
+            node.agent().metricsEventsListener().onFastPathTaken(txnId, deps);
         }
         else
         {
@@ -83,6 +84,8 @@ public class CoordinateTransaction extends CoordinatePreAccept<Result>
                 else
                     proposeAndExecute(node, topologies, Ballot.ZERO, txnId, 
txn, route, executeAt, deps, this);
             }
+
+            node.agent().metricsEventsListener().onSlowPathTaken(txnId, deps);
         }
     }
 }
diff --git a/accord-core/src/main/java/accord/coordinate/Recover.java b/accord-core/src/main/java/accord/coordinate/Recover.java
index 9dd8f08e..df56b791 100644
--- a/accord-core/src/main/java/accord/coordinate/Recover.java
+++ b/accord-core/src/main/java/accord/coordinate/Recover.java
@@ -153,8 +153,22 @@ public class Recover implements Callback<RecoverReply>, BiConsumer<Result, Throw
     public void accept(Result result, Throwable failure)
     {
         isDone = true;
-        if (failure == null) callback.accept(ProgressToken.APPLIED, null);
-        else callback.accept(null, failure);
+        if (failure == null)
+        {
+            callback.accept(ProgressToken.APPLIED, null);
+            node.agent().metricsEventsListener().onRecover(txnId, ballot);
+        }
+        else
+        {
+            callback.accept(null, failure);
+            if (failure instanceof Preempted)
+                node.agent().metricsEventsListener().onPreempted(txnId);
+            else if (failure instanceof Timeout)
+                node.agent().metricsEventsListener().onTimeout(txnId);
+            else if (failure instanceof Invalidated)
+                node.agent().metricsEventsListener().onInvalidated(txnId);
+        }
+
         node.agent().onRecover(node, result, failure);
     }
 
@@ -163,18 +177,18 @@ public class Recover implements Callback<RecoverReply>, BiConsumer<Result, Throw
         return recover(node, txnId, txn, route, callback, 
node.topology().forEpoch(route, txnId.epoch()));
     }
 
-    public static Recover recover(Node node, TxnId txnId, Txn txn, 
FullRoute<?> route, BiConsumer<Outcome, Throwable> callback, Topologies 
topologies)
+    private static Recover recover(Node node, TxnId txnId, Txn txn, 
FullRoute<?> route, BiConsumer<Outcome, Throwable> callback, Topologies 
topologies)
     {
         Ballot ballot = new Ballot(node.uniqueNow());
         return recover(node, ballot, txnId, txn, route, callback, topologies);
     }
 
-    public static Recover recover(Node node, Ballot ballot, TxnId txnId, Txn 
txn, FullRoute<?> route, BiConsumer<Outcome, Throwable> callback)
+    private static Recover recover(Node node, Ballot ballot, TxnId txnId, Txn 
txn, FullRoute<?> route, BiConsumer<Outcome, Throwable> callback)
     {
         return recover(node, ballot, txnId, txn, route, callback, 
node.topology().forEpoch(route, txnId.epoch()));
     }
 
-    public static Recover recover(Node node, Ballot ballot, TxnId txnId, Txn 
txn, FullRoute<?> route, BiConsumer<Outcome, Throwable> callback, Topologies 
topologies)
+    private static Recover recover(Node node, Ballot ballot, TxnId txnId, Txn 
txn, FullRoute<?> route, BiConsumer<Outcome, Throwable> callback, Topologies 
topologies)
     {
         Recover recover = new Recover(node, ballot, txnId, txn, route, 
callback, topologies);
         recover.start(topologies.nodes());
diff --git a/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java b/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java
index fdcff134..dfcc3f0c 100644
--- a/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java
+++ b/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java
@@ -75,7 +75,7 @@ public class RecoverWithRoute extends CheckShards<FullRoute<?>>
         return recover(node, node.topology().forEpoch(route, txnId.epoch()), 
txnId, route, witnessedByInvalidation, callback);
     }
 
-    public static RecoverWithRoute recover(Node node, Topologies topologies, 
TxnId txnId, FullRoute<?> route, @Nullable Status witnessedByInvalidation, 
BiConsumer<Outcome, Throwable> callback)
+    private static RecoverWithRoute recover(Node node, Topologies topologies, 
TxnId txnId, FullRoute<?> route, @Nullable Status witnessedByInvalidation, 
BiConsumer<Outcome, Throwable> callback)
     {
         return recover(node, topologies, null, txnId, route, 
witnessedByInvalidation, callback);
     }
@@ -85,7 +85,7 @@ public class RecoverWithRoute extends CheckShards<FullRoute<?>>
         return recover(node, node.topology().forEpoch(route, txnId.epoch()), 
promisedBallot, txnId, route, witnessedByInvalidation, callback);
     }
 
-    public static RecoverWithRoute recover(Node node, Topologies topologies, 
Ballot ballot, TxnId txnId, FullRoute<?> route, Status witnessedByInvalidation, 
BiConsumer<Outcome, Throwable> callback)
+    private static RecoverWithRoute recover(Node node, Topologies topologies, 
Ballot ballot, TxnId txnId, FullRoute<?> route, Status witnessedByInvalidation, 
BiConsumer<Outcome, Throwable> callback)
     {
         RecoverWithRoute recover = new RecoverWithRoute(node, topologies, 
ballot, txnId, route, witnessedByInvalidation, callback);
         recover.start();
diff --git a/accord-core/src/main/java/accord/impl/SimpleProgressLog.java b/accord-core/src/main/java/accord/impl/SimpleProgressLog.java
index a6779a2c..89291a1e 100644
--- a/accord-core/src/main/java/accord/impl/SimpleProgressLog.java
+++ b/accord-core/src/main/java/accord/impl/SimpleProgressLog.java
@@ -515,7 +515,10 @@ public class SimpleProgressLog implements ProgressLog.Factory
 
         State ensure(TxnId txnId)
         {
-            return stateMap.computeIfAbsent(txnId, State::new);
+            return stateMap.computeIfAbsent(txnId, ignored -> {
+                
node.agent().metricsEventsListener().onProgressLogSizeChange(txnId, 1);
+                return new State(txnId);
+            });
         }
 
         State ensure(TxnId txnId, State state)
diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java
index dea108cc..357f8cf3 100644
--- a/accord-core/src/main/java/accord/local/CommandStores.java
+++ b/accord-core/src/main/java/accord/local/CommandStores.java
@@ -403,6 +403,13 @@ public abstract class CommandStores
         return new TopologyUpdate(new Snapshot(result.toArray(new 
ShardHolder[0]), newLocalTopology, newTopology), bootstrap);
     }
 
+    public <R> R unsafeFoldLeft(R initial, BiFunction<R, CommandStore, R> f)
+    {
+        Snapshot snapshot = current;
+        for (ShardHolder shard : snapshot.shards)
+            initial = f.apply(initial, shard.store);
+        return initial;
+    }
 
     public AsyncChain<Void> forEach(Consumer<SafeCommandStore> forEach)
     {
diff --git a/accord-core/src/main/java/accord/local/Commands.java b/accord-core/src/main/java/accord/local/Commands.java
index 52c37a5f..3193e70d 100644
--- a/accord-core/src/main/java/accord/local/Commands.java
+++ b/accord-core/src/main/java/accord/local/Commands.java
@@ -343,6 +343,7 @@ public class Commands
         WaitingOn waitingOn = initialiseWaitingOn(safeStore, txnId, executeAt, 
attrs.partialDeps(), attrs.route());
         command = safeCommand.commit(attrs, executeAt, waitingOn);
         safeStore.progressLog().committed(command, shard);
+        safeStore.agent().metricsEventsListener().onCommitted(command);
 
         // TODO (expected, safety): introduce intermediate status to avoid 
reentry when notifying listeners (which might notify us)
         maybeExecute(safeStore, safeCommand, true, true);
@@ -486,6 +487,7 @@ public class Commands
 
         maybeExecute(safeStore, safeCommand, true, true);
         safeStore.progressLog().executed(safeCommand.current(), shard);
+        safeStore.agent().metricsEventsListener().onExecuted(command);
 
         return ApplyOutcome.Success;
     }
@@ -582,8 +584,12 @@ public class Commands
         //  that was pre-bootstrap for some range (so redundant and we may 
have gone ahead of), but had to be executed locally
         //  for another range
         CommandStore unsafeStore = safeStore.commandStore();
+        long t0 = safeStore.time().now();
         return command.writes().apply(safeStore, applyRanges(safeStore, 
command.executeAt()), command.partialTxn())
                .flatMap(unused -> unsafeStore.submit(context, ss -> {
+                   Command cmd = ss.get(txnId).current();
+                   if (!cmd.hasBeen(Applied))
+                       ss.agent().metricsEventsListener().onApplied(cmd, t0);
                    postApply(ss, txnId);
                    return null;
                }));
diff --git a/accord-core/src/main/java/accord/primitives/Timestamp.java b/accord-core/src/main/java/accord/primitives/Timestamp.java
index ba1d104b..4f07a290 100644
--- a/accord-core/src/main/java/accord/primitives/Timestamp.java
+++ b/accord-core/src/main/java/accord/primitives/Timestamp.java
@@ -361,7 +361,7 @@ public class Timestamp implements Comparable<Timestamp>, EpochSupplier
     @Override
     public String toString()
     {
-        return "[" + epoch() + ',' + hlc() + ',' + flags() + ',' + node + ']';
+        return "[" + epoch() + ',' + hlc() + ',' + 
Integer.toBinaryString(flags()) + ',' + node + ']';
     }
 
     public static Timestamp fromString(String string)
@@ -370,7 +370,7 @@ public class Timestamp implements Comparable<Timestamp>, EpochSupplier
         assert split.length == 4;
         return Timestamp.fromValues(Long.parseLong(split[0]),
                                     Long.parseLong(split[1]),
-                                    Integer.parseInt(split[2]),
+                                    Integer.parseInt(split[2], 2),
                                     new Id(Integer.parseInt(split[3])));
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to