This is an automated email from the ASF dual-hosted git repository.

aleksey pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c524b6d  Pre-requisite changes for CASSANDRA-18888 (#74)
c524b6d is described below

commit c524b6d3de3923ccb6314715bd987f3b891348ab
Author: Aleksey Yeschenko <alek...@apache.org>
AuthorDate: Wed Jan 17 15:14:00 2024 +0000

    Pre-requisite changes for CASSANDRA-18888 (#74)
    
    patch by Aleksey Yeschenko; reviewed by Ariel Weisberg for
    CASSANDRA-18888
---
 .../src/main/java/accord/api/MessageSink.java      |   2 +-
 accord-core/src/main/java/accord/local/Node.java   |  29 +++---
 .../java/accord/messages/AbstractEpochRequest.java |   8 +-
 .../src/main/java/accord/messages/Commit.java      |   7 ++
 .../java/accord/messages/InformHomeDurable.java    |   6 ++
 .../{LocalMessage.java => LocalRequest.java}       |  19 +++-
 .../src/main/java/accord/messages/MessageType.java | 114 ++++++++++++---------
 .../src/main/java/accord/messages/Propagate.java   |  32 +++++-
 .../src/main/java/accord/messages/Request.java     |   5 +-
 .../src/main/java/accord/messages/TxnRequest.java  |   8 +-
 .../main/java/accord/messages/WaitOnCommit.java    |   8 +-
 accord-core/src/test/java/accord/Utils.java        |   8 +-
 .../accord/burn/BurnTestConfigurationService.java  |   6 ++
 .../src/test/java/accord/impl/basic/Cluster.java   |  16 +--
 .../test/java/accord/impl/list/ListRequest.java    |   6 ++
 .../test/java/accord/impl/mock/MockCluster.java    |   4 +-
 .../src/test/java/accord/local/CommandsTest.java   |   6 ++
 .../src/test/java/accord/utils/MessageTask.java    |   6 ++
 .../src/main/java/accord/maelstrom/Cluster.java    |   4 +-
 .../java/accord/maelstrom/MaelstromRequest.java    |   6 ++
 .../src/main/java/accord/maelstrom/Main.java       |   4 +-
 21 files changed, 208 insertions(+), 96 deletions(-)

diff --git a/accord-core/src/main/java/accord/api/MessageSink.java 
b/accord-core/src/main/java/accord/api/MessageSink.java
index 47a3fa0..d05f304 100644
--- a/accord-core/src/main/java/accord/api/MessageSink.java
+++ b/accord-core/src/main/java/accord/api/MessageSink.java
@@ -28,7 +28,7 @@ import accord.messages.Request;
 public interface MessageSink
 {
     void send(Id to, Request request);
-    void send(Id to, Request request, AgentExecutor executor, Callback 
callback);
+    void send(Id to, Request request, AgentExecutor executor, Callback<?> 
callback);
     void reply(Id replyingToNode, ReplyContext replyContext, Reply reply);
     void replyWithUnknownFailure(Id replyingToNode, ReplyContext replyContext, 
Throwable failure);
 }
diff --git a/accord-core/src/main/java/accord/local/Node.java 
b/accord-core/src/main/java/accord/local/Node.java
index 9e67cfa..6fc883e 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -65,7 +65,7 @@ import accord.coordinate.Persist;
 import accord.coordinate.RecoverWithRoute;
 import accord.messages.Apply;
 import accord.messages.Callback;
-import accord.messages.LocalMessage;
+import accord.messages.LocalRequest;
 import accord.messages.Reply;
 import accord.messages.ReplyContext;
 import accord.messages.Request;
@@ -147,7 +147,7 @@ public class Node implements ConfigurationService.Listener, 
NodeTimeService
 
     private final Id id;
     private final MessageSink messageSink;
-    private final LocalMessage.Handler localMessageHandler;
+    private final LocalRequest.Handler localRequestHandler;
     private final ConfigurationService configService;
     private final TopologyManager topology;
     private final CommandStores commandStores;
@@ -168,7 +168,7 @@ public class Node implements ConfigurationService.Listener, 
NodeTimeService
     // TODO (expected, liveness): monitor the contents of this collection for 
stalled coordination, and excise them
     private final Map<TxnId, AsyncResult<? extends Outcome>> coordinating = 
new ConcurrentHashMap<>();
 
-    public Node(Id id, MessageSink messageSink, LocalMessage.Handler 
localMessageHandler,
+    public Node(Id id, MessageSink messageSink, LocalRequest.Handler 
localRequestHandler,
                 ConfigurationService configService, LongSupplier nowSupplier, 
ToLongFunction<TimeUnit> nowTimeUnit,
                 Supplier<DataStore> dataSupplier, ShardDistributor 
shardDistributor, Agent agent, RandomSource random, Scheduler scheduler, 
TopologySorter.Supplier topologySorter,
                 Function<Node, ProgressLog.Factory> progressLogFactory, 
CommandStores.Factory factory, Execute.Factory executionFactory, 
Persist.Factory persistFactory, Apply.Factory applyFactory,
@@ -177,7 +177,7 @@ public class Node implements ConfigurationService.Listener, 
NodeTimeService
         this.id = id;
         this.localConfig = localConfig;
         this.messageSink = messageSink;
-        this.localMessageHandler = localMessageHandler;
+        this.localRequestHandler = localRequestHandler;
         this.configService = configService;
         this.executionFactory = executionFactory;
         this.persistFactory = persistFactory;
@@ -529,9 +529,9 @@ public class Node implements ConfigurationService.Listener, 
NodeTimeService
         messageSink.send(to, send);
     }
 
-    public void localMessage(LocalMessage message)
+    public void localRequest(LocalRequest message)
     {
-        localMessageHandler.handle(message, this);
+        localRequestHandler.handle(message, this);
     }
 
     public void reply(Id replyingToNode, ReplyContext replyContext, Reply 
send, Throwable failure)
@@ -706,19 +706,16 @@ public class Node implements 
ConfigurationService.Listener, NodeTimeService
         return future;
     }
 
-    public void receive (Request request, Id from, ReplyContext replyContext)
+    public void receive(Request request, Id from, ReplyContext replyContext)
     {
-        long knownEpoch = request.knownEpoch();
-        if (knownEpoch > topology.epoch())
+        long waitForEpoch = request.waitForEpoch();
+        if (waitForEpoch > topology.epoch())
         {
-            configService.fetchTopologyForEpoch(knownEpoch);
-            long waitForEpoch = request.waitForEpoch();
-            if (waitForEpoch > topology.epoch())
-            {
-                topology().awaitEpoch(waitForEpoch).addCallback(() -> 
receive(request, from, replyContext));
-                return;
-            }
+            configService.fetchTopologyForEpoch(waitForEpoch);
+            topology().awaitEpoch(waitForEpoch).addCallback(() -> 
receive(request, from, replyContext));
+            return;
         }
+
         Runnable processMsg = () -> {
             try
             {
diff --git 
a/accord-core/src/main/java/accord/messages/AbstractEpochRequest.java 
b/accord-core/src/main/java/accord/messages/AbstractEpochRequest.java
index 482fcf3..040364f 100644
--- a/accord-core/src/main/java/accord/messages/AbstractEpochRequest.java
+++ b/accord-core/src/main/java/accord/messages/AbstractEpochRequest.java
@@ -37,11 +37,17 @@ public abstract class AbstractEpochRequest<R extends Reply> 
implements PreLoadCo
     }
 
     @Override
-    public void process(Node on, Node.Id replyTo, ReplyContext replyContext)
+    public void preProcess(Node on, Node.Id replyTo, ReplyContext replyContext)
     {
         this.node = on;
         this.replyTo = replyTo;
         this.replyContext = replyContext;
+    }
+
+    @Override
+    public void process(Node on, Node.Id replyTo, ReplyContext replyContext)
+    {
+        preProcess(on, replyTo, replyContext);
         process();
     }
 
diff --git a/accord-core/src/main/java/accord/messages/Commit.java 
b/accord-core/src/main/java/accord/messages/Commit.java
index 14b26fd..bb05856 100644
--- a/accord-core/src/main/java/accord/messages/Commit.java
+++ b/accord-core/src/main/java/accord/messages/Commit.java
@@ -323,9 +323,16 @@ public class Commit extends TxnRequest<ReadNack>
             return waitForEpoch;
         }
 
+        @Override
+        public void preProcess(Node on, Id from, ReplyContext replyContext)
+        {
+            // no-op
+        }
+
         @Override
         public void process(Node node, Id from, ReplyContext replyContext)
         {
+
             node.forEachLocal(this, scope, txnId.epoch(), 
invalidateUntilEpoch, safeStore -> {
                 // it's fine for this to operate on a non-participating home 
key, since invalidation is a terminal state,
                 // so it doesn't matter if we resurrect a redundant entry
diff --git a/accord-core/src/main/java/accord/messages/InformHomeDurable.java 
b/accord-core/src/main/java/accord/messages/InformHomeDurable.java
index 6f2491d..ae871fd 100644
--- a/accord-core/src/main/java/accord/messages/InformHomeDurable.java
+++ b/accord-core/src/main/java/accord/messages/InformHomeDurable.java
@@ -53,6 +53,12 @@ public class InformHomeDurable implements Request
         this.persistedOn = ImmutableSet.copyOf(persistedOn); // Persisted on 
might be mutated later
     }
 
+    @Override
+    public void preProcess(Node node, Id replyToNode, ReplyContext 
replyContext)
+    {
+        // no-op
+    }
+
     @Override
     public void process(Node node, Id replyToNode, ReplyContext replyContext)
     {
diff --git a/accord-core/src/main/java/accord/messages/LocalMessage.java 
b/accord-core/src/main/java/accord/messages/LocalRequest.java
similarity index 65%
rename from accord-core/src/main/java/accord/messages/LocalMessage.java
rename to accord-core/src/main/java/accord/messages/LocalRequest.java
index 07b1189..d12e35a 100644
--- a/accord-core/src/main/java/accord/messages/LocalMessage.java
+++ b/accord-core/src/main/java/accord/messages/LocalRequest.java
@@ -19,13 +19,24 @@ package accord.messages;
 
 import accord.local.Node;
 import accord.local.PreLoadContext;
+import accord.local.SafeCommandStore;
+import accord.utils.MapReduceConsume;
 
-public interface LocalMessage extends Message, PreLoadContext
+import java.util.function.BiConsumer;
+
+public interface LocalRequest<R> extends Request, PreLoadContext, 
MapReduceConsume<SafeCommandStore, Void>
 {
+    /**
+     * Process the request without executing the callback
+     */
+    void process(Node on);
+
+    void process(Node on, BiConsumer<R, Throwable> callback);
+
+    BiConsumer<R, Throwable> callback();
+
     interface Handler
     {
-        void handle(LocalMessage message, Node node);
+        void handle(LocalRequest<?> message, Node node);
     }
-
-    void process(Node node);
 }
diff --git a/accord-core/src/main/java/accord/messages/MessageType.java 
b/accord-core/src/main/java/accord/messages/MessageType.java
index 56ea2ed..8111f5f 100644
--- a/accord-core/src/main/java/accord/messages/MessageType.java
+++ b/accord-core/src/main/java/accord/messages/MessageType.java
@@ -31,49 +31,52 @@ import static accord.messages.MessageType.Kind.LOCAL;
  */
 public class MessageType
 {
-    public static final MessageType SIMPLE_RSP                       = 
mt(REMOTE, false);
-    public static final MessageType FAILURE_RSP                      = 
mt(REMOTE, false);
-    public static final MessageType PRE_ACCEPT_REQ                   = 
mt(REMOTE, true );
-    public static final MessageType PRE_ACCEPT_RSP                   = 
mt(REMOTE, false);
-    public static final MessageType ACCEPT_REQ                       = 
mt(REMOTE, true );
-    public static final MessageType ACCEPT_RSP                       = 
mt(REMOTE, false);
-    public static final MessageType ACCEPT_INVALIDATE_REQ            = 
mt(REMOTE, true );
-    public static final MessageType GET_DEPS_REQ                     = 
mt(REMOTE, false);
-    public static final MessageType GET_DEPS_RSP                     = 
mt(REMOTE, false);
-    public static final MessageType COMMIT_MINIMAL_REQ               = 
mt(REMOTE, true );
-    public static final MessageType COMMIT_MAXIMAL_REQ               = 
mt(REMOTE, true );
-    public static final MessageType COMMIT_INVALIDATE_REQ            = 
mt(REMOTE, true );
-    public static final MessageType APPLY_MINIMAL_REQ                = 
mt(REMOTE, true );
-    public static final MessageType APPLY_MAXIMAL_REQ                = 
mt(REMOTE, true );
-    public static final MessageType APPLY_RSP                        = 
mt(REMOTE, false);
-    public static final MessageType READ_REQ                         = 
mt(REMOTE, false);
-    public static final MessageType READ_RSP                         = 
mt(REMOTE, false);
-    public static final MessageType BEGIN_RECOVER_REQ                = 
mt(REMOTE, true );
-    public static final MessageType BEGIN_RECOVER_RSP                = 
mt(REMOTE, false);
-    public static final MessageType BEGIN_INVALIDATE_REQ             = 
mt(REMOTE, true );
-    public static final MessageType BEGIN_INVALIDATE_RSP             = 
mt(REMOTE, false);
-    public static final MessageType WAIT_ON_COMMIT_REQ               = 
mt(REMOTE, false);
-    public static final MessageType WAIT_ON_COMMIT_RSP               = 
mt(REMOTE, false);
-    public static final MessageType WAIT_UNTIL_APPLIED_REQ           = 
mt(REMOTE, false);
-    public static final MessageType INFORM_OF_TXN_REQ                = 
mt(REMOTE, true );
-    public static final MessageType INFORM_DURABLE_REQ               = 
mt(REMOTE, true );
-    public static final MessageType INFORM_HOME_DURABLE_REQ          = 
mt(REMOTE, true );
-    public static final MessageType CHECK_STATUS_REQ                 = 
mt(REMOTE, false);
-    public static final MessageType CHECK_STATUS_RSP                 = 
mt(REMOTE, false);
-    public static final MessageType FETCH_DATA_REQ                   = 
mt(REMOTE, false);
-    public static final MessageType FETCH_DATA_RSP                   = 
mt(REMOTE, false);
-    public static final MessageType SET_SHARD_DURABLE_REQ            = 
mt(REMOTE, true );
-    public static final MessageType SET_GLOBALLY_DURABLE_REQ         = 
mt(REMOTE, true );
-    public static final MessageType QUERY_DURABLE_BEFORE_REQ         = 
mt(REMOTE, false);
-    public static final MessageType QUERY_DURABLE_BEFORE_RSP         = 
mt(REMOTE, false);
-    public static final MessageType APPLY_THEN_WAIT_UNTIL_APPLIED_REQ= 
mt(REMOTE, true );
-
-    public static final MessageType PROPAGATE_PRE_ACCEPT_MSG         = 
mt(LOCAL,  true );
-    public static final MessageType PROPAGATE_COMMIT_MSG             = 
mt(LOCAL,  true );
-    public static final MessageType PROPAGATE_APPLY_MSG              = 
mt(LOCAL,  true );
-    public static final MessageType PROPAGATE_OTHER_MSG              = 
mt(LOCAL,  true );
+    public static final MessageType SIMPLE_RSP                        = 
remote("SIMPLE_RSP",                        false);
+    public static final MessageType FAILURE_RSP                       = 
remote("FAILURE_RSP",                       false);
+    public static final MessageType PRE_ACCEPT_REQ                    = 
remote("PRE_ACCEPT_REQ",                    true );
+    public static final MessageType PRE_ACCEPT_RSP                    = 
remote("PRE_ACCEPT_RSP",                    false);
+    public static final MessageType ACCEPT_REQ                        = 
remote("ACCEPT_REQ",                        true );
+    public static final MessageType ACCEPT_RSP                        = 
remote("ACCEPT_RSP",                        false);
+    public static final MessageType ACCEPT_INVALIDATE_REQ             = 
remote("ACCEPT_INVALIDATE_REQ",             true );
+    public static final MessageType GET_DEPS_REQ                      = 
remote("GET_DEPS_REQ",                      false);
+    public static final MessageType GET_DEPS_RSP                      = 
remote("GET_DEPS_RSP",                      false);
+    public static final MessageType COMMIT_MINIMAL_REQ                = 
remote("COMMIT_MINIMAL_REQ",                true );
+    public static final MessageType COMMIT_MAXIMAL_REQ                = 
remote("COMMIT_MAXIMAL_REQ",                true );
+    public static final MessageType COMMIT_INVALIDATE_REQ             = 
remote("COMMIT_INVALIDATE_REQ",             true );
+    public static final MessageType APPLY_MINIMAL_REQ                 = 
remote("APPLY_MINIMAL_REQ",                 true );
+    public static final MessageType APPLY_MAXIMAL_REQ                 = 
remote("APPLY_MAXIMAL_REQ",                 true );
+    public static final MessageType APPLY_RSP                         = 
remote("APPLY_RSP",                         false);
+    public static final MessageType READ_REQ                          = 
remote("READ_REQ",                          false);
+    public static final MessageType READ_RSP                          = 
remote("READ_RSP",                          false);
+    public static final MessageType BEGIN_RECOVER_REQ                 = 
remote("BEGIN_RECOVER_REQ",                 true );
+    public static final MessageType BEGIN_RECOVER_RSP                 = 
remote("BEGIN_RECOVER_RSP",                 false);
+    public static final MessageType BEGIN_INVALIDATE_REQ              = 
remote("BEGIN_INVALIDATE_REQ",              true );
+    public static final MessageType BEGIN_INVALIDATE_RSP              = 
remote("BEGIN_INVALIDATE_RSP",              false);
+    public static final MessageType WAIT_ON_COMMIT_REQ                = 
remote("WAIT_ON_COMMIT_REQ",                false);
+    public static final MessageType WAIT_ON_COMMIT_RSP                = 
remote("WAIT_ON_COMMIT_RSP",                false);
+    public static final MessageType WAIT_UNTIL_APPLIED_REQ            = 
remote("WAIT_UNTIL_APPLIED_REQ",            false);
+    public static final MessageType INFORM_OF_TXN_REQ                 = 
remote("INFORM_OF_TXN_REQ",                 true );
+    public static final MessageType INFORM_DURABLE_REQ                = 
remote("INFORM_DURABLE_REQ",                true );
+    public static final MessageType INFORM_HOME_DURABLE_REQ           = 
remote("INFORM_HOME_DURABLE_REQ",           true );
+    public static final MessageType CHECK_STATUS_REQ                  = 
remote("CHECK_STATUS_REQ",                  false);
+    public static final MessageType CHECK_STATUS_RSP                  = 
remote("CHECK_STATUS_RSP",                  false);
+    public static final MessageType FETCH_DATA_REQ                    = 
remote("FETCH_DATA_REQ",                    false);
+    public static final MessageType FETCH_DATA_RSP                    = 
remote("FETCH_DATA_RSP",                    false);
+    public static final MessageType SET_SHARD_DURABLE_REQ             = 
remote("SET_SHARD_DURABLE_REQ",             true );
+    public static final MessageType SET_GLOBALLY_DURABLE_REQ          = 
remote("SET_GLOBALLY_DURABLE_REQ",          true );
+    public static final MessageType QUERY_DURABLE_BEFORE_REQ          = 
remote("QUERY_DURABLE_BEFORE_REQ",          false);
+    public static final MessageType QUERY_DURABLE_BEFORE_RSP          = 
remote("QUERY_DURABLE_BEFORE_RSP",          false);
+    public static final MessageType APPLY_THEN_WAIT_UNTIL_APPLIED_REQ = 
remote("APPLY_THEN_WAIT_UNTIL_APPLIED_REQ", true );
+
+    public static final MessageType PROPAGATE_PRE_ACCEPT_MSG          = 
local("PROPAGATE_PRE_ACCEPT_MSG", true);
+    public static final MessageType PROPAGATE_COMMIT_MSG              = 
local("PROPAGATE_COMMIT_MSG",     true);
+    public static final MessageType PROPAGATE_APPLY_MSG               = 
local("PROPAGATE_APPLY_MSG",      true);
+    public static final MessageType PROPAGATE_OTHER_MSG               = 
local("PROPAGATE_OTHER_MSG",      true);
 
 
+    /**
+     * LOCAL messages are not sent to remote nodes.
+     */
     public enum Kind { LOCAL, REMOTE }
 
     public static final List<MessageType> values;
@@ -98,14 +101,17 @@ public class MessageType
         values = builder.build();
     }
 
-    protected static MessageType mt(Kind kind, boolean hasSideEffects)
+    protected static MessageType local(String name, boolean hasSideEffects)
     {
-        return new MessageType(kind, hasSideEffects);
+        return new MessageType(name, LOCAL, hasSideEffects);
     }
 
-    /**
-     * LOCAL messages are not sent to remote nodes.
-     */
+    protected static MessageType remote(String name, boolean hasSideEffects)
+    {
+        return new MessageType(name, REMOTE, hasSideEffects);
+    }
+
+    private final String name;
     private final Kind kind;
 
     /**
@@ -113,10 +119,22 @@ public class MessageType
      */
     private final boolean hasSideEffects;
 
-    protected MessageType(Kind kind, boolean hasSideEffects)
+    protected MessageType(String name, Kind kind, boolean hasSideEffects)
     {
-        this.hasSideEffects = hasSideEffects;
+        this.name = name;
         this.kind = kind;
+        this.hasSideEffects = hasSideEffects;
+    }
+
+    public String name()
+    {
+        return name;
+    }
+
+    @Override
+    public String toString()
+    {
+        return name();
     }
 
     public boolean isLocal()
diff --git a/accord-core/src/main/java/accord/messages/Propagate.java 
b/accord-core/src/main/java/accord/messages/Propagate.java
index a0821bc..e1a3679 100644
--- a/accord-core/src/main/java/accord/messages/Propagate.java
+++ b/accord-core/src/main/java/accord/messages/Propagate.java
@@ -41,7 +41,6 @@ import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
 import accord.primitives.Writes;
 import accord.utils.Invariants;
-import accord.utils.MapReduceConsume;
 
 import javax.annotation.Nullable;
 import java.util.function.BiConsumer;
@@ -54,7 +53,7 @@ import static accord.local.Status.PreApplied;
 import static accord.messages.CheckStatus.WithQuorum.HasQuorum;
 import static accord.primitives.Routables.Slice.Minimal;
 
-public class Propagate implements MapReduceConsume<SafeCommandStore, Void>, 
EpochSupplier, LocalMessage
+public class Propagate implements EpochSupplier, LocalRequest<Status.Known>
 {
     public static class SerializerSupport
     {
@@ -83,7 +82,13 @@ public class Propagate implements 
MapReduceConsume<SafeCommandStore, Void>, Epoc
     @Nullable public final Writes writes;
     @Nullable public final Result result;
 
-    transient final BiConsumer<Status.Known, Throwable> callback;
+    protected transient BiConsumer<Status.Known, Throwable> callback;
+
+    @Override
+    public BiConsumer<Status.Known, Throwable> callback()
+    {
+        return callback;
+    }
 
     Propagate(
         TxnId txnId,
@@ -123,6 +128,13 @@ public class Propagate implements 
MapReduceConsume<SafeCommandStore, Void>, Epoc
         this.callback = callback;
     }
 
+    @Override
+    public void process(Node on, BiConsumer<Status.Known, Throwable> callback)
+    {
+        this.callback = callback;
+        process(on);
+    }
+
     @SuppressWarnings({"rawtypes", "unchecked"})
     public static void propagate(Node node, TxnId txnId, long sourceEpoch, 
WithQuorum withQuorum, Route route, @Nullable Status.Known target, 
CheckStatus.CheckStatusOkFull full, BiConsumer<Status.Known, Throwable> 
callback)
     {
@@ -190,7 +202,7 @@ public class Propagate implements 
MapReduceConsume<SafeCommandStore, Void>, Epoc
         Propagate propagate =
             new Propagate(txnId, route, full.maxKnowledgeSaveStatus, 
full.maxSaveStatus, full.durability, full.homeKey, progressKey, achieved, 
full.map, isTruncated, partialTxn, committedDeps, toEpoch, 
full.executeAtIfKnown(), full.writes, full.result, callback);
 
-        node.localMessage(propagate);
+        node.localRequest(propagate);
     }
 
     @Override
@@ -212,6 +224,18 @@ public class Propagate implements 
MapReduceConsume<SafeCommandStore, Void>, Epoc
             return Keys.EMPTY;
     }
 
+    @Override
+    public void preProcess(Node on, Node.Id from, ReplyContext replyContext)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void process(Node on, Node.Id from, ReplyContext replyContext)
+    {
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     public void process(Node node)
     {
diff --git a/accord-core/src/main/java/accord/messages/Request.java 
b/accord-core/src/main/java/accord/messages/Request.java
index 4cd9861..0c92371 100644
--- a/accord-core/src/main/java/accord/messages/Request.java
+++ b/accord-core/src/main/java/accord/messages/Request.java
@@ -23,7 +23,8 @@ import accord.local.Node.Id;
 
 public interface Request extends Message
 {
-    void process(Node on, Id from, ReplyContext replyContext);
     default long waitForEpoch() { return 0; }
-    default long knownEpoch() { return waitForEpoch(); }
+    void preProcess(Node on, Id from, ReplyContext replyContext);
+    void process(Node on, Id from, ReplyContext replyContext);
+
 }
diff --git a/accord-core/src/main/java/accord/messages/TxnRequest.java 
b/accord-core/src/main/java/accord/messages/TxnRequest.java
index 2520ad8..4cd3942 100644
--- a/accord-core/src/main/java/accord/messages/TxnRequest.java
+++ b/accord-core/src/main/java/accord/messages/TxnRequest.java
@@ -136,12 +136,18 @@ public abstract class TxnRequest<R> implements Request, 
PreLoadContext, MapReduc
     }
 
     @Override
-    public void process(Node on, Id replyTo, ReplyContext replyContext)
+    public void preProcess(Node on, Id replyTo, ReplyContext replyContext)
     {
         this.node = on;
         this.replyTo = replyTo;
         this.replyContext = replyContext;
         this.progressKey = progressKey(); // TODO (low priority, clarity): not 
every class that extends TxnRequest needs this set
+    }
+
+    @Override
+    public void process(Node on, Id replyTo, ReplyContext replyContext)
+    {
+        preProcess(on, replyTo, replyContext);
         process();
     }
 
diff --git a/accord-core/src/main/java/accord/messages/WaitOnCommit.java 
b/accord-core/src/main/java/accord/messages/WaitOnCommit.java
index ca52b69..9d0105d 100644
--- a/accord-core/src/main/java/accord/messages/WaitOnCommit.java
+++ b/accord-core/src/main/java/accord/messages/WaitOnCommit.java
@@ -70,11 +70,17 @@ public class WaitOnCommit implements Request, 
MapReduceConsume<SafeCommandStore,
     }
 
     @Override
-    public void process(Node node, Id replyToNode, ReplyContext replyContext)
+    public void preProcess(Node node, Id replyToNode, ReplyContext 
replyContext)
     {
         this.node = node;
         this.replyTo = replyToNode;
         this.replyContext = replyContext;
+    }
+
+    @Override
+    public void process(Node node, Id replyToNode, ReplyContext replyContext)
+    {
+        preProcess(node, replyToNode, replyContext);
         node.mapReduceConsumeLocal(this, scope, txnId.epoch(), txnId.epoch(), 
this);
     }
 
diff --git a/accord-core/src/test/java/accord/Utils.java 
b/accord-core/src/test/java/accord/Utils.java
index 40d4df0..787d9de 100644
--- a/accord-core/src/test/java/accord/Utils.java
+++ b/accord-core/src/test/java/accord/Utils.java
@@ -30,9 +30,10 @@ import com.google.common.collect.Sets;
 import accord.api.Key;
 import accord.api.MessageSink;
 import accord.api.Scheduler;
+import accord.config.LocalConfig;
+import accord.config.MutableLocalConfig;
 import accord.coordinate.TxnExecute;
 import accord.coordinate.TxnPersist;
-import accord.config.LocalConfig;
 import accord.impl.InMemoryCommandStores;
 import accord.impl.IntKey;
 import accord.impl.SimpleProgressLog;
@@ -48,8 +49,7 @@ import accord.local.Node;
 import accord.local.NodeTimeService;
 import accord.local.ShardDistributor;
 import accord.messages.Apply;
-import accord.config.MutableLocalConfig;
-import accord.messages.LocalMessage;
+import accord.messages.LocalRequest;
 import accord.primitives.Keys;
 import accord.primitives.Range;
 import accord.primitives.Ranges;
@@ -165,7 +165,7 @@ public class Utils
         LocalConfig localConfig = new MutableLocalConfig();
         Node node = new Node(nodeId,
                              messageSink,
-                             LocalMessage::process,
+                             LocalRequest::process,
                              new MockConfigurationService(messageSink, 
EpochFunction.noop(), topology),
                              clock,
                              
NodeTimeService.unixWrapper(TimeUnit.MICROSECONDS, clock),
diff --git 
a/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java 
b/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java
index 01ea96d..aca0e48 100644
--- a/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java
+++ b/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java
@@ -66,6 +66,12 @@ public class BurnTestConfigurationService extends 
AbstractConfigurationService.M
             this.epoch = epoch;
         }
 
+        @Override
+        public void preProcess(Node on, Node.Id from, ReplyContext 
replyContext)
+        {
+            // no-op
+        }
+
         @Override
         public void process(Node on, Node.Id from, ReplyContext replyContext)
         {
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java 
b/accord-core/src/test/java/accord/impl/basic/Cluster.java
index a9dbcf5..6d1e226 100644
--- a/accord-core/src/test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -37,8 +37,6 @@ import java.util.function.IntSupplier;
 import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 
-import accord.config.LocalConfig;
-import accord.impl.MessageListener;
 import org.junit.jupiter.api.Assertions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,25 +45,27 @@ import accord.api.MessageSink;
 import accord.api.Scheduler;
 import accord.burn.BurnTestConfigurationService;
 import accord.burn.TopologyUpdates;
+import accord.burn.random.FrequentLargeRange;
+import accord.config.LocalConfig;
+import accord.config.MutableLocalConfig;
 import accord.coordinate.TxnExecute;
 import accord.coordinate.TxnPersist;
-import accord.burn.random.FrequentLargeRange;
 import accord.impl.CoordinateDurabilityScheduling;
+import accord.impl.MessageListener;
 import accord.impl.PrefixedIntHashKey;
 import accord.impl.SimpleProgressLog;
 import accord.impl.SizeOfIntersectionSorter;
 import accord.impl.TopologyFactory;
 import accord.impl.list.ListStore;
 import accord.local.AgentExecutor;
-import accord.local.Node;
 import accord.local.Node.Id;
+import accord.local.Node;
 import accord.local.NodeTimeService;
 import accord.local.ShardDistributor;
 import accord.messages.Apply;
-import accord.config.MutableLocalConfig;
-import accord.messages.LocalMessage;
-import accord.messages.MessageType;
+import accord.messages.LocalRequest;
 import accord.messages.Message;
+import accord.messages.MessageType;
 import accord.messages.Reply;
 import accord.messages.Request;
 import accord.messages.SafeCallback;
@@ -292,7 +292,7 @@ public class Cluster implements Scheduler
                 executorMap.put(id, nodeExecutor);
                 BurnTestConfigurationService configService = new 
BurnTestConfigurationService(id, nodeExecutor, randomSupplier, topology, 
nodeMap::get, topologyUpdates);
                 BooleanSupplier isLoadedCheck = 
random.biasedUniformBools(0.5f);
-                Node node = new Node(id, messageSink, LocalMessage::process, 
configService, nowSupplier, NodeTimeService.unixWrapper(TimeUnit.MILLISECONDS, 
nowSupplier),
+                Node node = new Node(id, messageSink, LocalRequest::process, 
configService, nowSupplier, NodeTimeService.unixWrapper(TimeUnit.MILLISECONDS, 
nowSupplier),
                                      () -> new ListStore(id), new 
ShardDistributor.EvenSplit<>(8, ignore -> new PrefixedIntHashKey.Splitter()),
                                      nodeExecutor.agent(),
                                      randomSupplier.get(), sinks, 
SizeOfIntersectionSorter.SUPPLIER,
diff --git a/accord-core/src/test/java/accord/impl/list/ListRequest.java 
b/accord-core/src/test/java/accord/impl/list/ListRequest.java
index 75713a0..cd58a8a 100644
--- a/accord-core/src/test/java/accord/impl/list/ListRequest.java
+++ b/accord-core/src/test/java/accord/impl/list/ListRequest.java
@@ -238,6 +238,12 @@ public class ListRequest implements Request
         this.listener = listener;
     }
 
+    @Override
+    public void preProcess(Node node, Id client, ReplyContext replyContext)
+    {
+        // no-op
+    }
+
     @Override
     public void process(Node node, Id client, ReplyContext replyContext)
     {
diff --git a/accord-core/src/test/java/accord/impl/mock/MockCluster.java b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
index 19bf3e6..199ec1a 100644
--- a/accord-core/src/test/java/accord/impl/mock/MockCluster.java
+++ b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
@@ -50,7 +50,7 @@ import accord.local.ShardDistributor;
 import accord.messages.Apply;
 import accord.config.MutableLocalConfig;
 import accord.messages.Callback;
-import accord.messages.LocalMessage;
+import accord.messages.LocalRequest;
 import accord.messages.Reply;
 import accord.messages.Request;
 import accord.messages.SafeCallback;
@@ -127,7 +127,7 @@ public class MockCluster implements Network, AutoCloseable, Iterable<Node>
         LocalConfig localConfig = new MutableLocalConfig();
         Node node = new Node(id,
                              messageSink,
-                             LocalMessage::process,
+                             LocalRequest::process,
                              configurationService,
                              nowSupplier,
                              NodeTimeService.unixWrapper(TimeUnit.MILLISECONDS, nowSupplier),
diff --git a/accord-core/src/test/java/accord/local/CommandsTest.java b/accord-core/src/test/java/accord/local/CommandsTest.java
index c938981..6731a74 100644
--- a/accord-core/src/test/java/accord/local/CommandsTest.java
+++ b/accord-core/src/test/java/accord/local/CommandsTest.java
@@ -101,6 +101,12 @@ class CommandsTest
 
             cluster(rs::fork, nodes, initialTopology, nodeMap -> new Request()
             {
+                @Override
+                public void preProcess(Node on, Node.Id from, ReplyContext replyContext)
+                {
+                    // no-op
+                }
+
                 @Override
                 public void process(Node node, Node.Id from, ReplyContext replyContext)
                 {
diff --git a/accord-core/src/test/java/accord/utils/MessageTask.java b/accord-core/src/test/java/accord/utils/MessageTask.java
index 528a6e4..d53128c 100644
--- a/accord-core/src/test/java/accord/utils/MessageTask.java
+++ b/accord-core/src/test/java/accord/utils/MessageTask.java
@@ -94,6 +94,12 @@ public class MessageTask extends AsyncResults.SettableResult<Void> implements Ru
             this.desc = desc;
         }
 
+        @Override
+        public void preProcess(Node on, Node.Id from, ReplyContext replyContext)
+        {
+            // no-op
+        }
+
         @Override
         public void process(Node on, Node.Id from, ReplyContext replyContext)
         {
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
index f69e4ed..047240a 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
@@ -55,7 +55,7 @@ import accord.local.NodeTimeService;
 import accord.local.ShardDistributor;
 import accord.messages.Apply;
 import accord.messages.Callback;
-import accord.messages.LocalMessage;
+import accord.messages.LocalRequest;
 import accord.messages.Reply;
 import accord.messages.Reply.FailureReply;
 import accord.messages.ReplyContext;
@@ -320,7 +320,7 @@ public class Cluster implements Scheduler
                 MessageSink messageSink = sinks.create(node, randomSupplier.get());
                 LongSupplier nowSupplier = nowSupplierSupplier.get();
                 LocalConfig localConfig = new MutableLocalConfig();
-                lookup.put(node, new Node(node, messageSink, LocalMessage::process, new SimpleConfigService(topology),
+                lookup.put(node, new Node(node, messageSink, LocalRequest::process, new SimpleConfigService(topology),
                                           nowSupplier, NodeTimeService.unixWrapper(TimeUnit.MICROSECONDS, nowSupplier),
                                           MaelstromStore::new, new ShardDistributor.EvenSplit(8, ignore -> new MaelstromKey.Splitter()),
                                           MaelstromAgent.INSTANCE,
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRequest.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRequest.java
index f9aad09..5dfcf3e 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRequest.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRequest.java
@@ -46,6 +46,12 @@ public class MaelstromRequest extends Body implements Request
         this.txn = txn;
     }
 
+    @Override
+    public void preProcess(Node node, Id client, ReplyContext replyContext)
+    {
+        // no-op
+    }
+
     @Override
     public void process(Node node, Id client, ReplyContext replyContext)
     {
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
index 5d2a459..c79b3b4 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
@@ -48,7 +48,7 @@ import accord.local.ShardDistributor;
 import accord.maelstrom.Packet.Type;
 import accord.messages.Apply;
 import accord.messages.Callback;
-import accord.messages.LocalMessage;
+import accord.messages.LocalRequest;
 import accord.messages.Reply;
 import accord.messages.Reply.FailureReply;
 import accord.messages.ReplyContext;
@@ -179,7 +179,7 @@ public class Main
             topology = topologyFactory.toTopology(init.cluster);
             sink = new StdoutSink(System::currentTimeMillis, scheduler, start, init.self, out, err);
             LocalConfig localConfig = new MutableLocalConfig();
-            on = new Node(init.self, sink, LocalMessage::process, new SimpleConfigService(topology),
+            on = new Node(init.self, sink, LocalRequest::process, new SimpleConfigService(topology),
                           System::currentTimeMillis, NodeTimeService.unixWrapper(TimeUnit.MILLISECONDS, System::currentTimeMillis),
                           MaelstromStore::new, new ShardDistributor.EvenSplit(8, ignore -> new MaelstromKey.Splitter()),
                           MaelstromAgent.INSTANCE, new DefaultRandom(), scheduler, SizeOfIntersectionSorter.SUPPLIER,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


Reply via email to