This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8226b2d7 CEP-15: (C*) Accord message processing should avoid being
passed on to a Stage and run directly in the messageing handler
8226b2d7 is described below
commit 8226b2d7759319d7a0b0c823ab09b4344c5423f7
Author: David Capwell <[email protected]>
AuthorDate: Thu Apr 27 11:10:37 2023 -0700
CEP-15: (C*) Accord message processing should avoid being passed on to a
Stage and run directly in the messageing handler
patch by David Capwell; reviewed by Ariel Weisberg, Benedict Elliott Smith
for CASSANDRA-18364
---
.build/checkstyle/checkstyle.xml | 36 +++++++
.build/checkstyle/suppressions.xml | 24 +++++
.circleci/config.yml | 8 +-
.../src/main/java/accord/api/MessageSink.java | 3 +-
.../src/main/java/accord/api/RoutingKey.java | 5 -
accord-core/src/main/java/accord/api/Update.java | 1 -
.../main/java/accord/coordinate/Coordinate.java | 3 +-
.../src/main/java/accord/coordinate/Persist.java | 4 +-
.../src/main/java/accord/coordinate/Propose.java | 1 -
.../java/accord/impl/InMemoryCommandStore.java | 89 ++++++++-------
.../java/accord/impl/InMemoryCommandStores.java | 17 +--
.../MessageSink.java => local/AgentExecutor.java} | 21 ++--
.../src/main/java/accord/local/CommandStore.java | 119 +++++++++++++++++++--
.../src/main/java/accord/local/CommandStores.java | 40 +++++--
accord-core/src/main/java/accord/local/Node.java | 46 ++++++--
.../main/java/accord/local/SafeCommandStore.java | 7 ++
.../src/main/java/accord/messages/ReadData.java | 2 +-
.../main/java/accord/messages/SafeCallback.java | 91 ++++++++++++++++
.../src/main/java/accord/primitives/KeyDeps.java | 1 -
.../java/accord/primitives/PartialRangeRoute.java | 1 -
.../main/java/accord/primitives/PartialTxn.java | 1 -
.../src/main/java/accord/primitives/RangeDeps.java | 2 -
.../src/main/java/accord/primitives/Routables.java | 4 -
.../src/main/java/accord/primitives/Txn.java | 2 -
.../main/java/accord/topology/TopologyManager.java | 16 ++-
.../main/java/accord/utils/async/AsyncChain.java | 25 +++++
.../main/java/accord/utils/async/AsyncChains.java | 61 +++++++++++
.../src/test/java/accord/burn/BurnTest.java | 8 +-
.../accord/burn/BurnTestConfigurationService.java | 23 ++--
.../SimulationException.java} | 36 +++----
.../src/test/java/accord/burn/TopologyUpdates.java | 34 +++---
.../java/accord/coordinate/CoordinateTest.java | 1 +
.../java/accord/coordinate/TopologyChangeTest.java | 2 -
.../coordinate/tracking/ReadTrackerTest.java | 1 -
.../coordinate/tracking/TrackerReconciler.java | 11 +-
.../src/test/java/accord/impl/basic/Cluster.java | 37 +++----
.../accord/impl/basic/DelayedCommandStores.java | 90 +++++++++++++++-
.../src/test/java/accord/impl/basic/NodeSink.java | 39 ++-----
.../basic/SimulatedDelayedExecutorService.java | 13 +--
.../accord/impl/basic/TaskExecutorService.java | 4 +-
.../java/accord/impl/basic/UniformRandomQueue.java | 99 -----------------
.../src/test/java/accord/impl/list/ListData.java | 1 -
.../test/java/accord/impl/list/ListRequest.java | 32 +++---
.../src/test/java/accord/impl/list/ListStore.java | 2 -
.../test/java/accord/impl/mock/MockCluster.java | 28 ++---
.../src/test/java/accord/impl/mock/MockStore.java | 2 -
.../src/test/java/accord/impl/mock/Network.java | 5 +-
.../accord/impl/mock/RecordingMessageSink.java | 5 +-
.../java/accord/impl/mock/SimpleMessageSink.java | 7 +-
.../src/test/java/accord/utils/MessageTask.java | 31 +++---
.../src/main/java/accord/maelstrom/Cluster.java | 27 ++---
.../src/main/java/accord/maelstrom/Json.java | 13 ++-
.../main/java/accord/maelstrom/MaelstromQuery.java | 1 -
.../src/main/java/accord/maelstrom/Main.java | 4 +-
.../src/main/groovy/accord.java-conventions.gradle | 6 ++
55 files changed, 784 insertions(+), 408 deletions(-)
diff --git a/.build/checkstyle/checkstyle.xml b/.build/checkstyle/checkstyle.xml
new file mode 100644
index 00000000..c8052136
--- /dev/null
+++ b/.build/checkstyle/checkstyle.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<!DOCTYPE module PUBLIC
+ "-//Checkstyle//DTD Checkstyle Configuration 1.3//EN"
+ "https://checkstyle.org/dtds/configuration_1_3.dtd">
+
+<module name="Checker">
+ <property name="severity" value="error"/>
+
+ <property name="fileExtensions" value="java, properties, xml"/>
+
+ <module name="BeforeExecutionExclusionFileFilter">
+ <property name="fileNamePattern" value="module\-info\.java$"/>
+ </module>
+
+ <module name="TreeWalker">
+ <module name="RedundantImport"/>
+ <module name="UnusedImports"/>
+ </module>
+
+</module>
diff --git a/.build/checkstyle/suppressions.xml
b/.build/checkstyle/suppressions.xml
new file mode 100644
index 00000000..230c808c
--- /dev/null
+++ b/.build/checkstyle/suppressions.xml
@@ -0,0 +1,24 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!DOCTYPE suppressions PUBLIC
+ "-//Checkstyle//DTD SuppressionFilter Configuration 1.1//EN"
+ "https://checkstyle.org/dtds/suppressions_1_1.dtd">
+
+<suppressions>
+</suppressions>
diff --git a/.circleci/config.yml b/.circleci/config.yml
index c2070aa7..2f59771f 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -81,7 +81,7 @@ jobs:
key: v1-dependencies-{{ checksum "build.gradle" }}-{{ checksum
"accord-core/build.gradle" }}-{{ checksum "accord-maelstrom/build.gradle" }}-{{
checksum "gradle/wrapper/gradle-wrapper.properties" }}
# run tests!
- - run: ./gradlew test
+ - run: ./gradlew check
- run:
name: Save Test Results
command: |
@@ -90,3 +90,9 @@ jobs:
when: always
- store_test_results:
path: test-results
+
+workflows:
+ version: 2
+ main:
+ jobs:
+ - build: {}
diff --git a/accord-core/src/main/java/accord/api/MessageSink.java
b/accord-core/src/main/java/accord/api/MessageSink.java
index 38ac946d..ee4d681e 100644
--- a/accord-core/src/main/java/accord/api/MessageSink.java
+++ b/accord-core/src/main/java/accord/api/MessageSink.java
@@ -18,6 +18,7 @@
package accord.api;
+import accord.local.AgentExecutor;
import accord.local.Node.Id;
import accord.messages.Callback;
import accord.messages.Reply;
@@ -27,6 +28,6 @@ import accord.messages.Request;
public interface MessageSink
{
void send(Id to, Request request);
- void send(Id to, Request request, Callback callback);
+ void send(Id to, Request request, AgentExecutor executor, Callback
callback);
void reply(Id replyingToNode, ReplyContext replyContext, Reply reply);
}
diff --git a/accord-core/src/main/java/accord/api/RoutingKey.java
b/accord-core/src/main/java/accord/api/RoutingKey.java
index 44b24f4f..4ae65067 100644
--- a/accord-core/src/main/java/accord/api/RoutingKey.java
+++ b/accord-core/src/main/java/accord/api/RoutingKey.java
@@ -21,11 +21,6 @@ package accord.api;
import accord.primitives.Range;
import accord.primitives.RoutableKey;
import accord.primitives.Unseekable;
-import accord.utils.ArrayBuffers;
-
-import java.util.Arrays;
-
-import static accord.utils.ArrayBuffers.cachedRoutingKeys;
public interface RoutingKey extends Unseekable, RoutableKey
{
diff --git a/accord-core/src/main/java/accord/api/Update.java
b/accord-core/src/main/java/accord/api/Update.java
index b3eb3875..f4092d3b 100644
--- a/accord-core/src/main/java/accord/api/Update.java
+++ b/accord-core/src/main/java/accord/api/Update.java
@@ -19,7 +19,6 @@
package accord.api;
import accord.primitives.Ranges;
-import accord.primitives.Keys;
import accord.primitives.Seekables;
import javax.annotation.Nullable;
diff --git a/accord-core/src/main/java/accord/coordinate/Coordinate.java
b/accord-core/src/main/java/accord/coordinate/Coordinate.java
index 839f41c9..f08dd399 100644
--- a/accord-core/src/main/java/accord/coordinate/Coordinate.java
+++ b/accord-core/src/main/java/accord/coordinate/Coordinate.java
@@ -72,7 +72,8 @@ public class Coordinate extends
AsyncResults.SettableResult<Result> implements C
{
// TODO (desired, efficiency): consider sending only to electorate of
most recent topology (as only these PreAccept votes matter)
// note that we must send to all replicas of old topology, as
electorate may not be reachable
- node.send(tracker.nodes(), to -> new PreAccept(to,
tracker.topologies(), txnId, txn, route), this);
+ node.send(tracker.nodes(), to -> new PreAccept(to,
tracker.topologies(), txnId, txn, route),
+ node.commandStores().select(route.homeKey()), this);
}
public static AsyncResult<Result> coordinate(Node node, TxnId txnId, Txn
txn, FullRoute<?> route)
diff --git a/accord-core/src/main/java/accord/coordinate/Persist.java
b/accord-core/src/main/java/accord/coordinate/Persist.java
index 076cc669..926697f4 100644
--- a/accord-core/src/main/java/accord/coordinate/Persist.java
+++ b/accord-core/src/main/java/accord/coordinate/Persist.java
@@ -94,13 +94,13 @@ public class Persist implements Callback<ApplyReply>
{
// TODO (low priority, consider, efficiency): send to
non-home replicas also, so they may clear their log more easily?
Shard homeShard =
node.topology().forEpochIfKnown(route.homeKey(), txnId.epoch());
- node.send(homeShard, new InformHomeDurable(txnId,
route.homeKey(), executeAt, Durable, persistedOn));
+ node.send(homeShard, new InformHomeDurable(txnId,
route.homeKey(), executeAt, Durable, new HashSet<>(persistedOn)));
isDone = true;
}
else if (!tracker.hasInFlight() && !tracker.hasFailures())
{
Shard homeShard =
node.topology().forEpochIfKnown(route.homeKey(), txnId.epoch());
- node.send(homeShard, new InformHomeDurable(txnId,
route.homeKey(), executeAt, Universal, persistedOn));
+ node.send(homeShard, new InformHomeDurable(txnId,
route.homeKey(), executeAt, Universal, new HashSet<>(persistedOn)));
}
}
break;
diff --git a/accord-core/src/main/java/accord/coordinate/Propose.java
b/accord-core/src/main/java/accord/coordinate/Propose.java
index 6a9f6468..62d4cd61 100644
--- a/accord-core/src/main/java/accord/coordinate/Propose.java
+++ b/accord-core/src/main/java/accord/coordinate/Propose.java
@@ -38,7 +38,6 @@ import accord.local.Node;
import accord.local.Node.Id;
import accord.messages.Accept;
import accord.messages.Accept.AcceptReply;
-import accord.utils.Invariants;
import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.Fail;
import static accord.coordinate.tracking.RequestStatus.Failed;
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index ff34236d..c7daf962 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -34,7 +34,6 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.*;
import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
@@ -45,17 +44,10 @@ import static accord.local.SafeCommandStore.TestKind.Ws;
import static accord.local.Status.*;
import static accord.primitives.Routables.Slice.Minimal;
-public abstract class InMemoryCommandStore implements CommandStore
+public abstract class InMemoryCommandStore extends CommandStore
{
private static final Logger logger =
LoggerFactory.getLogger(InMemoryCommandStore.class);
- private final int id;
- private final NodeTimeService time;
- private final Agent agent;
- private final DataStore store;
- private final ProgressLog progressLog;
- private final RangesForEpochHolder rangesForEpochHolder;
-
private final NavigableMap<TxnId, GlobalCommand> commands = new
TreeMap<>();
private final NavigableMap<RoutableKey, GlobalCommandsForKey>
commandsForKey = new TreeMap<>();
private final CFKLoader cfkLoader = new CFKLoader();
@@ -67,12 +59,7 @@ public abstract class InMemoryCommandStore implements
CommandStore
public InMemoryCommandStore(int id, NodeTimeService time, Agent agent,
DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpochHolder
rangesForEpochHolder)
{
- this.id = id;
- this.time = time;
- this.agent = agent;
- this.store = store;
- this.progressLog = progressLogFactory.create(this);
- this.rangesForEpochHolder = rangesForEpochHolder;
+ super(id, time, agent, store, progressLogFactory,
rangesForEpochHolder);
}
@Override
@@ -381,7 +368,6 @@ public abstract class InMemoryCommandStore implements
CommandStore
protected <T> T executeInContext(InMemoryCommandStore commandStore,
PreLoadContext context, Function<? super SafeCommandStore, T> function)
{
return executeInContext(commandStore, context, function, true);
-
}
protected <T> void executeInContext(InMemoryCommandStore commandStore,
PreLoadContext context, Function<? super SafeCommandStore, T> function,
BiConsumer<? super T, Throwable> callback)
@@ -398,6 +384,14 @@ public abstract class InMemoryCommandStore implements
CommandStore
}
}
+ @Override
+ public String toString()
+ {
+ return getClass().getSimpleName() + "{" +
+ "id=" + id +
+ '}';
+ }
+
class RangeCommand
{
final GlobalCommand command;
@@ -769,14 +763,16 @@ public abstract class InMemoryCommandStore implements
CommandStore
active = queue.poll();
while (active != null)
{
- try
- {
- active.run();
- }
- catch (Throwable t)
- {
- logger.error("Uncaught exception", t);
- }
+ this.unsafeRunIn(() -> {
+ try
+ {
+ active.run();
+ }
+ catch (Throwable t)
+ {
+ logger.error("Uncaught exception", t);
+ }
+ });
active = queue.poll();
}
}
@@ -789,6 +785,12 @@ public abstract class InMemoryCommandStore implements
CommandStore
maybeRun();
}
+ @Override
+ public boolean inStore()
+ {
+ return CommandStore.maybeCurrent() == this;
+ }
+
@Override
public AsyncChain<Void> execute(PreLoadContext context, Consumer<?
super SafeCommandStore> consumer)
{
@@ -837,44 +839,39 @@ public abstract class InMemoryCommandStore implements
CommandStore
public static class SingleThread extends InMemoryCommandStore
{
- private final AtomicReference<Thread> expectedThread = new
AtomicReference<>();
+ private Thread thread; // when run in the executor this will be
non-null, null implies not running in this store
private final ExecutorService executor;
public SingleThread(int id, NodeTimeService time, Agent agent,
DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpochHolder
rangesForEpochHolder)
{
- this(id, time, agent, store, progressLogFactory,
rangesForEpochHolder, Executors.newSingleThreadExecutor(r -> {
+ super(id, time, agent, store, progressLogFactory,
rangesForEpochHolder);
+ this.executor = Executors.newSingleThreadExecutor(r -> {
Thread thread = new Thread(r);
thread.setName(CommandStore.class.getSimpleName() + '[' +
time.id() + ']');
return thread;
- }));
- }
-
- private SingleThread(int id, NodeTimeService time, Agent agent,
DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpochHolder
rangesForEpochHolder, ExecutorService executor)
- {
- super(id, time, agent, store, progressLogFactory,
rangesForEpochHolder);
- this.executor = executor;
- }
-
- public static CommandStore.Factory factory(ExecutorService executor)
- {
- return (id, time, agent, store, progressLogFactory,
rangesForEpoch) -> new SingleThread(id, time, agent, store, progressLogFactory,
rangesForEpoch, executor);
+ });
+ // "this" is leaked before constructor is completed, but since all
fields are "final" and set before "this"
+ // is leaked, then visibility should not be an issue.
+ executor.execute(() -> thread = Thread.currentThread());
+ executor.execute(() -> CommandStore.register(this));
}
void assertThread()
{
Thread current = Thread.currentThread();
- Thread expected;
- while (true)
- {
- expected = expectedThread.get();
- if (expected != null)
- break;
- expectedThread.compareAndSet(null, Thread.currentThread());
- }
+ Thread expected = thread;
+ if (expected == null)
+ throw new IllegalStateException(String.format("Command store
called from wrong thread; unexpected %s", current));
if (expected != current)
throw new IllegalStateException(String.format("Command store
called from the wrong thread. Expected %s, got %s", expected, current));
}
+ @Override
+ public boolean inStore()
+ {
+ return thread == Thread.currentThread();
+ }
+
@Override
public AsyncChain<Void> execute(PreLoadContext context, Consumer<?
super SafeCommandStore> consumer)
{
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStores.java
b/accord-core/src/main/java/accord/impl/InMemoryCommandStores.java
index 163318a0..5fe87a56 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStores.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStores.java
@@ -23,35 +23,36 @@ import accord.api.Agent;
import accord.api.DataStore;
import accord.api.ProgressLog;
import accord.local.CommandStore;
+import accord.utils.RandomSource;
public class InMemoryCommandStores
{
public static class Synchronized extends CommandStores
{
- public Synchronized(NodeTimeService time, Agent agent, DataStore
store, ShardDistributor shardDistributor, ProgressLog.Factory
progressLogFactory)
+ public Synchronized(NodeTimeService time, Agent agent, DataStore
store, RandomSource random, ShardDistributor shardDistributor,
ProgressLog.Factory progressLogFactory)
{
- super(time, agent, store, shardDistributor, progressLogFactory,
InMemoryCommandStore.Synchronized::new);
+ super(time, agent, store, random, shardDistributor,
progressLogFactory, InMemoryCommandStore.Synchronized::new);
}
}
public static class SingleThread extends CommandStores
{
- public SingleThread(NodeTimeService time, Agent agent, DataStore
store, ShardDistributor shardDistributor, ProgressLog.Factory
progressLogFactory)
+ public SingleThread(NodeTimeService time, Agent agent, DataStore
store, RandomSource random, ShardDistributor shardDistributor,
ProgressLog.Factory progressLogFactory)
{
- super(time, agent, store, shardDistributor, progressLogFactory,
InMemoryCommandStore.SingleThread::new);
+ super(time, agent, store, random, shardDistributor,
progressLogFactory, InMemoryCommandStore.SingleThread::new);
}
- public SingleThread(NodeTimeService time, Agent agent, DataStore
store, ShardDistributor shardDistributor, ProgressLog.Factory
progressLogFactory, CommandStore.Factory shardFactory)
+ public SingleThread(NodeTimeService time, Agent agent, DataStore
store, RandomSource random, ShardDistributor shardDistributor,
ProgressLog.Factory progressLogFactory, CommandStore.Factory shardFactory)
{
- super(time, agent, store, shardDistributor, progressLogFactory,
shardFactory);
+ super(time, agent, store, random, shardDistributor,
progressLogFactory, shardFactory);
}
}
public static class Debug extends InMemoryCommandStores.SingleThread
{
- public Debug(NodeTimeService time, Agent agent, DataStore store,
ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory)
+ public Debug(NodeTimeService time, Agent agent, DataStore store,
RandomSource random, ShardDistributor shardDistributor, ProgressLog.Factory
progressLogFactory)
{
- super(time, agent, store, shardDistributor, progressLogFactory,
InMemoryCommandStore.Debug::new);
+ super(time, agent, store, random, shardDistributor,
progressLogFactory, InMemoryCommandStore.Debug::new);
}
}
}
diff --git a/accord-core/src/main/java/accord/api/MessageSink.java
b/accord-core/src/main/java/accord/local/AgentExecutor.java
similarity index 68%
copy from accord-core/src/main/java/accord/api/MessageSink.java
copy to accord-core/src/main/java/accord/local/AgentExecutor.java
index 38ac946d..e304cc21 100644
--- a/accord-core/src/main/java/accord/api/MessageSink.java
+++ b/accord-core/src/main/java/accord/local/AgentExecutor.java
@@ -16,17 +16,18 @@
* limitations under the License.
*/
-package accord.api;
+package accord.local;
-import accord.local.Node.Id;
-import accord.messages.Callback;
-import accord.messages.Reply;
-import accord.messages.ReplyContext;
-import accord.messages.Request;
+import accord.api.Agent;
+import accord.utils.async.AsyncExecutor;
-public interface MessageSink
+public interface AgentExecutor extends AsyncExecutor
{
- void send(Id to, Request request);
- void send(Id to, Request request, Callback callback);
- void reply(Id replyingToNode, ReplyContext replyContext, Reply reply);
+ Agent agent();
+
+ @Override
+ default void execute(Runnable command)
+ {
+ submit(command).begin(agent());
+ }
}
diff --git a/accord-core/src/main/java/accord/local/CommandStore.java
b/accord-core/src/main/java/accord/local/CommandStore.java
index 479f5817..d84794ab 100644
--- a/accord-core/src/main/java/accord/local/CommandStore.java
+++ b/accord-core/src/main/java/accord/local/CommandStore.java
@@ -23,17 +23,19 @@ import accord.api.ProgressLog;
import accord.api.DataStore;
import accord.local.CommandStores.RangesForEpochHolder;
import accord.utils.async.AsyncChain;
-import accord.utils.async.AsyncExecutor;
+import java.util.concurrent.Callable;
import java.util.function.Consumer;
import java.util.function.Function;
+import javax.annotation.Nullable;
+
/**
* Single threaded internal shard of accord transaction metadata
*/
-public interface CommandStore extends AsyncExecutor
+public abstract class CommandStore implements AgentExecutor
{
- interface Factory
+ public interface Factory
{
CommandStore create(int id,
NodeTimeService time,
@@ -43,16 +45,113 @@ public interface CommandStore extends AsyncExecutor
RangesForEpochHolder rangesForEpoch);
}
- int id();
- Agent agent();
- AsyncChain<Void> execute(PreLoadContext context, Consumer<? super
SafeCommandStore> consumer);
- <T> AsyncChain<T> submit(PreLoadContext context, Function<? super
SafeCommandStore, T> apply);
+ private static final ThreadLocal<CommandStore> CURRENT_STORE = new
ThreadLocal<>();
+
+ protected final int id;
+ protected final NodeTimeService time;
+ protected final Agent agent;
+ protected final DataStore store;
+ protected final ProgressLog progressLog;
+ protected final RangesForEpochHolder rangesForEpochHolder;
+
+ protected CommandStore(int id, NodeTimeService time, Agent agent,
DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpochHolder
rangesForEpochHolder)
+ {
+ this.id = id;
+ this.time = time;
+ this.agent = agent;
+ this.store = store;
+ this.progressLog = progressLogFactory.create(this);
+ this.rangesForEpochHolder = rangesForEpochHolder;
+ }
+
+ public int id()
+ {
+ return id;
+ }
+
+ @Override
+ public Agent agent()
+ {
+ return agent;
+ }
+
+ public abstract boolean inStore();
+
+ public abstract AsyncChain<Void> execute(PreLoadContext context,
Consumer<? super SafeCommandStore> consumer);
+
+ public abstract <T> AsyncChain<T> submit(PreLoadContext context,
Function<? super SafeCommandStore, T> apply);
+
+ public abstract void shutdown();
+
+ protected void unsafeRunIn(Runnable fn)
+ {
+ CommandStore prev = maybeCurrent();
+ CURRENT_STORE.set(this);
+ try
+ {
+ fn.run();
+ }
+ finally
+ {
+ if (prev == null) CURRENT_STORE.remove();
+ else CURRENT_STORE.set(prev);
+ }
+ }
+
+ protected <T> T unsafeRunIn(Callable<T> fn) throws Exception
+ {
+ CommandStore prev = maybeCurrent();
+ CURRENT_STORE.set(this);
+ try
+ {
+ return fn.call();
+ }
+ finally
+ {
+ if (prev == null) CURRENT_STORE.remove();
+ else CURRENT_STORE.set(prev);
+ }
+ }
@Override
- default void execute(Runnable command)
+ public String toString()
+ {
+ return getClass().getSimpleName() + "{" +
+ "id=" + id +
+ '}';
+ }
+
+ @Nullable
+ public static CommandStore maybeCurrent()
+ {
+ return CURRENT_STORE.get();
+ }
+
+ public static CommandStore current()
{
- submit(command).begin(agent());
+ CommandStore cs = maybeCurrent();
+ if (cs == null)
+ throw new IllegalStateException("Attempted to access current
CommandStore, but not running in a CommandStore");
+ return cs;
}
- void shutdown();
+ protected static void register(CommandStore store)
+ {
+ if (!store.inStore())
+ throw new IllegalStateException("Unable to register a CommandStore
when not running in it; store " + store);
+ CURRENT_STORE.set(store);
+ }
+
+ public static void checkInStore()
+ {
+ CommandStore store = maybeCurrent();
+ if (store == null) throw new IllegalStateException("Expected to be
running in a CommandStore but is not");
+ }
+
+ public static void checkNotInStore()
+ {
+ CommandStore store = maybeCurrent();
+ if (store != null)
+ throw new IllegalStateException("Expected to not be running in a
CommandStore, but running in " + store);
+ }
}
diff --git a/accord-core/src/main/java/accord/local/CommandStores.java
b/accord-core/src/main/java/accord/local/CommandStores.java
index 67b4f058..943bc075 100644
--- a/accord-core/src/main/java/accord/local/CommandStores.java
+++ b/accord-core/src/main/java/accord/local/CommandStores.java
@@ -26,6 +26,8 @@ import accord.utils.MapReduce;
import accord.utils.MapReduceConsume;
import com.google.common.annotations.VisibleForTesting;
+
+import accord.utils.RandomSource;
import org.agrona.collections.Hashing;
import org.agrona.collections.Int2ObjectHashMap;
import accord.utils.async.AsyncChain;
@@ -35,10 +37,9 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.ExecutionException;
import java.util.function.BiFunction;
import java.util.function.Consumer;
-import java.util.function.Function;
+import java.util.function.Predicate;
import java.util.stream.IntStream;
import static accord.local.PreLoadContext.empty;
@@ -54,6 +55,7 @@ public abstract class CommandStores<S extends CommandStore>
CommandStores<?> create(NodeTimeService time,
Agent agent,
DataStore store,
+ RandomSource random,
ShardDistributor shardDistributor,
ProgressLog.Factory progressLogFactory);
}
@@ -65,19 +67,21 @@ public abstract class CommandStores<S extends CommandStore>
private final DataStore store;
private final ProgressLog.Factory progressLogFactory;
private final CommandStore.Factory shardFactory;
+ private final RandomSource random;
- Supplier(NodeTimeService time, Agent agent, DataStore store,
ProgressLog.Factory progressLogFactory, CommandStore.Factory shardFactory)
+ Supplier(NodeTimeService time, Agent agent, DataStore store,
RandomSource random, ProgressLog.Factory progressLogFactory,
CommandStore.Factory shardFactory)
{
this.time = time;
this.agent = agent;
this.store = store;
+ this.random = random;
this.progressLogFactory = progressLogFactory;
this.shardFactory = shardFactory;
}
CommandStore create(int id, RangesForEpochHolder rangesForEpoch)
{
- return shardFactory.create(id, time, agent, store,
progressLogFactory, rangesForEpoch);
+ return shardFactory.create(id, time, agent, this.store,
progressLogFactory, rangesForEpoch);
}
}
@@ -230,10 +234,10 @@ public abstract class CommandStores<S extends
CommandStore>
this.current = new Snapshot(new ShardHolder[0], Topology.EMPTY,
Topology.EMPTY);
}
- public CommandStores(NodeTimeService time, Agent agent, DataStore store,
ShardDistributor shardDistributor,
+ public CommandStores(NodeTimeService time, Agent agent, DataStore store,
RandomSource random, ShardDistributor shardDistributor,
ProgressLog.Factory progressLogFactory,
CommandStore.Factory shardFactory)
{
- this(new Supplier(time, agent, store, progressLogFactory,
shardFactory), shardDistributor);
+ this(new Supplier(time, agent, store, random, progressLogFactory,
shardFactory), shardDistributor);
}
public Topology local()
@@ -416,6 +420,30 @@ public abstract class CommandStores<S extends CommandStore>
shard.store.shutdown();
}
+ public CommandStore select(RoutingKey key)
+ {
+ return select(ranges -> ranges.contains(key));
+ }
+
+ private CommandStore select(Predicate<Ranges> fn)
+ {
+ ShardHolder[] shards = current.shards;
+ for (ShardHolder holder : shards)
+ {
+ if (fn.test(holder.ranges().currentRanges()))
+ return holder.store;
+ }
+ return any();
+ }
+
+ @VisibleForTesting
+ public CommandStore any()
+ {
+ ShardHolder[] shards = current.shards;
+ if (shards.length == 0) throw new IllegalStateException("Unable to get
CommandStore; non defined");
+ return shards[supplier.random.nextInt(shards.length)].store;
+ }
+
public CommandStore forId(int id)
{
Snapshot snapshot = current;
diff --git a/accord-core/src/main/java/accord/local/Node.java
b/accord-core/src/main/java/accord/local/Node.java
index fbf59874..66c0bf6a 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -145,7 +145,7 @@ public class Node implements ConfigurationService.Listener,
NodeTimeService
this.agent = agent;
this.random = random;
this.scheduler = scheduler;
- this.commandStores = factory.create(this, agent, dataSupplier.get(),
shardDistributor, progressLogFactory.apply(this));
+ this.commandStores = factory.create(this, agent, dataSupplier.get(),
random.fork(), shardDistributor, progressLogFactory.apply(this));
configService.registerListener(this);
onTopologyUpdate(topology, false);
@@ -203,7 +203,7 @@ public class Node implements ConfigurationService.Listener,
NodeTimeService
else
{
configService.fetchTopologyForEpoch(epoch);
- topology.awaitEpoch(epoch).addCallback(runnable);
+ topology.awaitEpoch(epoch).addCallback(runnable).begin(agent());
}
}
@@ -311,7 +311,13 @@ public class Node implements
ConfigurationService.Listener, NodeTimeService
public void send(Shard shard, Request send, Callback callback)
{
- shard.nodes.forEach(node -> messageSink.send(node, send, callback));
+ send(shard, send, CommandStore.current(), callback);
+ }
+
+ private void send(Shard shard, Request send, AgentExecutor executor,
Callback callback)
+ {
+ checkStore(executor);
+ shard.nodes.forEach(node -> messageSink.send(node, send, executor,
callback));
}
private <T> void send(Shard shard, Request send, Set<Id> alreadyContacted)
@@ -334,18 +340,44 @@ public class Node implements
ConfigurationService.Listener, NodeTimeService
public <T> void send(Collection<Id> to, Request send, Callback<T> callback)
{
- to.forEach(dst -> send(dst, send, callback));
+ send(to, send, CommandStore.current(), callback);
+ }
+
+ public <T> void send(Collection<Id> to, Request send, AgentExecutor
executor, Callback<T> callback)
+ {
+ checkStore(executor);
+ to.forEach(dst -> messageSink.send(dst, send, executor, callback));
}
public <T> void send(Collection<Id> to, Function<Id, Request>
requestFactory, Callback<T> callback)
{
- to.forEach(dst -> send(dst, requestFactory.apply(dst), callback));
+ send(to, requestFactory, CommandStore.current(), callback);
+ }
+
+ public <T> void send(Collection<Id> to, Function<Id, Request>
requestFactory, AgentExecutor executor, Callback<T> callback)
+ {
+ checkStore(executor);
+ to.forEach(dst -> messageSink.send(dst, requestFactory.apply(dst),
executor, callback));
}
// send to a specific node
public <T> void send(Id to, Request send, Callback<T> callback)
{
- messageSink.send(to, send, callback);
+ send(to, send, CommandStore.current(), callback);
+ }
+
+ // send to a specific node
+ public <T> void send(Id to, Request send, AgentExecutor executor,
Callback<T> callback)
+ {
+ checkStore(executor);
+ messageSink.send(to, send, executor, callback);
+ }
+
+ private void checkStore(AgentExecutor executor)
+ {
+ CommandStore current = CommandStore.maybeCurrent();
+ if (current != null && current != executor)
+ throw new IllegalStateException(String.format("Used wrong
CommandStore %s; current is %s", executor, current));
}
// send to a specific node
@@ -495,7 +527,7 @@ public class Node implements ConfigurationService.Listener,
NodeTimeService
if (unknownEpoch > 0)
{
configService.fetchTopologyForEpoch(unknownEpoch);
- topology().awaitEpoch(unknownEpoch).addCallback(() ->
receive(request, from, replyContext));
+ topology().awaitEpoch(unknownEpoch).addCallback(() ->
receive(request, from, replyContext)).begin(agent());
return;
}
scheduler.now(() -> request.process(this, from, replyContext));
diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java
b/accord-core/src/main/java/accord/local/SafeCommandStore.java
index 5b2f5eca..892cbfee 100644
--- a/accord-core/src/main/java/accord/local/SafeCommandStore.java
+++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java
@@ -105,6 +105,13 @@ public interface SafeCommandStore
Command command = safeCommand.current();
for (CommandListener listener : command.listeners())
{
+ if (!safeCommand.current().listeners().contains(listener))
+ {
+ // notifyListeners is done for every mutation, which can cause
listeners to be different depending on
+ // where you are in the stack frame...
+ // To simplify listeners, double check that this wasn't
changed before calling again
+ continue;
+ }
PreLoadContext context =
listener.listenerPreLoadContext(command.txnId());
if (canExecuteWith(context))
{
diff --git a/accord-core/src/main/java/accord/messages/ReadData.java
b/accord-core/src/main/java/accord/messages/ReadData.java
index d7558d35..6350793c 100644
--- a/accord-core/src/main/java/accord/messages/ReadData.java
+++ b/accord-core/src/main/java/accord/messages/ReadData.java
@@ -269,7 +269,7 @@ public class ReadData extends
AbstractEpochRequest<ReadData.ReadNack> implements
case RETURNED:
throw new IllegalStateException("ReadOk was sent, yet ack
called again");
case OBSOLETE:
- logger.debug("After the read completed for txn {}, the
result was marked obsolete", txnId);
+ logger.trace("After the read completed for txn {}, the
result was marked obsolete", txnId);
break;
case PENDING:
state = State.RETURNED;
diff --git a/accord-core/src/main/java/accord/messages/SafeCallback.java
b/accord-core/src/main/java/accord/messages/SafeCallback.java
new file mode 100644
index 00000000..a8a1416f
--- /dev/null
+++ b/accord-core/src/main/java/accord/messages/SafeCallback.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.messages;
+
+import java.util.Objects;
+
+import accord.coordinate.Timeout;
+import accord.local.AgentExecutor;
+import accord.local.Node;
+
+public class SafeCallback<T extends Reply>
+{
+ private final AgentExecutor executor;
+ private final Callback<T> callback;
+
+ public SafeCallback(AgentExecutor executor, Callback<T> callback)
+ {
+ this.executor = Objects.requireNonNull(executor, "executor");
+ this.callback = Objects.requireNonNull(callback, "callback");
+ }
+
+ public void success(Node.Id src, T reply)
+ {
+ safeCall(src, reply, Callback::onSuccess);
+ }
+
+ public void slowResponse(Node.Id src)
+ {
+ safeCall(src, null, (callback, id, ignore) ->
callback.onSlowResponse(id));
+ }
+
+ public void failure(Node.Id to, Throwable t)
+ {
+ safeCall(to, t, Callback::onFailure);
+ }
+
+ public void timeout(Node.Id to)
+ {
+ failure(to, new Timeout(null, null));
+ }
+
+ private interface SafeCall<T, P>
+ {
+ void accept(Callback<T> callback, Node.Id id, P param) throws
Throwable;
+ }
+
+ private <P> void safeCall(Node.Id src, P param, SafeCall<T, P> call)
+ {
+ // TODO (low priority, correctness): if the executor is shutdown this
propgates the exception to the network stack
+ executor.execute(() -> {
+ try
+ {
+ call.accept(callback, src, param);
+ }
+ catch (Throwable t)
+ {
+ try
+ {
+ callback.onCallbackFailure(src, t);
+ }
+ catch (Throwable t2)
+ {
+ t.addSuppressed(t2);
+ executor.agent().onUncaughtException(t);
+ }
+ }
+ });
+ }
+
+ @Override
+ public String toString()
+ {
+ return callback.toString();
+ }
+}
diff --git a/accord-core/src/main/java/accord/primitives/KeyDeps.java
b/accord-core/src/main/java/accord/primitives/KeyDeps.java
index f9887b3b..1ac6b65d 100644
--- a/accord-core/src/main/java/accord/primitives/KeyDeps.java
+++ b/accord-core/src/main/java/accord/primitives/KeyDeps.java
@@ -29,7 +29,6 @@ import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
-import static accord.primitives.Routables.Slice.Overlapping;
import static accord.utils.ArrayBuffers.*;
import static accord.utils.RelationMultiMap.*;
import static accord.utils.SortedArrays.Search.FAST;
diff --git a/accord-core/src/main/java/accord/primitives/PartialRangeRoute.java
b/accord-core/src/main/java/accord/primitives/PartialRangeRoute.java
index 437ed2b9..0d53d8a1 100644
--- a/accord-core/src/main/java/accord/primitives/PartialRangeRoute.java
+++ b/accord-core/src/main/java/accord/primitives/PartialRangeRoute.java
@@ -22,7 +22,6 @@ import accord.api.RoutingKey;
import accord.utils.Invariants;
import static accord.primitives.AbstractRanges.UnionMode.MERGE_OVERLAPPING;
-import static accord.primitives.Routables.Slice.Overlapping;
/**
* A slice of a Route that covers
diff --git a/accord-core/src/main/java/accord/primitives/PartialTxn.java
b/accord-core/src/main/java/accord/primitives/PartialTxn.java
index 9fa74754..7a6a14c0 100644
--- a/accord-core/src/main/java/accord/primitives/PartialTxn.java
+++ b/accord-core/src/main/java/accord/primitives/PartialTxn.java
@@ -23,7 +23,6 @@ import javax.annotation.Nullable;
import accord.api.Query;
import accord.api.Read;
import accord.api.Update;
-import accord.utils.Invariants;
public interface PartialTxn extends Txn
{
diff --git a/accord-core/src/main/java/accord/primitives/RangeDeps.java
b/accord-core/src/main/java/accord/primitives/RangeDeps.java
index 22d49080..568420e8 100644
--- a/accord-core/src/main/java/accord/primitives/RangeDeps.java
+++ b/accord-core/src/main/java/accord/primitives/RangeDeps.java
@@ -35,8 +35,6 @@ import java.util.function.Predicate;
import static accord.utils.ArrayBuffers.*;
import static accord.utils.RelationMultiMap.*;
import static accord.utils.RelationMultiMap.remove;
-import static accord.utils.SearchableRangeListBuilder.Links.LINKS;
-import static accord.utils.SearchableRangeListBuilder.Strategy.ACCURATE;
import static accord.utils.SortedArrays.Search.CEIL;
/**
diff --git a/accord-core/src/main/java/accord/primitives/Routables.java
b/accord-core/src/main/java/accord/primitives/Routables.java
index 62798219..74889f28 100644
--- a/accord-core/src/main/java/accord/primitives/Routables.java
+++ b/accord-core/src/main/java/accord/primitives/Routables.java
@@ -26,10 +26,6 @@ import accord.utils.IndexedRangeFoldToLong;
import accord.utils.SortedArrays;
import net.nicoulaj.compilecommand.annotations.Inline;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-import static accord.primitives.Routables.Slice.Overlapping;
import static accord.utils.SortedArrays.Search.FLOOR;
/**
diff --git a/accord-core/src/main/java/accord/primitives/Txn.java
b/accord-core/src/main/java/accord/primitives/Txn.java
index f1d26558..7172cb6d 100644
--- a/accord-core/src/main/java/accord/primitives/Txn.java
+++ b/accord-core/src/main/java/accord/primitives/Txn.java
@@ -31,8 +31,6 @@ import accord.utils.async.AsyncChains;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
-import static accord.primitives.Routables.Slice.Overlapping;
-
public interface Txn
{
enum Kind
diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java
b/accord-core/src/main/java/accord/topology/TopologyManager.java
index 6ffbc27a..14817aca 100644
--- a/accord-core/src/main/java/accord/topology/TopologyManager.java
+++ b/accord-core/src/main/java/accord/topology/TopologyManager.java
@@ -22,6 +22,7 @@ import accord.api.ConfigurationService;
import accord.api.RoutingKey;
import accord.api.TopologySorter;
import accord.coordinate.tracking.QuorumTracker;
+import accord.local.CommandStore;
import accord.local.Node.Id;
import accord.messages.Request;
import accord.primitives.*;
@@ -259,9 +260,15 @@ public class TopologyManager implements
ConfigurationService.Listener
toComplete.trySuccess(null);
}
- public synchronized AsyncResult<Void> awaitEpoch(long epoch)
+ public AsyncChain<Void> awaitEpoch(long epoch)
{
- return epochs.awaitEpoch(epoch);
+ AsyncResult<Void> result;
+ synchronized (this)
+ {
+ result = epochs.awaitEpoch(epoch);
+ }
+ CommandStore current = CommandStore.maybeCurrent();
+ return current == null || result.isDone() ? result :
result.withExecutor(current);
}
@Override
@@ -399,7 +406,10 @@ public class TopologyManager implements
ConfigurationService.Listener
public Topology localForEpoch(long epoch)
{
- return epochs.get(epoch).local();
+ EpochState epochState = epochs.get(epoch);
+ if (epochState == null)
+ throw new IllegalStateException("Unknown epoch " + epoch);
+ return epochState.local();
}
public Ranges localRangesForEpoch(long epoch)
diff --git a/accord-core/src/main/java/accord/utils/async/AsyncChain.java
b/accord-core/src/main/java/accord/utils/async/AsyncChain.java
index fa88aed4..130731d4 100644
--- a/accord-core/src/main/java/accord/utils/async/AsyncChain.java
+++ b/accord-core/src/main/java/accord/utils/async/AsyncChain.java
@@ -32,11 +32,21 @@ public interface AsyncChain<V>
*/
<T> AsyncChain<T> map(Function<? super V, ? extends T> mapper);
+ default <T> AsyncChain<T> map(Function<? super V, ? extends T> mapper,
Executor executor)
+ {
+ return AsyncChains.map(this, mapper, executor);
+ }
+
/**
* Support {@link
com.google.common.util.concurrent.Futures#transform(ListenableFuture,
com.google.common.base.Function, Executor)} natively
*/
<T> AsyncChain<T> flatMap(Function<? super V, ? extends AsyncChain<T>>
mapper);
+ default <T> AsyncChain<T> flatMap(Function<? super V, ? extends
AsyncChain<T>> mapper, Executor executor)
+ {
+ return AsyncChains.flatMap(this, mapper, executor);
+ }
+
default AsyncChain<Void> accept(Consumer<? super V> action)
{
return map(r -> {
@@ -45,6 +55,21 @@ public interface AsyncChain<V>
});
}
+ default AsyncChain<Void> accept(Consumer<? super V> action, Executor
executor)
+ {
+ return map(r -> {
+ action.accept(r);
+ return null;
+ }, executor);
+ }
+
+ default AsyncChain<V> withExecutor(Executor e)
+ {
+ // since a chain runs as a sequence of callbacks, by adding a callback
that moves to this executor any new actions
+ // will be run on that desired executor.
+ return map(a -> a, e);
+ }
+
/**
* Support {@link com.google.common.util.concurrent.Futures#addCallback}
natively
*/
diff --git a/accord-core/src/main/java/accord/utils/async/AsyncChains.java
b/accord-core/src/main/java/accord/utils/async/AsyncChains.java
index 06f3f33e..809ca1c7 100644
--- a/accord-core/src/main/java/accord/utils/async/AsyncChains.java
+++ b/accord-core/src/main/java/accord/utils/async/AsyncChains.java
@@ -331,6 +331,67 @@ public abstract class AsyncChains<V> implements
AsyncChain<V>
return new Immediate<>(failure);
}
+ public static <V, T> AsyncChain<T> map(AsyncChain<V> chain, Function<?
super V, ? extends T> mapper, Executor executor)
+ {
+ return chain.flatMap(v -> new Head<T>()
+ {
+ @Override
+ protected void start(BiConsumer<? super T, Throwable> callback)
+ {
+ try
+ {
+ executor.execute(() -> {
+ T value;
+ try
+ {
+ value = mapper.apply(v);
+ }
+ catch (Throwable t)
+ {
+ callback.accept(null, t);
+ return;
+ }
+ callback.accept(value, null);
+ });
+ }
+ catch (Throwable t)
+ {
+ // TODO (low priority, correctness): If the executor is
shutdown then the callback may run in an unexpected thread, which may not be
thread safe
+ callback.accept(null, t);
+ }
+ }
+ });
+ }
+
+ public static <V, T> AsyncChain<T> flatMap(AsyncChain<V> chain, Function<?
super V, ? extends AsyncChain<T>> mapper, Executor executor)
+ {
+ return chain.flatMap(v -> new Head<T>()
+ {
+ @Override
+ protected void start(BiConsumer<? super T, Throwable> callback)
+ {
+ try
+ {
+ executor.execute(() -> {
+ try
+ {
+ mapper.apply(v).addCallback(callback);
+ }
+ catch (Throwable t)
+ {
+ callback.accept(null, t);
+ }
+ });
+ }
+ catch (Throwable t)
+ {
+ // TODO (low priority, correctness): If the executor is
shutdown then the callback may run in an unexpected thread, which may not be
thread safe
+ callback.accept(null, t);
+ }
+ }
+ });
+ }
+
public static <V> AsyncChain<V> ofCallable(Executor executor, Callable<V>
callable)
{
return new Head<V>()
diff --git a/accord-core/src/test/java/accord/burn/BurnTest.java
b/accord-core/src/test/java/accord/burn/BurnTest.java
index 78269a79..c4feef07 100644
--- a/accord-core/src/test/java/accord/burn/BurnTest.java
+++ b/accord-core/src/test/java/accord/burn/BurnTest.java
@@ -49,6 +49,7 @@ import accord.impl.basic.PendingQueue;
import accord.impl.basic.PropagatingPendingQueue;
import accord.impl.basic.RandomDelayQueue.Factory;
import accord.impl.basic.SimulatedDelayedExecutorService;
+import accord.impl.list.ListAgent;
import accord.impl.list.ListQuery;
import accord.impl.list.ListRead;
import accord.impl.list.ListRequest;
@@ -199,7 +200,8 @@ public class BurnTest
{
List<Throwable> failures = Collections.synchronizedList(new
ArrayList<>());
PendingQueue queue = new PropagatingPendingQueue(failures, new
Factory(random).get());
- SimulatedDelayedExecutorService globalExecutor = new
SimulatedDelayedExecutorService(queue, random.fork());
+ ListAgent agent = new ListAgent(30L, failures::add);
+ SimulatedDelayedExecutorService globalExecutor = new
SimulatedDelayedExecutorService(queue, agent, random.fork());
StrictSerializabilityVerifier strictSerializable = new
StrictSerializabilityVerifier(keyCount);
Function<CommandStore, AsyncExecutor> executor = ignore ->
globalExecutor;
@@ -276,7 +278,7 @@ public class BurnTest
try
{
Cluster.run(toArray(nodes, Id[]::new), () -> queue,
- responseSink, failures::add,
+ responseSink, globalExecutor,
() -> random.fork(),
() -> new AtomicLong()::incrementAndGet,
topologyFactory, () -> null);
@@ -356,7 +358,7 @@ public class BurnTest
catch (Throwable t)
{
logger.error("Exception running burn test for seed {}:", seed, t);
- throw t;
+ throw SimulationException.wrap(seed, t);
}
}
diff --git
a/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java
b/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java
index 2f86e935..3f718856 100644
--- a/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java
+++ b/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java
@@ -18,8 +18,8 @@
package accord.burn;
-import accord.api.MessageSink;
import accord.api.TestableConfigurationService;
+import accord.local.AgentExecutor;
import accord.utils.RandomSource;
import accord.local.Node;
import accord.messages.*;
@@ -42,7 +42,7 @@ public class BurnTestConfigurationService implements
TestableConfigurationServic
private static final Logger logger =
LoggerFactory.getLogger(BurnTestConfigurationService.class);
private final Node.Id node;
- private final MessageSink messageSink;
+ private final AgentExecutor executor;
private final Function<Node.Id, Node> lookup;
private final Supplier<RandomSource> randomSupplier;
private final Map<Long, FetchTopology> pendingEpochs = new HashMap<>();
@@ -128,10 +128,10 @@ public class BurnTestConfigurationService implements
TestableConfigurationServic
}
}
- public BurnTestConfigurationService(Node.Id node, MessageSink messageSink,
Supplier<RandomSource> randomSupplier, Topology topology, Function<Node.Id,
Node> lookup, TopologyUpdates topologyUpdates)
+ public BurnTestConfigurationService(Node.Id node, AgentExecutor executor,
Supplier<RandomSource> randomSupplier, Topology topology, Function<Node.Id,
Node> lookup, TopologyUpdates topologyUpdates)
{
this.node = node;
- this.messageSink = messageSink;
+ this.executor = executor;
this.randomSupplier = randomSupplier;
this.lookup = lookup;
this.topologyUpdates = topologyUpdates;
@@ -218,10 +218,10 @@ public class BurnTestConfigurationService implements
TestableConfigurationServic
{
this.request = new FetchTopologyRequest(epoch);
this.candidates = new ArrayList<>();
- sendNext();
+ executor.execute(this::sendNext);
}
- synchronized void sendNext()
+ void sendNext()
{
if (candidates.isEmpty())
{
@@ -230,7 +230,7 @@ public class BurnTestConfigurationService implements
TestableConfigurationServic
}
int idx = randomSupplier.get().nextInt(candidates.size());
Node.Id node = candidates.remove(idx);
- messageSink.send(node, request, this);
+ originator().send(node, request, executor, this);
}
@Override
@@ -243,7 +243,7 @@ public class BurnTestConfigurationService implements
TestableConfigurationServic
}
@Override
- public synchronized void onFailure(Node.Id from, Throwable failure)
+ public void onFailure(Node.Id from, Throwable failure)
{
sendNext();
}
@@ -269,10 +269,15 @@ public class BurnTestConfigurationService implements
TestableConfigurationServic
{
epochs.acknowledge(epoch);
Topology topology = getTopologyForEpoch(epoch);
- Node originator = lookup.apply(node);
+ Node originator = originator();
topologyUpdates.syncEpoch(originator, epoch - 1, topology.nodes());
}
+ private Node originator()
+ {
+ return lookup.apply(node);
+ }
+
@Override
public synchronized void reportTopology(Topology topology)
{
diff --git a/accord-core/src/test/java/accord/impl/list/ListData.java
b/accord-core/src/test/java/accord/burn/SimulationException.java
similarity index 54%
copy from accord-core/src/test/java/accord/impl/list/ListData.java
copy to accord-core/src/test/java/accord/burn/SimulationException.java
index cc582276..08ecae50 100644
--- a/accord-core/src/test/java/accord/impl/list/ListData.java
+++ b/accord-core/src/test/java/accord/burn/SimulationException.java
@@ -16,31 +16,29 @@
* limitations under the License.
*/
-package accord.impl.list;
+package accord.burn;
-import java.util.Arrays;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.stream.Collectors;
+public class SimulationException extends RuntimeException
+{
+ public SimulationException(long seed, Throwable t)
+ {
+ super(createMsg(seed, null), t, true, false);
+ }
-import accord.api.Data;
-import accord.api.Key;
+ public SimulationException(long seed, String msg, Throwable t)
+ {
+ super(createMsg(seed, msg), t, true, false);
+ }
-public class ListData extends TreeMap<Key, int[]> implements Data
-{
- @Override
- public Data merge(Data data)
+ private static String createMsg(long seed, String msg)
{
- if (data != null)
- this.putAll(((ListData)data));
- return this;
+ return String.format("Failed on seed %d%s", seed, msg == null ? "" :
"; " + msg);
}
- @Override
- public String toString()
+ public static SimulationException wrap(long seed, Throwable t)
{
- return entrySet().stream()
- .map(e -> e.getKey() + "=" +
Arrays.toString(e.getValue()))
- .collect(Collectors.joining(", ", "{", "}"));
+ if (t instanceof SimulationException)
+ return (SimulationException) t;
+ return new SimulationException(seed, t);
}
}
diff --git a/accord-core/src/test/java/accord/burn/TopologyUpdates.java
b/accord-core/src/test/java/accord/burn/TopologyUpdates.java
index ffea504c..e2e897ee 100644
--- a/accord-core/src/test/java/accord/burn/TopologyUpdates.java
+++ b/accord-core/src/test/java/accord/burn/TopologyUpdates.java
@@ -21,8 +21,9 @@ package accord.burn;
import accord.api.TestableConfigurationService;
import accord.impl.InMemoryCommandStore;
import accord.coordinate.FetchData;
-import accord.impl.InMemoryCommandStores;
+import accord.local.AgentExecutor;
import accord.local.Command;
+import accord.local.CommandStore;
import accord.local.Commands;
import accord.local.Node;
import accord.local.Status;
@@ -43,7 +44,6 @@ import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -53,7 +53,6 @@ import static accord.coordinate.Invalidate.invalidate;
import static accord.local.PreLoadContext.contextFor;
import static accord.local.Status.*;
import static accord.local.Status.Known.*;
-import static accord.utils.async.AsyncChains.getUninterruptibly;
public class TopologyUpdates
{
@@ -91,7 +90,7 @@ public class TopologyUpdates
if (!node.topology().hasEpoch(toEpoch))
{
node.configService().fetchTopologyForEpoch(toEpoch);
- node.topology().awaitEpoch(toEpoch).addCallback(() ->
process(node, onDone));
+ node.topology().awaitEpoch(toEpoch).addCallback(() ->
process(node, onDone)).begin(node.agent());
return;
}
@@ -142,19 +141,21 @@ public class TopologyUpdates
dieExceptionally(invalidate.addCallback(((unused,
failure) -> onDone.accept(failure == null))).beginAsResult());
}
return null;
- }).beginAsResult();
+ }, node.commandStores().any()).beginAsResult();
dieExceptionally(sync);
}
}
private final Set<Long> pendingTopologies = Sets.newConcurrentHashSet();
+ private final AgentExecutor executor;
public static <T> BiConsumer<T, Throwable> dieOnException()
{
return (result, throwable) -> {
if (throwable != null)
{
- logger.error("", throwable);
+ logger.error("Unexpected exception", throwable);
+ logger.error("", new Throwable("Shutting down test"));
System.exit(1);
}
};
@@ -166,10 +167,16 @@ public class TopologyUpdates
return stage;
}
+ public TopologyUpdates(AgentExecutor executor)
+ {
+ this.executor = executor;
+ }
+
public MessageTask notify(Node originator, Collection<Node.Id> cluster,
Topology update)
{
pendingTopologies.add(update.epoch());
- return MessageTask.begin(originator, cluster, "TopologyNotify:" +
update.epoch(), (node, from, onDone) -> {
+ CommandStore.checkNotInStore();
+ return MessageTask.begin(originator, cluster, executor,
"TopologyNotify:" + update.epoch(), (node, from, onDone) -> {
long nodeEpoch = node.topology().epoch();
if (nodeEpoch + 1 < update.epoch())
onDone.accept(false);
@@ -186,7 +193,7 @@ public class TopologyUpdates
return result;
}
- private static AsyncChain<Stream<MessageTask>> syncEpochCommands(Node
node, long srcEpoch, Ranges ranges, Function<CommandSync, Collection<Node.Id>>
recipients, long trgEpoch, boolean committedOnly)
+ private AsyncChain<Stream<MessageTask>> syncEpochCommands(Node node, long
srcEpoch, Ranges ranges, Function<CommandSync, Collection<Node.Id>> recipients,
long trgEpoch, boolean committedOnly)
{
Map<TxnId, CheckStatusOk> syncMessages = new ConcurrentHashMap<>();
Consumer<Command> commandConsumer = command ->
syncMessages.merge(command.txnId(), new CheckStatusOk(node, command),
CheckStatusOk::merge);
@@ -198,7 +205,7 @@ public class TopologyUpdates
return start.map(ignore -> syncMessages.entrySet().stream().map(e -> {
CommandSync sync = new CommandSync(e.getKey(), e.getValue(),
srcEpoch, trgEpoch);
- return MessageTask.of(node, recipients.apply(sync),
sync.toString(), sync::process);
+ return MessageTask.of(node, recipients.apply(sync), executor,
sync.toString(), sync::process);
}));
}
@@ -208,13 +215,14 @@ public class TopologyUpdates
/**
* Syncs all replicated commands. Overkill, but useful for confirming
issues in optimizedSync
*/
- private static AsyncChain<Stream<MessageTask>> thoroughSync(Node node,
long syncEpoch)
+ private AsyncChain<Stream<MessageTask>> thoroughSync(Node node, long
syncEpoch)
{
Topology syncTopology =
node.configService().getTopologyForEpoch(syncEpoch);
Topology localTopology = syncTopology.forNode(node.id());
Function<CommandSync, Collection<Node.Id>> allNodes = cmd ->
node.topology().withUnsyncedEpochs(cmd.route, syncEpoch + 1).nodes();
Ranges ranges = localTopology.ranges();
+
List<AsyncChain<Stream<MessageTask>>> work = new ArrayList<>();
for (long epoch=1; epoch<=syncEpoch; epoch++)
work.add(syncEpochCommands(node, epoch, ranges, allNodes,
syncEpoch, COMMITTED_ONLY));
@@ -224,7 +232,7 @@ public class TopologyUpdates
/**
* Syncs all newly replicated commands when nodes are gaining ranges and
the current epoch
*/
- private static AsyncChain<Stream<MessageTask>> optimizedSync(Node node,
long srcEpoch)
+ private AsyncChain<Stream<MessageTask>> optimizedSync(Node node, long
srcEpoch)
{
long trgEpoch = srcEpoch + 1;
Topology syncTopology =
node.configService().getTopologyForEpoch(srcEpoch);
@@ -264,7 +272,7 @@ public class TopologyUpdates
return AsyncChains.reduce(work, Stream.empty(), Stream::concat);
}
- private static AsyncChain<Void> sync(Node node, long syncEpoch)
+ private AsyncChain<Void> sync(Node node, long syncEpoch)
{
return optimizedSync(node, syncEpoch)
.flatMap(messageStream -> {
@@ -288,7 +296,7 @@ public class TopologyUpdates
public AsyncResult<Void> syncEpoch(Node originator, long epoch,
Collection<Node.Id> cluster)
{
AsyncResult<Void> result = dieExceptionally(sync(originator, epoch)
- .flatMap(v -> MessageTask.apply(originator, cluster,
"SyncComplete:" + epoch, (node, from, onDone) -> {
+ .flatMap(v ->
MessageTask.apply(originator, cluster, executor, "SyncComplete:" + epoch,
(node, from, onDone) -> {
node.onEpochSyncComplete(originator.id(), epoch);
onDone.accept(true);
})).beginAsResult());
diff --git a/accord-core/src/test/java/accord/coordinate/CoordinateTest.java
b/accord-core/src/test/java/accord/coordinate/CoordinateTest.java
index 0d6b43a9..021adf44 100644
--- a/accord-core/src/test/java/accord/coordinate/CoordinateTest.java
+++ b/accord-core/src/test/java/accord/coordinate/CoordinateTest.java
@@ -23,6 +23,7 @@ import accord.impl.mock.MockCluster;
import accord.api.Result;
import accord.impl.mock.MockStore;
import accord.primitives.*;
+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
diff --git
a/accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java
b/accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java
index 06d66817..a76caa1c 100644
--- a/accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java
+++ b/accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java
@@ -22,8 +22,6 @@ import accord.impl.mock.MockCluster;
import accord.impl.mock.MockConfigurationService;
import accord.local.Command;
import accord.local.Node;
-import accord.local.PreLoadContext;
-import accord.local.Status;
import accord.primitives.Range;
import accord.topology.Topology;
import accord.primitives.Keys;
diff --git
a/accord-core/src/test/java/accord/coordinate/tracking/ReadTrackerTest.java
b/accord-core/src/test/java/accord/coordinate/tracking/ReadTrackerTest.java
index 35418b58..d037cd75 100644
--- a/accord-core/src/test/java/accord/coordinate/tracking/ReadTrackerTest.java
+++ b/accord-core/src/test/java/accord/coordinate/tracking/ReadTrackerTest.java
@@ -20,7 +20,6 @@ package accord.coordinate.tracking;
import accord.impl.TopologyUtils;
import accord.local.Node.Id;
-import accord.local.Node;
import accord.primitives.Ranges;
import accord.topology.Shard;
import accord.topology.Topologies;
diff --git
a/accord-core/src/test/java/accord/coordinate/tracking/TrackerReconciler.java
b/accord-core/src/test/java/accord/coordinate/tracking/TrackerReconciler.java
index 3736dd8f..fc7038fe 100644
---
a/accord-core/src/test/java/accord/coordinate/tracking/TrackerReconciler.java
+++
b/accord-core/src/test/java/accord/coordinate/tracking/TrackerReconciler.java
@@ -19,6 +19,10 @@
package accord.coordinate.tracking;
import accord.burn.TopologyUpdates;
+import accord.impl.TestAgent;
+import accord.impl.basic.RandomDelayQueue;
+import accord.impl.basic.SimulatedDelayedExecutorService;
+import accord.local.AgentExecutor;
import accord.utils.DefaultRandom;
import accord.utils.RandomSource;
import accord.impl.IntHashKey;
@@ -96,13 +100,14 @@ public abstract class TrackerReconciler<ST extends
ShardTracker, T extends Abstr
{
System.out.println("seed: " + seed);
RandomSource random = new DefaultRandom(seed);
- return topologies(random).map(topologies -> constructor.apply(random,
topologies))
+ SimulatedDelayedExecutorService executor = new
SimulatedDelayedExecutorService(new RandomDelayQueue.Factory(random).get(), new
TestAgent(), random);
+ return topologies(random, executor).map(topologies ->
constructor.apply(random, topologies))
.collect(Collectors.toList());
}
// TODO (required, testing): generalise and parameterise topology
generation a bit more
// also, select a subset of the generated
topologies to correctly simulate topology consumption logic
- private static Stream<Topologies> topologies(RandomSource random)
+ private static Stream<Topologies> topologies(RandomSource random,
AgentExecutor executor)
{
TopologyFactory factory = new TopologyFactory(2 + random.nextInt(3),
IntHashKey.ranges(4 + random.nextInt(12)));
List<Id> nodes = cluster(factory.rf * (1 +
random.nextInt(factory.shardRanges.length - 1)));
@@ -117,7 +122,7 @@ public abstract class TrackerReconciler<ST extends
ShardTracker, T extends Abstr
Deque<Topology> topologies = new ArrayDeque<>();
topologies.add(topology);
- TopologyUpdates topologyUpdates = new TopologyUpdates();
+ TopologyUpdates topologyUpdates = new TopologyUpdates(executor);
TopologyRandomizer configRandomizer = new TopologyRandomizer(() ->
random, topology, topologyUpdates, (id, top) -> {});
while (--count > 0)
{
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java
b/accord-core/src/test/java/accord/impl/basic/Cluster.java
index 95b00b9a..6d549d45 100644
--- a/accord-core/src/test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -38,13 +38,13 @@ import accord.api.MessageSink;
import accord.burn.BurnTestConfigurationService;
import accord.burn.TopologyUpdates;
import accord.impl.*;
+import accord.local.AgentExecutor;
import accord.local.Node;
import accord.local.Node.Id;
import accord.api.Scheduler;
-import accord.impl.list.ListAgent;
import accord.impl.list.ListStore;
import accord.local.ShardDistributor;
-import accord.messages.Callback;
+import accord.messages.SafeCallback;
import accord.messages.Reply;
import accord.messages.Request;
import accord.topology.TopologyRandomizer;
@@ -150,24 +150,12 @@ public class Cluster implements Scheduler
if (deliver.message instanceof Reply)
{
Reply reply = (Reply) deliver.message;
- Callback callback = reply.isFinal()
- ?
sinks.get(deliver.dst).callbacks.remove(deliver.replyId)
- :
sinks.get(deliver.dst).callbacks.get(deliver.replyId);
+ SafeCallback callback = reply.isFinal()
+ ?
sinks.get(deliver.dst).callbacks.remove(deliver.replyId)
+ :
sinks.get(deliver.dst).callbacks.get(deliver.replyId);
if (callback != null)
- {
- on.scheduler().now(() -> {
- try
- {
- callback.onSuccess(deliver.src, reply);
- }
- catch (Throwable t)
- {
- callback.onCallbackFailure(deliver.src, t);
- on.agent().onUncaughtException(t);
- }
- });
- }
+ callback.success(deliver.src, reply);
}
else on.receive((Request) deliver.message, deliver.src, deliver);
}
@@ -206,24 +194,25 @@ public class Cluster implements Scheduler
run.run();
}
- public static void run(Id[] nodes, Supplier<PendingQueue> queueSupplier,
Consumer<Packet> responseSink, Consumer<Throwable> onFailure,
Supplier<RandomSource> randomSupplier, Supplier<LongSupplier> nowSupplier,
TopologyFactory topologyFactory, Supplier<Packet> in)
+ public static void run(Id[] nodes, Supplier<PendingQueue> queueSupplier,
Consumer<Packet> responseSink, AgentExecutor executor, Supplier<RandomSource>
randomSupplier, Supplier<LongSupplier> nowSupplier, TopologyFactory
topologyFactory, Supplier<Packet> in)
{
- TopologyUpdates topologyUpdates = new TopologyUpdates();
+
Topology topology = topologyFactory.toTopology(nodes);
Map<Id, Node> lookup = new LinkedHashMap<>();
- TopologyRandomizer configRandomizer = new
TopologyRandomizer(randomSupplier, topology, topologyUpdates, lookup::get);
try
{
Cluster sinks = new Cluster(queueSupplier, lookup::get,
responseSink);
+ TopologyUpdates topologyUpdates = new TopologyUpdates(executor);
+ TopologyRandomizer configRandomizer = new
TopologyRandomizer(randomSupplier, topology, topologyUpdates, lookup::get);
for (Id node : nodes)
{
MessageSink messageSink = sinks.create(node,
randomSupplier.get());
- BurnTestConfigurationService configService = new
BurnTestConfigurationService(node, messageSink, randomSupplier, topology,
lookup::get, topologyUpdates);
+ BurnTestConfigurationService configService = new
BurnTestConfigurationService(node, executor, randomSupplier, topology,
lookup::get, topologyUpdates);
lookup.put(node, new Node(node, messageSink, configService,
nowSupplier.get(),
() -> new ListStore(node), new
ShardDistributor.EvenSplit<>(8, ignore -> new IntHashKey.Splitter()),
- new ListAgent(30L, onFailure),
+ executor.agent(),
randomSupplier.get(), sinks,
SizeOfIntersectionSorter.SUPPLIER,
- SimpleProgressLog::new,
DelayedCommandStores.factory(sinks.pending, randomSupplier.get())));
+ SimpleProgressLog::new,
DelayedCommandStores.factory(sinks.pending)));
}
List<Id> nodesList = new ArrayList<>(Arrays.asList(nodes));
diff --git
a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
index 8e84f8e4..72153c24 100644
--- a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
+++ b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
@@ -18,26 +18,106 @@
package accord.impl.basic;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.Callable;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
import accord.api.Agent;
import accord.api.DataStore;
import accord.api.ProgressLog;
import accord.impl.InMemoryCommandStore;
import accord.impl.InMemoryCommandStores;
+import accord.impl.basic.TaskExecutorService.Task;
+import accord.local.CommandStore;
import accord.local.CommandStores;
import accord.local.NodeTimeService;
+import accord.local.PreLoadContext;
+import accord.local.SafeCommandStore;
import accord.local.ShardDistributor;
import accord.utils.RandomSource;
+import accord.utils.async.AsyncChain;
public class DelayedCommandStores extends InMemoryCommandStores.SingleThread
{
- private DelayedCommandStores(NodeTimeService time, Agent agent, DataStore
store, ShardDistributor shardDistributor, ProgressLog.Factory
progressLogFactory, SimulatedDelayedExecutorService executorService)
+ private DelayedCommandStores(NodeTimeService time, Agent agent, DataStore
store, RandomSource random, ShardDistributor shardDistributor,
ProgressLog.Factory progressLogFactory, SimulatedDelayedExecutorService
executorService)
+ {
+ super(time, agent, store, random, shardDistributor,
progressLogFactory, DelayedCommandStore.factory(executorService));
+ }
+
+ public static CommandStores.Factory factory(PendingQueue pending)
{
- super(time, agent, store, shardDistributor, progressLogFactory,
InMemoryCommandStore.SingleThread.factory(executorService));
+ return (time, agent, store, random, shardDistributor,
progressLogFactory) ->
+ new DelayedCommandStores(time, agent, store, random,
shardDistributor, progressLogFactory, new
SimulatedDelayedExecutorService(pending, agent, random));
}
- public static CommandStores.Factory factory(PendingQueue pending,
RandomSource random)
+ public static class DelayedCommandStore extends InMemoryCommandStore
{
- SimulatedDelayedExecutorService executorService = new
SimulatedDelayedExecutorService(pending, random);
- return (time, agent, store, shardDistributor, progressLogFactory) ->
new DelayedCommandStores(time, agent, store, shardDistributor,
progressLogFactory, executorService);
+ private final SimulatedDelayedExecutorService executor;
+ private final Queue<Task<?>> pending = new LinkedList<>();
+
+ public DelayedCommandStore(int id, NodeTimeService time, Agent agent,
DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpochHolder
rangesForEpochHolder, SimulatedDelayedExecutorService executor)
+ {
+ super(id, time, agent, store, progressLogFactory,
rangesForEpochHolder);
+ this.executor = executor;
+ }
+
+ private static CommandStore.Factory
factory(SimulatedDelayedExecutorService executor)
+ {
+ return (id, time, agent, store, progressLogFactory,
rangesForEpoch) -> new DelayedCommandStore(id, time, agent, store,
progressLogFactory, rangesForEpoch, executor);
+ }
+
+ @Override
+ public boolean inStore()
+ {
+ return CommandStore.maybeCurrent() == this;
+ }
+
+ @Override
+ public AsyncChain<Void> execute(PreLoadContext context, Consumer<?
super SafeCommandStore> consumer)
+ {
+ return submit(context, i -> { consumer.accept(i); return null; });
+ }
+
+ @Override
+ public <T> AsyncChain<T> submit(PreLoadContext context, Function<?
super SafeCommandStore, T> function)
+ {
+ return submit(() -> executeInContext(this, context, function));
+ }
+
+ @Override
+ public <T> AsyncChain<T> submit(Callable<T> fn)
+ {
+ Task<T> task = new Task<>(() -> this.unsafeRunIn(fn));
+ boolean wasEmpty = pending.isEmpty();
+ pending.add(task);
+ if (wasEmpty)
+ runNextTask();
+ return task;
+ }
+
+ private void runNextTask()
+ {
+ Task<?> next = pending.peek();
+ if (next == null)
+ return;
+
+ next.addCallback(agent()); // used to track unexpected exceptions
and notify simulations
+ next.addCallback(this::afterExecution);
+ executor.execute(next);
+ }
+
+ private void afterExecution()
+ {
+ pending.poll();
+ runNextTask();
+ }
+
+ @Override
+ public void shutdown()
+ {
+
+ }
}
}
diff --git a/accord-core/src/test/java/accord/impl/basic/NodeSink.java
b/accord-core/src/test/java/accord/impl/basic/NodeSink.java
index bad4ed09..d0d70402 100644
--- a/accord-core/src/test/java/accord/impl/basic/NodeSink.java
+++ b/accord-core/src/test/java/accord/impl/basic/NodeSink.java
@@ -23,8 +23,9 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
+import accord.local.AgentExecutor;
+import accord.messages.SafeCallback;
import accord.utils.RandomSource;
-import accord.coordinate.Timeout;
import accord.local.Node;
import accord.local.Node.Id;
import accord.api.MessageSink;
@@ -43,7 +44,7 @@ public class NodeSink implements MessageSink
final RandomSource random;
int nextMessageId = 0;
- Map<Long, Callback> callbacks = new LinkedHashMap<>();
+ Map<Long, SafeCallback> callbacks = new LinkedHashMap<>();
public NodeSink(Id self, Function<Id, Node> lookup, Cluster parent,
RandomSource random)
{
@@ -60,39 +61,19 @@ public class NodeSink implements MessageSink
}
@Override
- public void send(Id to, Request send, Callback callback)
+ public void send(Id to, Request send, AgentExecutor executor, Callback
callback)
{
long messageId = nextMessageId++;
- callbacks.put(messageId, callback);
+ SafeCallback sc = new SafeCallback(executor, callback);
+ callbacks.put(messageId, sc);
parent.add(self, to, messageId, send);
parent.pending.add((PendingRunnable) () -> {
- if (callback == callbacks.get(messageId))
- {
- try
- {
- callback.onSlowResponse(to);
- }
- catch (Throwable t)
- {
- callback.onCallbackFailure(to, t);
- lookup.apply(self).agent().onUncaughtException(t);
- }
-
- }
+ if (sc == callbacks.get(messageId))
+ sc.slowResponse(to);
}, 100 + random.nextInt(200), TimeUnit.MILLISECONDS);
parent.pending.add((PendingRunnable) () -> {
- if (callback == callbacks.remove(messageId))
- {
- try
- {
- callback.onFailure(to, new Timeout(null, null));
- }
- catch (Throwable t)
- {
- callback.onCallbackFailure(to, t);
- lookup.apply(self).agent().onUncaughtException(t);
- }
- }
+ if (sc == callbacks.remove(messageId))
+ sc.timeout(to);
}, 1000 + random.nextInt(10000), TimeUnit.MILLISECONDS);
}
diff --git
a/accord-core/src/test/java/accord/impl/basic/SimulatedDelayedExecutorService.java
b/accord-core/src/test/java/accord/impl/basic/SimulatedDelayedExecutorService.java
index 850f717d..e21cc779 100644
---
a/accord-core/src/test/java/accord/impl/basic/SimulatedDelayedExecutorService.java
+++
b/accord-core/src/test/java/accord/impl/basic/SimulatedDelayedExecutorService.java
@@ -18,9 +18,9 @@
package accord.impl.basic;
-import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
+import accord.api.Agent;
import accord.burn.random.FrequentLargeRange;
import accord.burn.random.RandomLong;
import accord.burn.random.RandomWalkRange;
@@ -29,12 +29,14 @@ import accord.utils.RandomSource;
public class SimulatedDelayedExecutorService extends TaskExecutorService
{
private final PendingQueue pending;
+ private final Agent agent;
private final RandomSource random;
private final RandomLong jitterInNano;
- public SimulatedDelayedExecutorService(PendingQueue pending, RandomSource
random)
+ public SimulatedDelayedExecutorService(PendingQueue pending, Agent agent,
RandomSource random)
{
this.pending = pending;
+ this.agent = agent;
this.random = random;
// this is different from Apache Cassandra Simulator as this is
computed differently for each executor
// rather than being a global config
@@ -60,10 +62,9 @@ public class SimulatedDelayedExecutorService extends
TaskExecutorService
pending.add(task, jitterInNano.getLong(random), TimeUnit.NANOSECONDS);
}
- public <T> Task<T> submit(Callable<T> fn, long delay, TimeUnit unit)
+ @Override
+ public Agent agent()
{
- Task<T> task = newTaskFor(fn);
- pending.add(task, jitterInNano.getLong(random) + unit.toNanos(delay),
TimeUnit.NANOSECONDS);
- return task;
+ return agent;
}
}
\ No newline at end of file
diff --git
a/accord-core/src/test/java/accord/impl/basic/TaskExecutorService.java
b/accord-core/src/test/java/accord/impl/basic/TaskExecutorService.java
index 2ab766bb..c68bdff8 100644
--- a/accord-core/src/test/java/accord/impl/basic/TaskExecutorService.java
+++ b/accord-core/src/test/java/accord/impl/basic/TaskExecutorService.java
@@ -27,10 +27,10 @@ import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import accord.utils.async.AsyncExecutor;
+import accord.local.AgentExecutor;
import accord.utils.async.AsyncResults;
-public abstract class TaskExecutorService extends AbstractExecutorService
implements AsyncExecutor
+public abstract class TaskExecutorService extends AbstractExecutorService
implements AgentExecutor
{
public static class Task<T> extends AsyncResults.SettableResult<T>
implements Pending, RunnableFuture<T>
{
diff --git
a/accord-core/src/test/java/accord/impl/basic/UniformRandomQueue.java
b/accord-core/src/test/java/accord/impl/basic/UniformRandomQueue.java
deleted file mode 100644
index 45d55c30..00000000
--- a/accord-core/src/test/java/accord/impl/basic/UniformRandomQueue.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package accord.impl.basic;
-
-import accord.utils.RandomSource;
-
-import java.util.PriorityQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
-
-public class UniformRandomQueue<T> implements PendingQueue
-{
- public static class Factory implements Supplier<PendingQueue>
- {
- final RandomSource seeds;
-
- public Factory(RandomSource seeds)
- {
- this.seeds = seeds;
- }
-
- @Override
- public PendingQueue get()
- {
- return new UniformRandomQueue<>(seeds.fork());
- }
- }
-
- static class Item implements Comparable<Item>
- {
- final double priority;
- final Pending value;
-
- Item(double priority, Pending value)
- {
- this.priority = priority;
- this.value = value;
- }
-
- @Override
- public int compareTo(Item that)
- {
- return Double.compare(this.priority, that.priority);
- }
- }
-
- final PriorityQueue<Item> queue = new PriorityQueue<>();
- final RandomSource random;
-
- public UniformRandomQueue(RandomSource random)
- {
- this.random = random;
- }
-
- @Override
- public int size()
- {
- return queue.size();
- }
-
- @Override
- public void add(Pending item)
- {
- queue.add(new Item(random.nextDouble(), item));
- }
-
- @Override
- public void add(Pending item, long delay, TimeUnit units)
- {
- queue.add(new Item(random.nextDouble(), item));
- }
-
- @Override
- public Pending poll()
- {
- return unwrap(queue.poll());
- }
-
- private static Pending unwrap(Item e)
- {
- return e == null ? null : e.value;
- }
-}
diff --git a/accord-core/src/test/java/accord/impl/list/ListData.java
b/accord-core/src/test/java/accord/impl/list/ListData.java
index cc582276..26874ac7 100644
--- a/accord-core/src/test/java/accord/impl/list/ListData.java
+++ b/accord-core/src/test/java/accord/impl/list/ListData.java
@@ -19,7 +19,6 @@
package accord.impl.list;
import java.util.Arrays;
-import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
diff --git a/accord-core/src/test/java/accord/impl/list/ListRequest.java
b/accord-core/src/test/java/accord/impl/list/ListRequest.java
index f2a9883c..8fbe8941 100644
--- a/accord-core/src/test/java/accord/impl/list/ListRequest.java
+++ b/accord-core/src/test/java/accord/impl/list/ListRequest.java
@@ -111,21 +111,23 @@ public class ListRequest implements Request
((Cluster)node.scheduler()).onDone(() -> {
RoutingKey homeKey = ((CoordinateFailed) fail).homeKey();
TxnId txnId = ((CoordinateFailed) fail).txnId();
- CheckOnResult.checkOnResult(node, txnId, homeKey, (s, f)
-> {
- if (f != null)
- return;
- switch (s)
- {
- case Invalidated:
- node.reply(client, replyContext, new
ListResult(client, ((Packet)replyContext).requestId, txnId, null, null, null,
null));
- break;
- case Lost:
- node.reply(client, replyContext, new
ListResult(client, ((Packet)replyContext).requestId, txnId, null, null, new
int[0][], null));
- break;
- case Neither:
- // currently caught elsewhere in response
tracking, but might help to throw an exception here
- }
- });
+ node.commandStores()
+ .select(homeKey)
+ .execute(() -> CheckOnResult.checkOnResult(node,
txnId, homeKey, (s, f) -> {
+ if (f != null)
+ return;
+ switch (s)
+ {
+ case Invalidated:
+ node.reply(client, replyContext, new
ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, null,
null));
+ break;
+ case Lost:
+ node.reply(client, replyContext, new
ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, new
int[0][], null));
+ break;
+ case Neither:
+ // currently caught elsewhere in response
tracking, but might help to throw an exception here
+ }
+ }));
});
}
}
diff --git a/accord-core/src/test/java/accord/impl/list/ListStore.java
b/accord-core/src/test/java/accord/impl/list/ListStore.java
index cf79d139..b2986e58 100644
--- a/accord-core/src/test/java/accord/impl/list/ListStore.java
+++ b/accord-core/src/test/java/accord/impl/list/ListStore.java
@@ -20,7 +20,6 @@ package accord.impl.list;
import java.util.*;
import java.util.AbstractMap.SimpleEntry;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import accord.api.Key;
@@ -28,7 +27,6 @@ import accord.local.Node;
import accord.api.DataStore;
import accord.primitives.Range;
import accord.primitives.RoutableKey;
-import accord.primitives.Seekable;
import accord.primitives.Timestamp;
import accord.utils.Timestamped;
diff --git a/accord-core/src/test/java/accord/impl/mock/MockCluster.java
b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
index 2a888642..d98c2dde 100644
--- a/accord-core/src/test/java/accord/impl/mock/MockCluster.java
+++ b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
@@ -20,11 +20,12 @@ package accord.impl.mock;
import accord.NetworkFilter;
import accord.api.MessageSink;
-import accord.coordinate.Timeout;
import accord.impl.*;
+import accord.local.AgentExecutor;
import accord.local.Node;
import accord.local.Node.Id;
import accord.local.ShardDistributor;
+import accord.messages.SafeCallback;
import accord.primitives.Ranges;
import accord.utils.DefaultRandom;
import accord.utils.EpochFunction;
@@ -63,7 +64,7 @@ public class MockCluster implements Network, AutoCloseable,
Iterable<Node>
public NetworkFilter networkFilter = new NetworkFilter();
private long nextMessageId = 0;
- Map<Long, Callback> callbacks = new ConcurrentHashMap<>();
+ Map<Long, SafeCallback> callbacks = new ConcurrentHashMap<>();
private final EpochFunction<MockConfigurationService> onFetchTopology;
private MockCluster(Builder builder)
@@ -140,7 +141,7 @@ public class MockCluster implements Network, AutoCloseable,
Iterable<Node>
}
@Override
- public void send(Id from, Id to, Request request, Callback callback)
+ public void send(Id from, Id to, Request request, AgentExecutor executor,
Callback callback)
{
Node node = nodes.get(to);
if (node == null)
@@ -153,7 +154,7 @@ public class MockCluster implements Network, AutoCloseable,
Iterable<Node>
{
// TODO (desired, testing): more flexible timeouts
if (callback != null)
- callback.onFailure(to, new Timeout(null, null));
+ new SafeCallback(executor, callback).timeout(to);
logger.info("discarding filtered message from {} to {}: {}", from,
to, request);
return;
}
@@ -161,10 +162,11 @@ public class MockCluster implements Network,
AutoCloseable, Iterable<Node>
long messageId = nextMessageId();
if (callback != null)
{
- callbacks.put(messageId, callback);
+ SafeCallback sc = new SafeCallback(executor, callback);
+ callbacks.put(messageId, sc);
node.scheduler().once(() -> {
- if (callbacks.remove(messageId, callback))
- callback.onFailure(to, new Timeout(null, null));
+ if (callbacks.remove(messageId, sc))
+ sc.timeout(to);
}, 2L, TimeUnit.SECONDS);
}
@@ -182,17 +184,17 @@ public class MockCluster implements Network,
AutoCloseable, Iterable<Node>
return;
}
- Callback callback = callbacks.remove(replyingToMessage);
+ SafeCallback sc = callbacks.remove(replyingToMessage);
if (networkFilter.shouldDiscard(from, replyingToNode, reply))
{
logger.info("discarding filtered reply from {} to {}: {}", from,
reply, reply);
- if (callback != null)
- callback.onFailure(from, new Timeout(null, null));
+ if (sc != null)
+ sc.timeout(from);
return;
}
- if (callback == null)
+ if (sc == null)
{
logger.warn("Callback not found for reply from {} to {}: {}
(msgid: {})", from, replyingToNode, reply, replyingToMessage);
return;
@@ -202,11 +204,11 @@ public class MockCluster implements Network,
AutoCloseable, Iterable<Node>
node.scheduler().now(() -> {
try
{
- callback.onSuccess(from, reply);
+ sc.success(from, reply);
}
catch (Throwable t)
{
- callback.onCallbackFailure(from, t);
+ sc.failure(from, t);
}
});
}
diff --git a/accord-core/src/test/java/accord/impl/mock/MockStore.java
b/accord-core/src/test/java/accord/impl/mock/MockStore.java
index 66e91e18..0a61b415 100644
--- a/accord-core/src/test/java/accord/impl/mock/MockStore.java
+++ b/accord-core/src/test/java/accord/impl/mock/MockStore.java
@@ -28,10 +28,8 @@ import accord.api.Write;
import accord.local.SafeCommandStore;
import accord.primitives.*;
import accord.primitives.Ranges;
-import accord.primitives.Keys;
import accord.primitives.Timestamp;
import accord.primitives.Txn;
-import accord.primitives.*;
import accord.utils.async.AsyncChain;
import accord.utils.async.AsyncChains;
diff --git a/accord-core/src/test/java/accord/impl/mock/Network.java
b/accord-core/src/test/java/accord/impl/mock/Network.java
index 4f64acc5..afa5e541 100644
--- a/accord-core/src/test/java/accord/impl/mock/Network.java
+++ b/accord-core/src/test/java/accord/impl/mock/Network.java
@@ -18,6 +18,7 @@
package accord.impl.mock;
+import accord.local.AgentExecutor;
import accord.local.Node.Id;
import accord.messages.Callback;
import accord.messages.Reply;
@@ -46,13 +47,13 @@ public interface Network
return new MessageId(messageId);
}
- void send(Id from, Id to, Request request, Callback callback);
+ void send(Id from, Id to, Request request, AgentExecutor executor,
Callback callback);
void reply(Id from, Id replyingToNode, long replyingToMessage, Reply
reply);
Network BLACK_HOLE = new Network()
{
@Override
- public void send(Id from, Id to, Request request, Callback callback)
+ public void send(Id from, Id to, Request request, AgentExecutor
executor, Callback callback)
{
// TODO (easy, testing): log
}
diff --git
a/accord-core/src/test/java/accord/impl/mock/RecordingMessageSink.java
b/accord-core/src/test/java/accord/impl/mock/RecordingMessageSink.java
index c6cfb90e..ef00df85 100644
--- a/accord-core/src/test/java/accord/impl/mock/RecordingMessageSink.java
+++ b/accord-core/src/test/java/accord/impl/mock/RecordingMessageSink.java
@@ -18,6 +18,7 @@
package accord.impl.mock;
+import accord.local.AgentExecutor;
import accord.local.Node;
import accord.messages.Callback;
import accord.messages.Reply;
@@ -62,10 +63,10 @@ public class RecordingMessageSink extends SimpleMessageSink
}
@Override
- public void send(Node.Id to, Request request, Callback callback)
+ public void send(Node.Id to, Request request, AgentExecutor executor,
Callback callback)
{
requests.add(new Envelope<>(to, request, callback));
- super.send(to, request, callback);
+ super.send(to, request, executor, callback);
}
@Override
diff --git a/accord-core/src/test/java/accord/impl/mock/SimpleMessageSink.java
b/accord-core/src/test/java/accord/impl/mock/SimpleMessageSink.java
index cdf9dc3d..e33314d2 100644
--- a/accord-core/src/test/java/accord/impl/mock/SimpleMessageSink.java
+++ b/accord-core/src/test/java/accord/impl/mock/SimpleMessageSink.java
@@ -18,6 +18,7 @@
package accord.impl.mock;
+import accord.local.AgentExecutor;
import accord.local.Node;
import accord.api.MessageSink;
import accord.messages.Callback;
@@ -39,13 +40,13 @@ public class SimpleMessageSink implements MessageSink
@Override
public void send(Node.Id to, Request request)
{
- network.send(node, to, request, null);
+ network.send(node, to, request, null, null);
}
@Override
- public void send(Node.Id to, Request request, Callback callback)
+ public void send(Node.Id to, Request request, AgentExecutor executor,
Callback callback)
{
- network.send(node, to, request, callback);
+ network.send(node, to, request, executor, callback);
}
@Override
diff --git a/accord-core/src/test/java/accord/utils/MessageTask.java
b/accord-core/src/test/java/accord/utils/MessageTask.java
index d868049c..d9084d10 100644
--- a/accord-core/src/test/java/accord/utils/MessageTask.java
+++ b/accord-core/src/test/java/accord/utils/MessageTask.java
@@ -18,6 +18,7 @@
package accord.utils;
+import accord.local.AgentExecutor;
import accord.local.Node;
import accord.messages.*;
import accord.utils.async.AsyncResults;
@@ -70,6 +71,7 @@ public class MessageTask extends
AsyncResults.SettableResult<Void> implements Ru
private final List<Node.Id> recipients;
private final String desc;
private final Request request;
+ private final AgentExecutor executor;
private final RetryingCallback callback;
private class TaskRequest implements Request
@@ -117,7 +119,7 @@ public class MessageTask extends
AsyncResults.SettableResult<Void> implements Ru
Invariants.checkArgument(reply == SUCCESS || reply == FAILURE);
if (reply == FAILURE)
{
- originator.send(from, request, this);
+ originator.send(from, request, executor, this);
return;
}
@@ -132,7 +134,7 @@ public class MessageTask extends
AsyncResults.SettableResult<Void> implements Ru
@Override
public void onFailure(Node.Id from, Throwable failure)
{
- originator.send(from, request, this);
+ originator.send(from, request, executor, this);
}
@Override
@@ -144,7 +146,7 @@ public class MessageTask extends
AsyncResults.SettableResult<Void> implements Ru
private MessageTask(Node originator,
List<Node.Id> recipients,
- String desc, NodeProcess process)
+ AgentExecutor executor, String desc, NodeProcess
process)
{
Invariants.checkArgument(!recipients.isEmpty());
this.originator = originator;
@@ -152,31 +154,30 @@ public class MessageTask extends
AsyncResults.SettableResult<Void> implements Ru
this.desc = desc;
this.request = new TaskRequest(process, desc);
this.callback = new RetryingCallback(recipients);
+ this.executor = executor;
}
- public static MessageTask of(Node originator, Collection<Node.Id>
recipients, String desc, NodeProcess process)
+ private static MessageTask of(Node originator, Collection<Node.Id>
recipients, AgentExecutor executor, String desc, NodeProcess process)
{
- return new MessageTask(originator, new ArrayList<>(recipients), desc,
process);
+ return new MessageTask(originator, new ArrayList<>(recipients),
executor, desc, process);
}
- public static MessageTask begin(Node originator, Collection<Node.Id>
recipients, String desc, NodeProcess process)
+ public static MessageTask begin(Node originator, Collection<Node.Id>
recipients, AgentExecutor executor, String desc, NodeProcess process)
{
- MessageTask task = of(originator, recipients, desc, process);
+ MessageTask task = of(originator, recipients, executor, desc, process);
task.run();
return task;
}
- public static MessageTask of(Node originator, Collection<Node.Id>
recipients, String desc, BiConsumer<Node, Consumer<Boolean>> consumer)
+ public static MessageTask of(Node originator, Collection<Node.Id>
recipients, AgentExecutor executor, String desc, BiConsumer<Node,
Consumer<Boolean>> consumer)
{
- NodeProcess process = (node, from, onDone) -> {
- consumer.accept(node, onDone);
- };
- return of(originator, recipients, desc, process);
+ NodeProcess process = (node, from, onDone) -> consumer.accept(node,
onDone);
+ return of(originator, recipients, executor, desc, process);
}
- public static MessageTask apply(Node originator, Collection<Node.Id>
recipients, String desc, NodeProcess process)
+ public static MessageTask apply(Node originator, Collection<Node.Id>
recipients, AgentExecutor executor, String desc, NodeProcess process)
{
- MessageTask task = of(originator, recipients, desc, process);
+ MessageTask task = of(originator, recipients, executor, desc, process);
task.run();
return task;
}
@@ -184,7 +185,7 @@ public class MessageTask extends
AsyncResults.SettableResult<Void> implements Ru
@Override
public void run()
{
- originator.send(recipients, request, callback);
+ originator.send(recipients, request, executor, callback);
}
@Override
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
index 74de7eb1..31799dff 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
@@ -39,15 +39,16 @@ import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
-import accord.coordinate.Timeout;
import accord.impl.SizeOfIntersectionSorter;
import accord.impl.SimpleProgressLog;
import accord.impl.InMemoryCommandStores;
+import accord.local.AgentExecutor;
import accord.local.Node;
import accord.local.Node.Id;
import accord.api.MessageSink;
import accord.local.ShardDistributor;
import accord.messages.Callback;
+import accord.messages.SafeCallback;
import accord.messages.Reply;
import accord.messages.ReplyContext;
import accord.messages.Request;
@@ -79,7 +80,7 @@ public class Cluster implements Scheduler
final RandomSource random;
int nextMessageId = 0;
- Map<Long, Callback> callbacks = new LinkedHashMap<>();
+ Map<Long, SafeCallback> callbacks = new LinkedHashMap<>();
public InstanceSink(Id self, Function<Id, Node> lookup, Cluster
parent, RandomSource random)
{
@@ -96,14 +97,15 @@ public class Cluster implements Scheduler
}
@Override
- public void send(Id to, Request send, Callback callback)
+ public void send(Id to, Request send, AgentExecutor executor, Callback
callback)
{
long messageId = nextMessageId++;
- callbacks.put(messageId, callback);
+ SafeCallback sc = new SafeCallback(executor, callback);
+ callbacks.put(messageId, sc);
parent.add(self, to, messageId, send);
parent.pending.add((Runnable)() -> {
- if (callback == callbacks.remove(messageId))
- callback.onFailure(to, new Timeout(null, null));
+ if (sc == callbacks.remove(messageId))
+ sc.timeout(to);
}, 1000 + random.nextInt(10000), TimeUnit.MILLISECONDS);
}
@@ -201,18 +203,9 @@ public class Cluster implements Scheduler
if (deliver.body.in_reply_to > Body.SENTINEL_MSG_ID ||
body instanceof Reply)
{
Reply reply = (Reply) body;
- Callback callback =
sinks.get(deliver.dest).callbacks.remove(deliver.body.in_reply_to);
+ SafeCallback callback =
sinks.get(deliver.dest).callbacks.remove(deliver.body.in_reply_to);
if (callback != null)
- on.scheduler().now(() -> {
- try
- {
- callback.onSuccess(deliver.src, reply);
- }
- catch (Throwable t)
- {
- callback.onCallbackFailure(deliver.src, t);
- }
- });
+ callback.success(deliver.src, reply);
}
else on.receive((Request) body, deliver.src, deliver);
}
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Json.java
b/accord-maelstrom/src/main/java/accord/maelstrom/Json.java
index 63b757c3..b32a6a7c 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Json.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Json.java
@@ -477,12 +477,15 @@ public class Json
public void write(JsonWriter out, ReadOk value) throws IOException
{
out.beginArray();
- for (Map.Entry<Key, Value> e :
((MaelstromData)value.data).entrySet())
+ if (value.data != null)
{
- out.beginArray();
- ((MaelstromKey)e.getKey()).datum.write(out);
- e.getValue().write(out);
- out.endArray();
+ for (Map.Entry<Key, Value> e :
((MaelstromData)value.data).entrySet())
+ {
+ out.beginArray();
+ ((MaelstromKey)e.getKey()).datum.write(out);
+ e.getValue().write(out);
+ out.endArray();
+ }
}
out.endArray();
}
diff --git
a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromQuery.java
b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromQuery.java
index 34939ddc..418deff4 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromQuery.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromQuery.java
@@ -28,7 +28,6 @@ import accord.api.Data;
import accord.api.Key;
import accord.api.Query;
import accord.api.Result;
-import accord.primitives.Keys;
import accord.primitives.TxnId;
public class MaelstromQuery implements Query
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
index 955645c8..1df8f9dc 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
@@ -34,6 +34,7 @@ import accord.coordinate.Timeout;
import accord.impl.SimpleProgressLog;
import accord.impl.InMemoryCommandStores;
import accord.impl.SizeOfIntersectionSorter;
+import accord.local.AgentExecutor;
import accord.local.Node;
import accord.local.Node.Id;
import accord.api.Scheduler;
@@ -112,8 +113,9 @@ public class Main
}
@Override
- public void send(Id to, Request send, Callback callback)
+ public void send(Id to, Request send, AgentExecutor ignored, Callback
callback)
{
+ // Executor is ignored due to the fact callbacks are applied in a
single thread already
long messageId = nextMessageId.incrementAndGet();
callbacks.put(messageId, new CallbackInfo(callback, to,
nowSupplier.getAsLong() + 1000L));
send(new Packet(self, to, messageId, send));
diff --git a/buildSrc/src/main/groovy/accord.java-conventions.gradle
b/buildSrc/src/main/groovy/accord.java-conventions.gradle
index 5817cc9c..34b89a46 100644
--- a/buildSrc/src/main/groovy/accord.java-conventions.gradle
+++ b/buildSrc/src/main/groovy/accord.java-conventions.gradle
@@ -18,6 +18,7 @@
plugins {
id 'java'
+ id 'checkstyle'
}
group accord_group
@@ -33,6 +34,11 @@ compileJava {
dependsOn(':rat')
}
+checkstyle {
+ showViolations = true
+ configDirectory = file("${rootProject.projectDir}/.build/checkstyle")
+}
+
test {
useJUnitPlatform()
// Use max(cpu/2, 1) workers to run tests
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]