This is an automated email from the ASF dual-hosted git repository.
aleksey pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push:
new 79fc1ebf CEP-15: Minimize transaction state kept in system tables (#62)
79fc1ebf is described below
commit 79fc1ebf7db6aa5e616dbef1bc61b616fea3c2c6
Author: Aleksey Yeschenko <[email protected]>
AuthorDate: Mon Sep 25 16:07:44 2023 +0100
CEP-15: Minimize transaction state kept in system tables (#62)
* CEP-15: Minimize transaction state kept in system tables
patch by Aleksey Yeschenko; reviewed by Ariel Weisberg for CASSANDRA-18573
---
.../src/main/java/accord/api/ProgressLog.java | 3 +-
accord-core/src/main/java/accord/api/Read.java | 1 +
accord-core/src/main/java/accord/api/Update.java | 1 +
.../src/main/java/accord/coordinate/FetchData.java | 255 +------------
.../java/accord/coordinate/RecoverWithRoute.java | 8 +-
.../src/main/java/accord/local/Command.java | 124 ++++++-
accord-core/src/main/java/accord/local/Node.java | 11 +-
.../main/java/accord/local/SerializerSupport.java | 413 +++++++++++++++++++++
accord-core/src/main/java/accord/local/Status.java | 9 +-
.../src/main/java/accord/messages/Apply.java | 37 +-
.../src/main/java/accord/messages/Commit.java | 23 +-
.../main/java/accord/messages/InformDurable.java | 11 -
.../{api/Read.java => messages/LocalMessage.java} | 24 +-
.../src/main/java/accord/messages/MessageType.java | 105 ++++--
.../src/main/java/accord/messages/Propagate.java | 371 ++++++++++++++++++
.../src/main/java/accord/primitives/Deps.java | 2 +-
.../main/java/accord/primitives/PartialDeps.java | 9 +-
.../main/java/accord/primitives/PartialTxn.java | 18 +-
.../src/main/java/accord/primitives/Writes.java | 7 +-
accord-core/src/test/java/accord/Utils.java | 2 +
.../src/test/java/accord/impl/basic/Cluster.java | 3 +-
.../test/java/accord/impl/mock/MockCluster.java | 2 +
.../java/accord/local/ImmutableCommandTest.java | 2 +-
.../src/main/java/accord/maelstrom/Cluster.java | 3 +-
.../src/main/java/accord/maelstrom/Main.java | 3 +-
25 files changed, 1079 insertions(+), 368 deletions(-)
diff --git a/accord-core/src/main/java/accord/api/ProgressLog.java
b/accord-core/src/main/java/accord/api/ProgressLog.java
index 4ede480f..47244c17 100644
--- a/accord-core/src/main/java/accord/api/ProgressLog.java
+++ b/accord-core/src/main/java/accord/api/ProgressLog.java
@@ -142,8 +142,7 @@ public interface ProgressLog
void executed(Command command, ProgressShard shard);
/**
- * The transaction's outcome has been durably recorded (but not
necessarily applied) at a quorum of all shards,
- * including at least those node's ids that are provided.
+ * The transaction's outcome has been durably recorded (but not
necessarily applied) at a quorum of all shards.
*
* If this replica has not witnessed the outcome of the transaction, it
should poll a majority of each shard
* for its outcome.
diff --git a/accord-core/src/main/java/accord/api/Read.java
b/accord-core/src/main/java/accord/api/Read.java
index b9eaf2a1..7def168c 100644
--- a/accord-core/src/main/java/accord/api/Read.java
+++ b/accord-core/src/main/java/accord/api/Read.java
@@ -32,4 +32,5 @@ public interface Read
AsyncChain<Data> read(Seekable key, Txn.Kind kind, SafeCommandStore
commandStore, Timestamp executeAt, DataStore store);
Read slice(Ranges ranges);
Read merge(Read other);
+ default boolean isEqualOrFuller(Read other) { return true; }
}
diff --git a/accord-core/src/main/java/accord/api/Update.java
b/accord-core/src/main/java/accord/api/Update.java
index 8a56efcc..fa8493e5 100644
--- a/accord-core/src/main/java/accord/api/Update.java
+++ b/accord-core/src/main/java/accord/api/Update.java
@@ -36,4 +36,5 @@ public interface Update
Write apply(Timestamp executeAt, @Nullable Data data);
Update slice(Ranges ranges);
Update merge(Update other);
+ default boolean isEqualOrFuller(Update other) { return true; }
}
diff --git a/accord-core/src/main/java/accord/coordinate/FetchData.java
b/accord-core/src/main/java/accord/coordinate/FetchData.java
index 774df8ab..0188a0a7 100644
--- a/accord-core/src/main/java/accord/coordinate/FetchData.java
+++ b/accord-core/src/main/java/accord/coordinate/FetchData.java
@@ -15,37 +15,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package accord.coordinate;
import java.util.function.BiConsumer;
-import accord.api.RoutingKey;
-import accord.local.Command;
-import accord.local.Commands;
import accord.local.Node;
-import accord.local.PreLoadContext;
-import accord.local.SafeCommand;
-import accord.local.SafeCommandStore;
import accord.local.Status;
import accord.local.Status.Known;
-import accord.local.Status.Phase;
import accord.messages.CheckStatus;
import accord.messages.CheckStatus.CheckStatusOkFull;
-import accord.messages.CheckStatus.WithQuorum;
+import accord.messages.Propagate;
import accord.primitives.*;
import accord.utils.Invariants;
-import accord.utils.MapReduceConsume;
import javax.annotation.Nullable;
import static
accord.coordinate.Infer.InvalidateAndCallback.locallyInvalidateAndCallback;
-import static accord.local.PreLoadContext.contextFor;
-import static accord.local.Status.NotDefined;
-import static accord.local.Status.Phase.Cleanup;
-import static accord.local.Status.PreApplied;
import static accord.messages.CheckStatus.WithQuorum.NoQuorum;
-import static accord.primitives.Routables.Slice.Minimal;
import static accord.primitives.Route.castToRoute;
import static accord.primitives.Route.isRoute;
@@ -230,244 +216,7 @@ public class FetchData extends CheckShards<Route<?>>
if (success == ReadCoordinator.Success.Success)
Invariants.checkState(isSufficient(merged), "Status %s is not
sufficient", merged);
- OnDone.propagate(node, txnId, sourceEpoch, success.withQuorum,
route(), target, (CheckStatusOkFull) merged, callback);
- }
- }
-
- static class OnDone implements MapReduceConsume<SafeCommandStore, Void>,
EpochSupplier
- {
- final Node node;
- final TxnId txnId;
- final Route<?> route;
- final RoutingKey progressKey;
- final CheckStatusOkFull full;
- // this is a WHOLE NODE measure, so if commit epoch has more ranges we
do not count as committed if we can only commit in coordination epoch
- final Known achieved;
- final PartialTxn partialTxn;
- final PartialDeps partialDeps;
- final long toEpoch;
- final BiConsumer<Known, Throwable> callback;
-
- OnDone(Node node, TxnId txnId, Route<?> route, RoutingKey progressKey,
CheckStatusOkFull full, Known achieved, PartialTxn partialTxn, PartialDeps
partialDeps, long toEpoch, BiConsumer<Known, Throwable> callback)
- {
- this.node = node;
- this.txnId = txnId;
- this.route = route;
- this.progressKey = progressKey;
- this.full = full;
- this.achieved = achieved;
- this.partialTxn = partialTxn;
- this.partialDeps = partialDeps;
- this.toEpoch = toEpoch;
- this.callback = callback;
- }
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- public static void propagate(Node node, TxnId txnId, long sourceEpoch,
WithQuorum withQuorum, Route route, @Nullable Known target, CheckStatusOkFull
full, BiConsumer<Known, Throwable> callback)
- {
- if (full.saveStatus.status == NotDefined &&
full.invalidIfNotAtLeast == NotDefined)
- {
- callback.accept(Known.Nothing, null);
- return;
- }
-
- Invariants.checkState(sourceEpoch == txnId.epoch() ||
(full.executeAt != null && sourceEpoch == full.executeAt.epoch()));
- Route<?> maxRoute = Route.merge(route, full.route);
-
- // TODO (required): permit individual shards that are behind to
catch up by themselves
- long toEpoch = sourceEpoch;
- Ranges sliceRanges =
node.topology().localRangesForEpochs(txnId.epoch(), toEpoch);
- if (!maxRoute.covers(sliceRanges))
- {
- callback.accept(Known.Nothing, null);
- return;
- }
-
- RoutingKey progressKey = node.trySelectProgressKey(txnId,
maxRoute);
-
- Ranges covering = maxRoute.sliceCovering(sliceRanges, Minimal);
- Participants<?> participatingKeys =
maxRoute.participants().slice(covering, Minimal);
- Known achieved = full.sufficientFor(participatingKeys, withQuorum);
- if (achieved.executeAt.hasDecidedExecuteAt() &&
full.executeAt.epoch() > toEpoch)
- {
- Ranges acceptRanges;
- if (!node.topology().hasEpoch(full.executeAt.epoch()) ||
- (!maxRoute.covers(acceptRanges =
node.topology().localRangesForEpochs(txnId.epoch(), full.executeAt.epoch()))))
- {
- // we don't know what the execution epoch requires, so we
cannot be sure we can replicate it locally
- // we *could* wait until we have the local epoch before
running this
- Status.Outcome outcome =
achieved.outcome.propagatesBetweenShards() ? achieved.outcome :
Status.Outcome.Unknown;
- achieved = new Known(achieved.definition,
achieved.executeAt, Status.KnownDeps.DepsUnknown, outcome);
- }
- else
- {
- // TODO (expected): this should only be the two precise
epochs, not the full range of epochs
- sliceRanges = acceptRanges;
- covering = maxRoute.sliceCovering(sliceRanges, Minimal);
- participatingKeys =
maxRoute.participants().slice(covering, Minimal);
- Known knownForExecution =
full.sufficientFor(participatingKeys, withQuorum);
- if ((target != null &&
target.isSatisfiedBy(knownForExecution)) ||
knownForExecution.isSatisfiedBy(achieved))
- {
- achieved = knownForExecution;
- toEpoch = full.executeAt.epoch();
- }
- else
- {
- Invariants.checkState(sourceEpoch == txnId.epoch(),
"%d != %d", sourceEpoch, txnId.epoch());
- achieved = new Known(achieved.definition,
achieved.executeAt, knownForExecution.deps, knownForExecution.outcome);
- }
- }
- }
-
- PartialTxn partialTxn = null;
- if (achieved.definition.isKnown())
- partialTxn = full.partialTxn.slice(sliceRanges,
true).reconstitutePartial(covering);
-
- PartialDeps partialDeps = null;
- if (achieved.deps.hasDecidedDeps())
- partialDeps =
full.committedDeps.slice(sliceRanges).reconstitutePartial(covering);
-
- new OnDone(node, txnId, maxRoute, progressKey, full, achieved,
partialTxn, partialDeps, toEpoch, callback).start();
- }
-
- void start()
- {
- Seekables<?, ?> keys = Keys.EMPTY;
- if (achieved.definition.isKnown())
- keys = partialTxn.keys();
- else if (achieved.deps.hasProposedOrDecidedDeps())
- keys = partialDeps.keyDeps.keys();
-
- PreLoadContext loadContext = contextFor(txnId, keys);
- node.mapReduceConsumeLocal(loadContext, route, txnId.epoch(),
toEpoch, this);
- }
-
- @Override
- public Void apply(SafeCommandStore safeStore)
- {
- SafeCommand safeCommand = safeStore.get(txnId, this, route);
- Command command = safeCommand.current();
- if (command.saveStatus().phase.compareTo(Phase.Persist) >= 0)
- return null;
-
- Status propagate = achieved.propagate();
- if (command.hasBeen(propagate))
- {
- if (full.maxSaveStatus.phase == Cleanup &&
full.durability.isDurableOrInvalidated() && Infer.safeToCleanup(safeStore,
command, route, full.executeAt))
- Commands.setTruncatedApply(safeStore, safeCommand);
- return null;
- }
-
- switch (propagate)
- {
- default: throw new IllegalStateException("Unexpected status: "
+ propagate);
- case Accepted:
- case AcceptedInvalidate:
- // we never "propagate" accepted statuses as these are
essentially votes,
- // and contribute nothing to our local state machine
- throw new IllegalStateException("Invalid states to
propagate: " + achieved.propagate());
-
- case Truncated:
- // if our peers have truncated this command, then either:
- // 1) we have already applied it locally; 2) the command
doesn't apply locally; 3) we are stale; or 4) the command is invalidated
- if (command.hasBeen(PreApplied) ||
command.saveStatus().isUninitialised())
- break;
-
- if (Infer.safeToCleanup(safeStore, command, route,
full.executeAt))
- {
- Commands.setErased(safeStore, safeCommand);
- break;
- }
-
- // TODO (required): check if we are stale
- // otherwise we are either stale, or the command didn't
reach consensus
-
- case Invalidated:
- Commands.commitInvalidate(safeStore, safeCommand, route);
- break;
-
- case Applied:
- case PreApplied:
- Invariants.checkState(full.executeAt != null);
- if (toEpoch >= full.executeAt.epoch())
- {
- confirm(Commands.apply(safeStore, safeCommand, txnId,
route, progressKey, full.executeAt, partialDeps, partialTxn, full.writes,
full.result));
- break;
- }
-
- case Committed:
- case ReadyToExecute:
- confirm(Commands.commit(safeStore, safeCommand, txnId,
route, progressKey, partialTxn, full.executeAt, partialDeps));
- break;
-
- case PreCommitted:
- Commands.precommit(safeStore, safeCommand, txnId,
full.executeAt, route);
- if (!achieved.definition.isKnown())
- break;
-
- case PreAccepted:
- // only preaccept if we coordinate the transaction
- if
(safeStore.ranges().coordinates(txnId).intersects(route) &&
Route.isFullRoute(route))
- Commands.preaccept(safeStore, safeCommand, txnId,
txnId.epoch(), partialTxn, Route.castToFullRoute(route), progressKey);
- break;
-
- case NotDefined:
- break;
- }
-
- RoutingKey homeKey = full.homeKey;
- if (!full.durability.isDurable() || homeKey == null)
- return null;
-
- if (!safeStore.ranges().coordinates(txnId).contains(homeKey))
- return null;
-
- Timestamp executeAt =
full.saveStatus.known.executeAt.hasDecidedExecuteAt() ? full.executeAt : null;
- Commands.setDurability(safeStore, safeCommand, full.durability,
route, executeAt);
- return null;
- }
-
- @Override
- public Void reduce(Void o1, Void o2)
- {
- return null;
- }
-
- @Override
- public void accept(Void result, Throwable failure)
- {
- callback.accept(failure == null ? achieved : null, failure);
- }
-
- @Override
- public long epoch()
- {
- return toEpoch;
+ Propagate.propagate(node, txnId, sourceEpoch, success.withQuorum,
route(), target, (CheckStatusOkFull) merged, callback);
}
}
-
- private static void confirm(Commands.CommitOutcome outcome)
- {
- switch (outcome)
- {
- default: throw new IllegalStateException("Unknown outcome: " +
outcome);
- case Redundant:
- case Success:
- return;
- case Insufficient: throw new IllegalStateException("Should have
enough information");
- }
- }
-
- private static void confirm(Commands.ApplyOutcome outcome)
- {
- switch (outcome)
- {
- default: throw new IllegalStateException("Unknown outcome: " +
outcome);
- case Redundant:
- case Success:
- return;
- case Insufficient: throw new IllegalStateException("Should have
enough information");
- }
- }
-
}
diff --git a/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java
b/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java
index 6f4559e5..fdcff134 100644
--- a/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java
+++ b/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java
@@ -20,7 +20,6 @@ package accord.coordinate;
import java.util.function.BiConsumer;
-import accord.coordinate.FetchData.OnDone;
import accord.local.Node;
import accord.local.Node.Id;
import accord.local.Status;
@@ -29,6 +28,7 @@ import accord.messages.CheckStatus;
import accord.messages.CheckStatus.CheckStatusOk;
import accord.messages.CheckStatus.CheckStatusOkFull;
import accord.messages.CheckStatus.IncludeInfo;
+import accord.messages.Propagate;
import accord.primitives.Ballot;
import accord.primitives.Deps;
import accord.primitives.FullRoute;
@@ -177,7 +177,7 @@ public class RecoverWithRoute extends
CheckShards<FullRoute<?>>
}
}
- OnDone.propagate(node, txnId, sourceEpoch,
success.withQuorum, route, null, full, (s, f) -> callback.accept(f == null ?
full.toProgressToken() : null, f));
+ Propagate.propagate(node, txnId, sourceEpoch,
success.withQuorum, route, null, full, (s, f) -> callback.accept(f == null ?
full.toProgressToken() : null, f));
break;
}
@@ -201,11 +201,11 @@ public class RecoverWithRoute extends
CheckShards<FullRoute<?>>
if (witnessedByInvalidation != null &&
witnessedByInvalidation.hasBeen(Status.PreCommitted))
throw new IllegalStateException("We previously
invalidated, finding a status that should be recoverable");
- OnDone.propagate(node, txnId, sourceEpoch, success.withQuorum,
route, null, full, (s, f) -> callback.accept(f == null ? INVALIDATED : null,
f));
+ Propagate.propagate(node, txnId, sourceEpoch,
success.withQuorum, route, null, full, (s, f) -> callback.accept(f == null ?
INVALIDATED : null, f));
break;
case Erased:
- OnDone.propagate(node, txnId, sourceEpoch, success.withQuorum,
route, null, full, (s, f) -> callback.accept(f == null ? TRUNCATED : null, f));
+ Propagate.propagate(node, txnId, sourceEpoch,
success.withQuorum, route, null, full, (s, f) -> callback.accept(f == null ?
TRUNCATED : null, f));
break;
}
}
diff --git a/accord-core/src/main/java/accord/local/Command.java
b/accord-core/src/main/java/accord/local/Command.java
index 194fe2da..18d0d605 100644
--- a/accord-core/src/main/java/accord/local/Command.java
+++ b/accord-core/src/main/java/accord/local/Command.java
@@ -212,6 +212,12 @@ public abstract class Command implements CommonAttributes
}
}
+ /**
+ * @return true if this command is equivalent to {@code other}; it could
also be fuller - have some of its
+ * registers be not sliced, whereas {@code other} may have them sliced
+ */
+ public abstract boolean isEqualOrFuller(Command other);
+
private abstract static class AbstractCommand extends Command
{
private final TxnId txnId;
@@ -247,12 +253,25 @@ public abstract class Command implements CommonAttributes
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Command command = (Command) o;
- return txnId.equals(command.txnId())
- && status == command.saveStatus()
- && durability == command.durability()
- && Objects.equals(route, command.route())
- && Objects.equals(promised, command.promised())
- && listeners.equals(command.durableListeners());
+ return txnId().equals(command.txnId())
+ && saveStatus() == command.saveStatus()
+ && durability() == command.durability()
+ && Objects.equals(route(), command.route())
+ && Objects.equals(promised(), command.promised())
+ && Objects.equals(durableListeners(),
command.durableListeners());
+ }
+
+ @Override
+ public boolean isEqualOrFuller(Command command)
+ {
+ if (this == command) return true;
+ if (command == null || getClass() != command.getClass()) return
false;
+ return txnId().equals(command.txnId())
+ && saveStatus() == command.saveStatus()
+ && durability() == command.durability()
+ && Objects.equals(route(), command.route())
+ && Objects.equals(promised(), command.promised())
+ && durableListeners().containsAll(command.durableListeners());
}
@Override
@@ -288,9 +307,7 @@ public abstract class Command implements CommonAttributes
@Override
public Listeners.Immutable durableListeners()
{
- if (listeners == null)
- return EMPTY;
- return listeners;
+ return listeners == null ? EMPTY : listeners;
}
@Override
@@ -600,6 +617,7 @@ public abstract class Command implements CommonAttributes
@Nullable final Timestamp executeAt;
@Nullable final Writes writes;
@Nullable final Result result;
+
public Truncated(CommonAttributes commonAttributes, SaveStatus
saveStatus, @Nullable Timestamp executeAt, @Nullable Writes writes, @Nullable
Result result)
{
super(commonAttributes, saveStatus, Ballot.MAX);
@@ -616,9 +634,38 @@ public abstract class Command implements CommonAttributes
this.result = result;
}
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ if (!super.equals(o)) return false;
+ Truncated that = (Truncated) o;
+ return Objects.equals(executeAt, that.executeAt)
+ && Objects.equals(writes, that.writes)
+ && Objects.equals(result, that.result);
+ }
+
+ @Override
+ public boolean isEqualOrFuller(Command c)
+ {
+ if (this == c) return true;
+ if (c == null || getClass() != c.getClass()) return false;
+ if (!super.isEqualOrFuller(c)) return false;
+ Truncated that = (Truncated) c;
+ return Objects.equals(executeAt(), that.executeAt())
+ && Objects.equals(writes(), that.writes())
+ && Objects.equals(result(), that.result());
+ }
+
public static Truncated erased(Command command)
{
- return new Truncated(command.txnId(), SaveStatus.Erased,
command.durability(), command.route(), null, EMPTY, null, null);
+ return erased(command.txnId(), command.durability(),
command.route());
+ }
+
+ public static Truncated erased(TxnId txnId, Status.Durability
durability, Route<?> route)
+ {
+ return new Truncated(txnId, SaveStatus.Erased, durability, route,
null, EMPTY, null, null);
}
public static Truncated truncatedApply(Command command)
@@ -633,7 +680,6 @@ public abstract class Command implements CommonAttributes
}
public static Truncated truncatedApply(CommonAttributes common,
SaveStatus saveStatus, Timestamp executeAt, Writes writes, Result result)
-
{
Invariants.checkArgument(executeAt != null);
Invariants.checkArgument(saveStatus == SaveStatus.TruncatedApply
|| saveStatus == SaveStatus.TruncatedApplyWithDeps || saveStatus ==
SaveStatus.TruncatedApplyWithOutcome);
@@ -729,10 +775,24 @@ public abstract class Command implements CommonAttributes
&& Objects.equals(partialDeps, that.partialDeps);
}
+ @Override
+ public boolean isEqualOrFuller(Command c)
+ {
+ if (this == c) return true;
+ if (c == null || getClass() != c.getClass()) return false;
+ if (!super.isEqualOrFuller(c)) return false;
+ PreAccepted that = (PreAccepted) c;
+ if (!executeAt().equals(that.executeAt()) ||
!partialTxn().isEqualOrFuller(that.partialTxn()))
+ return false;
+ return (partialDeps() == null && that.partialDeps() == null)
+ || (partialDeps() != null && that.partialDeps() != null &&
partialDeps().isEqualOrFuller(that.partialDeps()));
+ }
+
public static PreAccepted preAccepted(CommonAttributes common,
Timestamp executeAt, Ballot promised)
{
return new PreAccepted(common, SaveStatus.PreAccepted, executeAt,
promised);
}
+
public static PreAccepted preAccepted(PreAccepted command,
CommonAttributes common, Ballot promised)
{
checkPromised(command, promised);
@@ -792,16 +852,28 @@ public abstract class Command implements CommonAttributes
return Objects.equals(accepted, that.accepted);
}
+ @Override
+ public boolean isEqualOrFuller(Command c)
+ {
+ if (this == c) return true;
+ if (c == null || getClass() != c.getClass()) return false;
+ if (!super.isEqualOrFuller(c)) return false;
+ Accepted that = (Accepted) c;
+ return Objects.equals(accepted(), that.accepted());
+ }
+
static Accepted accepted(CommonAttributes common, SaveStatus status,
Timestamp executeAt, Ballot promised, Ballot accepted)
{
return new Accepted(common, status, executeAt, promised, accepted);
}
+
static Accepted accepted(Accepted command, CommonAttributes common,
SaveStatus status, Ballot promised)
{
checkPromised(command, promised);
checkSameClass(command, Accepted.class, "Cannot update");
return new Accepted(common, status, command.executeAt(), promised,
command.accepted());
}
+
static Accepted accepted(Accepted command, CommonAttributes common,
Ballot promised)
{
return accepted(command, common, command.saveStatus(), promised);
@@ -842,6 +914,17 @@ public abstract class Command implements CommonAttributes
return Objects.equals(waitingOn, committed.waitingOn);
}
+ @Override
+ public boolean isEqualOrFuller(Command c)
+ {
+ if (this == c) return true;
+ if (c == null || getClass() != c.getClass()) return false;
+ if (!super.isEqualOrFuller(c)) return false;
+ Committed committed = (Committed) c;
+ return (waitingOn() == null && committed.waitingOn() == null)
+ || (waitingOn() != null && committed.waitingOn() != null &&
waitingOn().isEqualOrFuller(committed.waitingOn()));
+ }
+
private static Committed committed(Committed command, CommonAttributes
common, Ballot promised, SaveStatus status, WaitingOn waitingOn)
{
checkPromised(command, promised);
@@ -922,7 +1005,18 @@ public abstract class Command implements CommonAttributes
if (!super.equals(o)) return false;
Executed executed = (Executed) o;
return Objects.equals(writes, executed.writes)
- && Objects.equals(result, executed.result);
+ && Objects.equals(result, executed.result);
+ }
+
+ @Override
+ public boolean isEqualOrFuller(Command c)
+ {
+ if (this == c) return true;
+ if (c == null || getClass() != c.getClass()) return false;
+ if (!super.isEqualOrFuller(c)) return false;
+ Executed executed = (Executed) c;
+ return Objects.equals(writes(), executed.writes())
+ && Objects.equals(result(), executed.result());
}
public static Executed executed(Executed command, CommonAttributes
common, SaveStatus status, Ballot promised, WaitingOn waitingOn)
@@ -1107,6 +1201,12 @@ public abstract class Command implements CommonAttributes
&&
this.appliedOrInvalidated.equals(other.appliedOrInvalidated);
}
+ public boolean isEqualOrFuller(WaitingOn other)
+ {
+ return
computeWaitingOnCommit().containsAll(other.computeWaitingOnCommit())
+ &&
computeWaitingOnApply().containsAll(other.computeWaitingOnApply());
+ }
+
public static class Update
{
final Deps deps;
diff --git a/accord-core/src/main/java/accord/local/Node.java
b/accord-core/src/main/java/accord/local/Node.java
index 276f631e..8e4237f1 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -54,6 +54,7 @@ import accord.coordinate.MaybeRecover;
import accord.coordinate.Outcome;
import accord.coordinate.RecoverWithRoute;
import accord.messages.Callback;
+import accord.messages.LocalMessage;
import accord.messages.Reply;
import accord.messages.ReplyContext;
import accord.messages.Request;
@@ -134,6 +135,7 @@ public class Node implements ConfigurationService.Listener,
NodeTimeService
private final Id id;
private final MessageSink messageSink;
+ private final LocalMessage.Handler localMessageHandler;
private final ConfigurationService configService;
private final TopologyManager topology;
private final CommandStores commandStores;
@@ -150,12 +152,14 @@ public class Node implements
ConfigurationService.Listener, NodeTimeService
// TODO (expected, liveness): monitor the contents of this collection for
stalled coordination, and excise them
private final Map<TxnId, AsyncResult<? extends Outcome>> coordinating =
new ConcurrentHashMap<>();
- public Node(Id id, MessageSink messageSink, ConfigurationService
configService, LongSupplier nowSupplier, ToLongFunction<TimeUnit> nowTimeUnit,
+ public Node(Id id, MessageSink messageSink, LocalMessage.Handler
localMessageHandler,
+ ConfigurationService configService, LongSupplier nowSupplier,
ToLongFunction<TimeUnit> nowTimeUnit,
Supplier<DataStore> dataSupplier, ShardDistributor
shardDistributor, Agent agent, RandomSource random, Scheduler scheduler,
TopologySorter.Supplier topologySorter,
Function<Node, ProgressLog.Factory> progressLogFactory,
CommandStores.Factory factory)
{
this.id = id;
this.messageSink = messageSink;
+ this.localMessageHandler = localMessageHandler;
this.configService = configService;
this.topology = new TopologyManager(topologySorter, id);
this.nowSupplier = nowSupplier;
@@ -483,6 +487,11 @@ public class Node implements
ConfigurationService.Listener, NodeTimeService
messageSink.send(to, send);
}
+ public void localMessage(LocalMessage message)
+ {
+ localMessageHandler.handle(message, this);
+ }
+
public void reply(Id replyingToNode, ReplyContext replyContext, Reply
send, Throwable failure)
{
if (failure != null)
diff --git a/accord-core/src/main/java/accord/local/SerializerSupport.java
b/accord-core/src/main/java/accord/local/SerializerSupport.java
new file mode 100644
index 00000000..a42888bd
--- /dev/null
+++ b/accord-core/src/main/java/accord/local/SerializerSupport.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package accord.local;
+
+import accord.api.Result;
+import accord.api.VisibleForImplementation;
+import accord.local.Command.WaitingOn;
+import accord.local.CommonAttributes.Mutable;
+import accord.messages.*;
+import accord.primitives.Ballot;
+import accord.primitives.PartialDeps;
+import accord.primitives.PartialTxn;
+import accord.primitives.Timestamp;
+import accord.primitives.Writes;
+
+import java.util.EnumSet;
+import java.util.Set;
+
+import static accord.messages.MessageType.APPLY_MAXIMAL_REQ;
+import static accord.messages.MessageType.APPLY_MINIMAL_REQ;
+import static accord.messages.MessageType.BEGIN_RECOVER_REQ;
+import static accord.messages.MessageType.COMMIT_MAXIMAL_REQ;
+import static accord.messages.MessageType.COMMIT_MINIMAL_REQ;
+import static accord.messages.MessageType.PRE_ACCEPT_REQ;
+import static accord.messages.MessageType.PROPAGATE_APPLY_MSG;
+import static accord.messages.MessageType.PROPAGATE_COMMIT_MSG;
+import static accord.messages.MessageType.PROPAGATE_PRE_ACCEPT_MSG;
+import static accord.primitives.PartialTxn.merge;
+import static accord.utils.Invariants.checkState;
+
+import static java.util.EnumSet.of;
+
+@VisibleForImplementation
+public class SerializerSupport
+{
+ /**
+ * Reconstructs Command from register values and protocol messages.
+ */
+ public static Command reconstruct(Mutable attrs, SaveStatus status,
Timestamp executeAt, Ballot promised, Ballot accepted, WaitingOnProvider
waitingOnProvider, MessageProvider messageProvider)
+ {
+ switch (status.status)
+ {
+ case NotDefined:
+ return Command.NotDefined.notDefined(attrs, promised);
+ case PreAccepted:
+ return preAccepted(attrs, executeAt, promised,
messageProvider);
+ case AcceptedInvalidate:
+ case Accepted:
+ case PreCommitted:
+ return accepted(attrs, status, executeAt, promised, accepted,
messageProvider);
+ case Committed:
+ case ReadyToExecute:
+ return committed(attrs, status, executeAt, promised, accepted,
waitingOnProvider, messageProvider);
+ case PreApplied:
+ case Applied:
+ return executed(attrs, status, executeAt, promised, accepted,
waitingOnProvider, messageProvider);
+ case Truncated:
+ case Invalidated:
+ return truncated(attrs, status, executeAt, messageProvider);
+ default:
+ throw new IllegalStateException();
+ }
+ }
+
+ private static final EnumSet<MessageType> PRE_ACCEPT_TYPES =
+ of(PRE_ACCEPT_REQ, BEGIN_RECOVER_REQ, PROPAGATE_PRE_ACCEPT_MSG);
+
+ private static Command.PreAccepted preAccepted(Mutable attrs, Timestamp
executeAt, Ballot promised, MessageProvider messageProvider)
+ {
+ Set<MessageType> witnessed = messageProvider.test(PRE_ACCEPT_TYPES);
+ checkState(!witnessed.isEmpty());
+ attrs.partialTxn(txnFromPreAcceptOrBeginRecover(witnessed,
messageProvider));
+ return Command.PreAccepted.preAccepted(attrs, executeAt, promised);
+ }
+
+ private static Command.Accepted accepted(Mutable attrs, SaveStatus status,
Timestamp executeAt, Ballot promised, Ballot accepted, MessageProvider
messageProvider)
+ {
+ if (status.known.isDefinitionKnown())
+ {
+ Set<MessageType> witnessed =
messageProvider.test(PRE_ACCEPT_TYPES);
+ checkState(!witnessed.isEmpty());
+ attrs.partialTxn(txnFromPreAcceptOrBeginRecover(witnessed,
messageProvider));
+ }
+
+ if (status.known.deps.hasProposedDeps())
+ {
+ Accept accept = messageProvider.accept(accepted);
+ attrs.partialDeps(accept.partialDeps);
+ }
+
+ return Command.Accepted.accepted(attrs, status, executeAt, promised,
accepted);
+ }
+
+ private static final EnumSet<MessageType> PRE_ACCEPT_COMMIT_TYPES =
+ of(PRE_ACCEPT_REQ, BEGIN_RECOVER_REQ, PROPAGATE_PRE_ACCEPT_MSG,
+ COMMIT_MINIMAL_REQ, COMMIT_MAXIMAL_REQ, PROPAGATE_COMMIT_MSG);
+
+ private static Command.Committed committed(Mutable attrs, SaveStatus
status, Timestamp executeAt, Ballot promised, Ballot accepted,
WaitingOnProvider waitingOnProvider, MessageProvider messageProvider)
+ {
+ Set<MessageType> witnessed =
messageProvider.test(PRE_ACCEPT_COMMIT_TYPES);
+
+ PartialTxn txn;
+ PartialDeps deps;
+
+ if (witnessed.contains(COMMIT_MAXIMAL_REQ))
+ {
+ Commit commit = messageProvider.commitMaximal();
+ txn = commit.partialTxn;
+ deps = commit.partialDeps;
+ }
+ else if (witnessed.contains(PROPAGATE_COMMIT_MSG))
+ {
+ Propagate propagate = messageProvider.propagateCommit();
+ txn = propagate.partialTxn;
+ deps = propagate.partialDeps;
+ }
+ else
+ {
+ checkState(witnessed.contains(COMMIT_MINIMAL_REQ));
+ Commit commit = messageProvider.commitMinimal();
+ txn = merge(txnFromPreAcceptOrBeginRecover(witnessed,
messageProvider), commit.partialTxn);
+ deps = commit.partialDeps;
+ }
+
+ attrs.partialTxn(txn)
+ .partialDeps(deps);
+
+ return Command.Committed.committed(attrs, status, executeAt, promised,
accepted, waitingOnProvider.provide(deps));
+ }
+
+ private static final EnumSet<MessageType> PRE_ACCEPT_COMMIT_APPLY_TYPES =
+ of(PRE_ACCEPT_REQ, BEGIN_RECOVER_REQ, PROPAGATE_PRE_ACCEPT_MSG,
+ COMMIT_MINIMAL_REQ, COMMIT_MAXIMAL_REQ, PROPAGATE_COMMIT_MSG,
+ APPLY_MINIMAL_REQ, APPLY_MAXIMAL_REQ, PROPAGATE_APPLY_MSG);
+
+ private static Command.Executed executed(Mutable attrs, SaveStatus status,
Timestamp executeAt, Ballot promised, Ballot accepted, WaitingOnProvider
waitingOnProvider, MessageProvider messageProvider)
+ {
+ Set<MessageType> witnessed =
messageProvider.test(PRE_ACCEPT_COMMIT_APPLY_TYPES);
+
+ PartialTxn txn;
+ PartialDeps deps;
+ Writes writes;
+ Result result;
+
+ if (witnessed.contains(APPLY_MAXIMAL_REQ))
+ {
+ Apply apply = messageProvider.applyMaximal();
+ txn = apply.txn;
+ deps = apply.deps;
+ writes = apply.writes;
+ result = apply.result;
+ }
+ else if (witnessed.contains(PROPAGATE_APPLY_MSG))
+ {
+ Propagate propagate = messageProvider.propagateApply();
+ txn = propagate.partialTxn;
+ deps = propagate.partialDeps;
+ writes = propagate.writes;
+ result = propagate.result;
+ }
+ else
+ {
+ checkState(witnessed.contains(APPLY_MINIMAL_REQ));
+
+ Apply apply = messageProvider.applyMinimal();
+ writes = apply.writes;
+ result = apply.result;
+
+ /*
+ * NOTE: If Commit has been witnessed, we'll extract deps from
there;
+ * Apply has an expected TO-DO to stop including deps in such case.
+ */
+ if (witnessed.contains(COMMIT_MAXIMAL_REQ))
+ {
+ Commit commit = messageProvider.commitMaximal();
+ txn = commit.partialTxn;
+ deps = commit.partialDeps;
+ }
+ else if (witnessed.contains(PROPAGATE_COMMIT_MSG))
+ {
+ Propagate propagateCommit = messageProvider.propagateCommit();
+ txn = propagateCommit.partialTxn;
+ deps = propagateCommit.partialDeps;
+ }
+ else if (witnessed.contains(COMMIT_MINIMAL_REQ))
+ {
+ Commit commit = messageProvider.commitMinimal();
+ txn = merge(apply.txn, merge(commit.partialTxn,
txnFromPreAcceptOrBeginRecover(witnessed, messageProvider)));
+ deps = commit.partialDeps;
+ }
+ else
+ {
+ txn = merge(apply.txn,
txnFromPreAcceptOrBeginRecover(witnessed, messageProvider));
+ deps = apply.deps;
+ }
+ }
+
+ attrs.partialTxn(txn)
+ .partialDeps(deps);
+
+ return Command.Executed.executed(attrs, status, executeAt, promised,
accepted, waitingOnProvider.provide(deps), writes, result);
+ }
+
+ private static final EnumSet<MessageType> APPLY_TYPES =
+ of(APPLY_MINIMAL_REQ, APPLY_MAXIMAL_REQ, PROPAGATE_APPLY_MSG);
+
+ private static Command.Truncated truncated(Mutable attrs, SaveStatus
status, Timestamp executeAt, MessageProvider messageProvider)
+ {
+ Writes writes = null;
+ Result result = null;
+
+ switch (status)
+ {
+ default:
+ throw new IllegalStateException("Unhandled SaveStatus: " +
status);
+ case TruncatedApplyWithOutcome:
+ case TruncatedApplyWithDeps:
+ Set<MessageType> witnessed = messageProvider.test(APPLY_TYPES);
+ checkState(!witnessed.isEmpty());
+ if (witnessed.contains(APPLY_MINIMAL_REQ))
+ {
+ Apply apply = messageProvider.applyMinimal();
+ writes = apply.writes;
+ result = apply.result;
+ }
+ if (witnessed.contains(APPLY_MAXIMAL_REQ))
+ {
+ Apply apply = messageProvider.applyMaximal();
+ writes = apply.writes;
+ result = apply.result;
+ }
+ else if (witnessed.contains(PROPAGATE_APPLY_MSG))
+ {
+ Propagate propagate = messageProvider.propagateApply();
+ writes = propagate.writes;
+ result = propagate.result;
+ }
+ case TruncatedApply:
+ return Command.Truncated.truncatedApply(attrs, status,
executeAt, writes, result);
+ case Erased:
+ return Command.Truncated.erased(attrs.txnId(),
attrs.durability(), attrs.route());
+ case Invalidated:
+ return Command.Truncated.invalidated(attrs.txnId(),
attrs.durableListeners());
+ }
+ }
+
+ public static class TxnAndDeps
+ {
+ public static TxnAndDeps EMPTY = new TxnAndDeps(null, null);
+
+ public final PartialTxn txn;
+ public final PartialDeps deps;
+
+ TxnAndDeps(PartialTxn txn, PartialDeps deps)
+ {
+ this.txn = txn;
+ this.deps = deps;
+ }
+ }
+
+ public static TxnAndDeps extractTxnAndDeps(SaveStatus status, Ballot
accepted, MessageProvider messageProvider)
+ {
+ Set<MessageType> witnessed;
+
+ switch (status.status)
+ {
+ case PreAccepted:
+ witnessed = messageProvider.test(PRE_ACCEPT_TYPES);
+ checkState(!witnessed.isEmpty());
+ return new
TxnAndDeps(txnFromPreAcceptOrBeginRecover(witnessed, messageProvider), null);
+ case AcceptedInvalidate:
+ case Accepted:
+ case PreCommitted:
+ PartialTxn txn = null;
+ PartialDeps deps = null;
+
+ if (status.known.isDefinitionKnown())
+ {
+ witnessed = messageProvider.test(PRE_ACCEPT_TYPES);
+ checkState(!witnessed.isEmpty());
+ txn = txnFromPreAcceptOrBeginRecover(witnessed,
messageProvider);
+ }
+
+ if (status.known.deps.hasProposedDeps())
+ {
+ Accept accept = messageProvider.accept(accepted);
+ deps = accept.partialDeps;
+ }
+
+ return new TxnAndDeps(txn, deps);
+ case Committed:
+ case ReadyToExecute:
+ witnessed = messageProvider.test(PRE_ACCEPT_COMMIT_TYPES);
+ if (witnessed.contains(COMMIT_MAXIMAL_REQ))
+ {
+ Commit commit = messageProvider.commitMaximal();
+ return new TxnAndDeps(commit.partialTxn,
commit.partialDeps);
+ }
+ else if (witnessed.contains(PROPAGATE_COMMIT_MSG))
+ {
+ Propagate propagate = messageProvider.propagateCommit();
+ return new TxnAndDeps(propagate.partialTxn,
propagate.partialDeps);
+ }
+ else
+ {
+ checkState(witnessed.contains(COMMIT_MINIMAL_REQ));
+ Commit commit = messageProvider.commitMinimal();
+ return new
TxnAndDeps(merge(txnFromPreAcceptOrBeginRecover(witnessed, messageProvider),
commit.partialTxn), commit.partialDeps);
+ }
+ case PreApplied:
+ case Applied:
+ witnessed =
messageProvider.test(PRE_ACCEPT_COMMIT_APPLY_TYPES);
+ if (witnessed.contains(APPLY_MAXIMAL_REQ))
+ {
+ Apply apply = messageProvider.applyMaximal();
+ return new TxnAndDeps(apply.txn, apply.deps);
+ }
+ else if (witnessed.contains(PROPAGATE_APPLY_MSG))
+ {
+ Propagate propagate = messageProvider.propagateApply();
+ return new TxnAndDeps(propagate.partialTxn,
propagate.partialDeps);
+ }
+ else if (witnessed.contains(COMMIT_MAXIMAL_REQ))
+ {
+ Commit commit = messageProvider.commitMaximal();
+ return new TxnAndDeps(commit.partialTxn,
commit.partialDeps);
+ }
+ else if (witnessed.contains(PROPAGATE_COMMIT_MSG))
+ {
+ Propagate propagate = messageProvider.propagateCommit();
+ return new TxnAndDeps(propagate.partialTxn,
propagate.partialDeps);
+ }
+ else if (witnessed.contains(COMMIT_MINIMAL_REQ))
+ {
+ checkState(witnessed.contains(APPLY_MINIMAL_REQ));
+ Apply apply = messageProvider.applyMinimal();
+ Commit commit = messageProvider.commitMinimal();
+ return new TxnAndDeps(merge(apply.txn,
merge(commit.partialTxn, txnFromPreAcceptOrBeginRecover(witnessed,
messageProvider))), commit.partialDeps);
+ }
+ else
+ {
+ checkState(witnessed.contains(APPLY_MINIMAL_REQ));
+ Apply apply = messageProvider.applyMinimal();
+ return new TxnAndDeps(merge(apply.txn,
txnFromPreAcceptOrBeginRecover(witnessed, messageProvider)), apply.deps);
+ }
+ case NotDefined:
+ case Truncated:
+ case Invalidated:
+ return TxnAndDeps.EMPTY;
+ default:
+ throw new IllegalStateException();
+ }
+ }
+
+ private static PartialTxn txnFromPreAcceptOrBeginRecover(Set<MessageType>
witnessed, MessageProvider messageProvider)
+ {
+ if (witnessed.contains(PRE_ACCEPT_REQ))
+ return messageProvider.preAccept().partialTxn;
+
+ if (witnessed.contains(BEGIN_RECOVER_REQ))
+ return messageProvider.beginRecover().partialTxn;
+
+ if (witnessed.contains(PROPAGATE_PRE_ACCEPT_MSG))
+ return messageProvider.propagatePreAccept().partialTxn;
+
+ return null;
+ }
+
+ public interface WaitingOnProvider
+ {
+ WaitingOn provide(PartialDeps deps);
+ }
+
+ public interface MessageProvider
+ {
+ Set<MessageType> test(Set<MessageType> messages);
+
+ PreAccept preAccept();
+
+ BeginRecovery beginRecover();
+
+ Propagate propagatePreAccept();
+
+ Accept accept(Ballot ballot);
+
+ Commit commitMinimal();
+
+ Commit commitMaximal();
+
+ Propagate propagateCommit();
+
+ Apply applyMinimal();
+
+ Apply applyMaximal();
+
+ Propagate propagateApply();
+ }
+}
diff --git a/accord-core/src/main/java/accord/local/Status.java
b/accord-core/src/main/java/accord/local/Status.java
index a326b2af..bf0f3bef 100644
--- a/accord-core/src/main/java/accord/local/Status.java
+++ b/accord-core/src/main/java/accord/local/Status.java
@@ -55,8 +55,8 @@ public enum Status
* So, for execution of other transactions we may treat a PreCommitted
transaction as Committed,
* using the timestamp to update our dependency set to rule it out as a
dependency.
* But we do not have enough information to execute the transaction, and
when recovery calculates
- * {@link BeginRecovery#acceptedStartedBeforeWithoutWitnessing}, {@link
BeginRecovery#hasCommittedExecutesAfterWithoutWitnessing}
- *
+ * {@link BeginRecovery#acceptedStartedBeforeWithoutWitnessing},
+ * {@link BeginRecovery#hasCommittedExecutesAfterWithoutWitnessing},
* and {@link BeginRecovery#committedStartedBeforeAndWitnessed} we may not
have the dependencies
* to calculate the result. For these operations we treat ourselves as
whatever Accepted status
* we may have previously taken, using any proposed dependencies to
compute the result.
@@ -312,6 +312,11 @@ public enum Status
NoDeps
;
+ public boolean hasProposedDeps()
+ {
+ return this == DepsProposed;
+ }
+
public boolean hasDecidedDeps()
{
return this == DepsKnown;
diff --git a/accord-core/src/main/java/accord/messages/Apply.java
b/accord-core/src/main/java/accord/messages/Apply.java
index 12d5614c..2e1d2011 100644
--- a/accord-core/src/main/java/accord/messages/Apply.java
+++ b/accord-core/src/main/java/accord/messages/Apply.java
@@ -42,19 +42,17 @@ import accord.primitives.Unseekables;
import accord.primitives.Writes;
import accord.topology.Topologies;
-import static accord.messages.MessageType.APPLY_REQ;
-import static accord.messages.MessageType.APPLY_RSP;
-
public class Apply extends TxnRequest<ApplyReply>
{
public static class SerializationSupport
{
- public static Apply create(TxnId txnId, PartialRoute<?> scope, long
waitForEpoch, Seekables<?, ?> keys, Timestamp executeAt, PartialDeps deps,
PartialTxn txn, Writes writes, Result result)
+ public static Apply create(TxnId txnId, PartialRoute<?> scope, long
waitForEpoch, Kind kind, Seekables<?, ?> keys, Timestamp executeAt, PartialDeps
deps, PartialTxn txn, Writes writes, Result result)
{
- return new Apply(txnId, scope, waitForEpoch, keys, executeAt,
deps, txn, writes, result);
+ return new Apply(kind, txnId, scope, waitForEpoch, keys,
executeAt, deps, txn, writes, result);
}
}
+ public final Kind kind;
public final Timestamp executeAt;
public final Seekables<?, ?> keys;
public final PartialDeps deps; // TODO (expected): this should be
nullable, and only included if we did not send Commit (or if sending Maximal
apply)
@@ -62,15 +60,18 @@ public class Apply extends TxnRequest<ApplyReply>
public final Writes writes;
public final Result result;
- private Apply(Id to, Topologies participates, Topologies executes, TxnId
txnId, Route<?> route, Txn txn, Timestamp executeAt, Deps deps, boolean
isMaximal, Writes writes, Result result)
+ public enum Kind { Minimal, Maximal }
+
+ private Apply(Kind kind, Id to, Topologies participates, Topologies
executes, TxnId txnId, Route<?> route, Txn txn, Timestamp executeAt, Deps deps,
Writes writes, Result result)
{
super(to, participates, route, txnId);
- Ranges slice = isMaximal || executes == participates ?
scope.covering() : executes.computeRangesForNode(to);
+ Ranges slice = kind == Kind.Maximal || executes == participates ?
scope.covering() : executes.computeRangesForNode(to);
// TODO (desired): it's wasteful to encode the full set of ranges
owned by the recipient node;
// often it will be cheaper to include the FullRoute for Deps
scope (or come up with some other safety-preserving encoding scheme)
+ this.kind = kind;
this.deps = deps.slice(slice);
this.keys = txn.keys().slice(slice);
- this.txn = isMaximal ? txn.slice(slice, true) : null;
+ this.txn = kind == Kind.Maximal ? txn.slice(slice, true) : null;
this.executeAt = executeAt;
this.writes = writes;
this.result = result;
@@ -102,17 +103,18 @@ public class Apply extends TxnRequest<ApplyReply>
public static Apply applyMinimal(Id to, Topologies sendTo, Topologies
applyTo, TxnId txnId, Route<?> route, Txn txn, Timestamp executeAt, Deps deps,
Writes writes, Result result)
{
- return new Apply(to, sendTo, applyTo, txnId, route, txn, executeAt,
deps, false, writes, result);
+ return new Apply(Kind.Minimal, to, sendTo, applyTo, txnId, route, txn,
executeAt, deps, writes, result);
}
public static Apply applyMaximal(Id to, Topologies participates,
Topologies executes, TxnId txnId, Route<?> route, Txn txn, Timestamp executeAt,
Deps deps, Writes writes, Result result)
{
- return new Apply(to, participates, executes, txnId, route, txn,
executeAt, deps, true, writes, result);
+ return new Apply(Kind.Maximal, to, participates, executes, txnId,
route, txn, executeAt, deps, writes, result);
}
- private Apply(TxnId txnId, PartialRoute<?> route, long waitForEpoch,
Seekables<?, ?> keys, Timestamp executeAt, PartialDeps deps, @Nullable
PartialTxn txn, Writes writes, Result result)
+ private Apply(Kind kind, TxnId txnId, PartialRoute<?> route, long
waitForEpoch, Seekables<?, ?> keys, Timestamp executeAt, PartialDeps deps,
@Nullable PartialTxn txn, Writes writes, Result result)
{
super(txnId, route, waitForEpoch);
+ this.kind = kind;
this.executeAt = executeAt;
this.deps = deps;
this.keys = keys;
@@ -172,7 +174,12 @@ public class Apply extends TxnRequest<ApplyReply>
@Override
public MessageType type()
{
- return APPLY_REQ;
+ switch (kind)
+ {
+ case Minimal: return MessageType.APPLY_MINIMAL_REQ;
+ case Maximal: return MessageType.APPLY_MAXIMAL_REQ;
+ default: throw new IllegalStateException();
+ }
}
public enum ApplyReply implements Reply
@@ -182,7 +189,7 @@ public class Apply extends TxnRequest<ApplyReply>
@Override
public MessageType type()
{
- return APPLY_RSP;
+ return MessageType.APPLY_RSP;
}
@Override
@@ -201,8 +208,8 @@ public class Apply extends TxnRequest<ApplyReply>
@Override
public String toString()
{
- return "Apply{" +
- "txnId:" + txnId +
+ return "Apply{kind:" + kind +
+ ", txnId:" + txnId +
", deps:" + deps +
", executeAt:" + executeAt +
", writes:" + writes +
diff --git a/accord-core/src/main/java/accord/messages/Commit.java
b/accord-core/src/main/java/accord/messages/Commit.java
index b755827b..9d692e6a 100644
--- a/accord-core/src/main/java/accord/messages/Commit.java
+++ b/accord-core/src/main/java/accord/messages/Commit.java
@@ -15,10 +15,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package accord.messages;
import java.util.Set;
+
import javax.annotation.Nullable;
import org.slf4j.Logger;
@@ -59,12 +59,13 @@ public class Commit extends TxnRequest<ReadNack>
public static class SerializerSupport
{
- public static Commit create(TxnId txnId, PartialRoute<?> scope, long
waitForEpoch, Timestamp executeAt, @Nullable PartialTxn partialTxn, PartialDeps
partialDeps, @Nullable FullRoute<?> fullRoute, @Nullable ReadTxnData read)
+ public static Commit create(TxnId txnId, PartialRoute<?> scope, long
waitForEpoch, Kind kind, Timestamp executeAt, @Nullable PartialTxn partialTxn,
PartialDeps partialDeps, @Nullable FullRoute<?> fullRoute, @Nullable
ReadTxnData read)
{
- return new Commit(txnId, scope, waitForEpoch, executeAt,
partialTxn, partialDeps, fullRoute, read);
+ return new Commit(kind, txnId, scope, waitForEpoch, executeAt,
partialTxn, partialDeps, fullRoute, read);
}
}
+ public final Kind kind;
public final Timestamp executeAt;
public final @Nullable PartialTxn partialTxn;
public final PartialDeps partialDeps;
@@ -99,6 +100,7 @@ public class Commit extends TxnRequest<ReadNack>
partialTxn = txn.slice(extraRanges,
coordinateRanges.contains(route.homeKey()));
}
+ this.kind = kind;
this.executeAt = executeAt;
this.partialTxn = partialTxn;
this.partialDeps = deps.slice(scope.covering());
@@ -106,9 +108,10 @@ public class Commit extends TxnRequest<ReadNack>
this.read = read ? new ReadTxnData(to, topologies, txnId, readScope,
executeAt) : null;
}
- Commit(TxnId txnId, PartialRoute<?> scope, long waitForEpoch, Timestamp
executeAt, @Nullable PartialTxn partialTxn, PartialDeps partialDeps, @Nullable
FullRoute<?> fullRoute, @Nullable ReadTxnData read)
+ Commit(Kind kind, TxnId txnId, PartialRoute<?> scope, long waitForEpoch,
Timestamp executeAt, @Nullable PartialTxn partialTxn, PartialDeps partialDeps,
@Nullable FullRoute<?> fullRoute, @Nullable ReadTxnData read)
{
super(txnId, scope, waitForEpoch);
+ this.kind = kind;
this.executeAt = executeAt;
this.partialTxn = partialTxn;
this.partialDeps = partialDeps;
@@ -168,7 +171,7 @@ public class Commit extends TxnRequest<ReadNack>
Route<?> route = this.route != null ? this.route : scope;
SafeCommand safeCommand = safeStore.get(txnId, executeAt, route);
- switch (Commands.commit(safeStore, safeCommand, txnId, route != null ?
route : scope, progressKey, partialTxn, executeAt, partialDeps))
+ switch (Commands.commit(safeStore, safeCommand, txnId, route,
progressKey, partialTxn, executeAt, partialDeps))
{
default:
case Success:
@@ -207,13 +210,19 @@ public class Commit extends TxnRequest<ReadNack>
@Override
public MessageType type()
{
- return MessageType.COMMIT_REQ;
+ switch (kind)
+ {
+ case Minimal: return MessageType.COMMIT_MINIMAL_REQ;
+ case Maximal: return MessageType.COMMIT_MAXIMAL_REQ;
+ default: throw new IllegalStateException();
+ }
}
@Override
public String toString()
{
- return "Commit{txnId: " + txnId +
+ return "Commit{kind:" + kind +
+ ", txnId: " + txnId +
", executeAt: " + executeAt +
", deps: " + partialDeps +
", read: " + read +
diff --git a/accord-core/src/main/java/accord/messages/InformDurable.java
b/accord-core/src/main/java/accord/messages/InformDurable.java
index d9f42214..d79b50af 100644
--- a/accord-core/src/main/java/accord/messages/InformDurable.java
+++ b/accord-core/src/main/java/accord/messages/InformDurable.java
@@ -15,10 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package accord.messages;
-import accord.api.ProgressLog.ProgressShard;
import accord.local.Commands;
import accord.local.Node.Id;
import accord.local.PreLoadContext;
@@ -33,9 +31,6 @@ import accord.primitives.TxnId;
import accord.topology.Topologies;
import accord.utils.Invariants;
-import static accord.api.ProgressLog.ProgressShard.Adhoc;
-import static accord.api.ProgressLog.ProgressShard.Home;
-import static accord.api.ProgressLog.ProgressShard.Local;
import static accord.local.PreLoadContext.contextFor;
import static accord.messages.SimpleReply.Ok;
@@ -51,7 +46,6 @@ public class InformDurable extends TxnRequest<Reply>
implements PreLoadContext
public final Timestamp executeAt;
public final Durability durability;
- private transient ProgressShard shard;
public InformDurable(Id to, Topologies topologies, FullRoute<?> route,
TxnId txnId, Timestamp executeAt, Durability durability)
{
@@ -79,11 +73,6 @@ public class InformDurable extends TxnRequest<Reply>
implements PreLoadContext
for (long epoch = waitForEpoch; progressKey == null && epoch >
txnId.epoch() ; --epoch)
progressKey = node.trySelectProgressKey(epoch, scope,
scope.homeKey());
Invariants.checkState(progressKey != null);
- shard = Adhoc;
- }
- else
- {
- shard = scope.homeKey().equals(progressKey) ? Home : Local;
}
// TODO (expected, efficiency): do not load from disk to perform this
update
diff --git a/accord-core/src/main/java/accord/api/Read.java
b/accord-core/src/main/java/accord/messages/LocalMessage.java
similarity index 63%
copy from accord-core/src/main/java/accord/api/Read.java
copy to accord-core/src/main/java/accord/messages/LocalMessage.java
index b9eaf2a1..07b11899 100644
--- a/accord-core/src/main/java/accord/api/Read.java
+++ b/accord-core/src/main/java/accord/messages/LocalMessage.java
@@ -15,21 +15,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package accord.messages;
-package accord.api;
+import accord.local.Node;
+import accord.local.PreLoadContext;
-import accord.local.SafeCommandStore;
-import accord.primitives.*;
-import accord.utils.async.AsyncChain;
-
-
-/**
- * A read to be performed on potentially multiple shards, the inputs of which
may be fed to a {@link Query}
- */
-public interface Read
+public interface LocalMessage extends Message, PreLoadContext
{
- Seekables<?, ?> keys();
- AsyncChain<Data> read(Seekable key, Txn.Kind kind, SafeCommandStore
commandStore, Timestamp executeAt, DataStore store);
- Read slice(Ranges ranges);
- Read merge(Read other);
+ interface Handler
+ {
+ void handle(LocalMessage message, Node node);
+ }
+
+ void process(Node node);
}
diff --git a/accord-core/src/main/java/accord/messages/MessageType.java
b/accord-core/src/main/java/accord/messages/MessageType.java
index 7909157a..aab5e0c1 100644
--- a/accord-core/src/main/java/accord/messages/MessageType.java
+++ b/accord-core/src/main/java/accord/messages/MessageType.java
@@ -17,53 +17,86 @@
*/
package accord.messages;
+import static accord.messages.MessageType.Kind.REMOTE;
+import static accord.messages.MessageType.Kind.LOCAL;
+
/**
- * Meant to assist implementations map accord messages to their own messaging
systems.
+ * Meant to assist implementations with mapping accord messages to their own
messaging systems.
*/
public enum MessageType
{
- SIMPLE_RSP (false),
- PRE_ACCEPT_REQ (true ),
- PRE_ACCEPT_RSP (false),
- ACCEPT_REQ (true ),
- ACCEPT_RSP (false),
- ACCEPT_INVALIDATE_REQ (true ),
- GET_DEPS_REQ (false),
- GET_DEPS_RSP (false),
- COMMIT_REQ (true ),
- COMMIT_INVALIDATE_REQ (true ),
- APPLY_REQ (true ),
- APPLY_RSP (false),
- READ_REQ (false),
- READ_RSP (false),
- BEGIN_RECOVER_REQ (true ),
- BEGIN_RECOVER_RSP (false),
- BEGIN_INVALIDATE_REQ (true ),
- BEGIN_INVALIDATE_RSP (false),
- WAIT_ON_COMMIT_REQ (false),
- WAIT_ON_COMMIT_RSP (false),
- WAIT_ON_APPLY_REQ (false),
- INFORM_OF_TXN_REQ (true ),
- INFORM_DURABLE_REQ (true ),
- INFORM_HOME_DURABLE_REQ (true ),
- CHECK_STATUS_REQ (false),
- CHECK_STATUS_RSP (false),
- FETCH_DATA_REQ (false),
- FETCH_DATA_RSP (false),
- SET_SHARD_DURABLE_REQ (true ),
- SET_GLOBALLY_DURABLE_REQ (true ),
- QUERY_DURABLE_BEFORE_REQ (false),
- QUERY_DURABLE_BEFORE_RSP (false),
- FAILURE_RSP (false),
+ SIMPLE_RSP (REMOTE, false),
+ FAILURE_RSP (REMOTE, false),
+ PRE_ACCEPT_REQ (REMOTE, true ),
+ PRE_ACCEPT_RSP (REMOTE, false),
+ ACCEPT_REQ (REMOTE, true ),
+ ACCEPT_RSP (REMOTE, false),
+ ACCEPT_INVALIDATE_REQ (REMOTE, true ),
+ GET_DEPS_REQ (REMOTE, false),
+ GET_DEPS_RSP (REMOTE, false),
+ COMMIT_MINIMAL_REQ (REMOTE, true ),
+ COMMIT_MAXIMAL_REQ (REMOTE, true ),
+ COMMIT_INVALIDATE_REQ (REMOTE, true ),
+ APPLY_MINIMAL_REQ (REMOTE, true ),
+ APPLY_MAXIMAL_REQ (REMOTE, true ),
+ APPLY_RSP (REMOTE, false),
+ READ_REQ (REMOTE, false),
+ READ_RSP (REMOTE, false),
+ BEGIN_RECOVER_REQ (REMOTE, true ),
+ BEGIN_RECOVER_RSP (REMOTE, false),
+ BEGIN_INVALIDATE_REQ (REMOTE, true ),
+ BEGIN_INVALIDATE_RSP (REMOTE, false),
+ WAIT_ON_COMMIT_REQ (REMOTE, false),
+ WAIT_ON_COMMIT_RSP (REMOTE, false),
+ WAIT_ON_APPLY_REQ (REMOTE, false),
+ INFORM_OF_TXN_REQ (REMOTE, true ),
+ INFORM_DURABLE_REQ (REMOTE, true ),
+ INFORM_HOME_DURABLE_REQ (REMOTE, true ),
+ CHECK_STATUS_REQ (REMOTE, false),
+ CHECK_STATUS_RSP (REMOTE, false),
+ FETCH_DATA_REQ (REMOTE, false),
+ FETCH_DATA_RSP (REMOTE, false),
+ SET_SHARD_DURABLE_REQ (REMOTE, true ),
+ SET_GLOBALLY_DURABLE_REQ (REMOTE, true ),
+ QUERY_DURABLE_BEFORE_REQ (REMOTE, false),
+ QUERY_DURABLE_BEFORE_RSP (REMOTE, false),
+
+ PROPAGATE_PRE_ACCEPT_MSG (LOCAL, true ),
+ PROPAGATE_COMMIT_MSG (LOCAL, true ),
+ PROPAGATE_APPLY_MSG (LOCAL, true ),
+ PROPAGATE_OTHER_MSG (LOCAL, true ),
;
+ public enum Kind { LOCAL, REMOTE }
+
+ /**
+ * LOCAL messages are not sent to remote nodes.
+ */
+ private final Kind kind;
+
/**
* If true, indicates that processing of the message has important side
effects.
*/
- public final boolean hasSideEffects;
+ private final boolean hasSideEffects;
- MessageType(boolean hasSideEffects)
+ MessageType(Kind kind, boolean hasSideEffects)
{
this.hasSideEffects = hasSideEffects;
+ this.kind = kind;
+ }
+
+ public boolean isLocal()
+ {
+ return kind == LOCAL;
+ }
+
+ public boolean isRemote()
+ {
+ return kind == REMOTE;
+ }
+
+ public boolean hasSideEffects()
+ {
+ return hasSideEffects;
}
}
diff --git a/accord-core/src/main/java/accord/messages/Propagate.java
b/accord-core/src/main/java/accord/messages/Propagate.java
new file mode 100644
index 00000000..68a63df1
--- /dev/null
+++ b/accord-core/src/main/java/accord/messages/Propagate.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package accord.messages;
+
+import accord.api.Result;
+import accord.api.RoutingKey;
+import accord.coordinate.Infer;
+import accord.local.Command;
+import accord.local.Commands;
+import accord.local.Node;
+import accord.local.SafeCommand;
+import accord.local.SafeCommandStore;
+import accord.local.SaveStatus;
+import accord.local.Status;
+import accord.primitives.EpochSupplier;
+import accord.primitives.Keys;
+import accord.primitives.PartialDeps;
+import accord.primitives.PartialTxn;
+import accord.primitives.Participants;
+import accord.primitives.Ranges;
+import accord.primitives.Route;
+import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.primitives.Writes;
+import accord.utils.Invariants;
+import accord.utils.MapReduceConsume;
+
+import javax.annotation.Nullable;
+import java.util.function.BiConsumer;
+
+import static accord.local.Status.NotDefined;
+import static accord.local.Status.Phase.Cleanup;
+import static accord.local.Status.PreApplied;
+import static accord.primitives.Routables.Slice.Minimal;
+
+public class Propagate implements MapReduceConsume<SafeCommandStore, Void>,
EpochSupplier, LocalMessage
+{
+ public static class SerializerSupport
+ {
+ public static Propagate create(TxnId txnId, Route<?> route, SaveStatus
saveStatus, SaveStatus maxSaveStatus, Status.Durability durability, RoutingKey
homeKey, RoutingKey progressKey, Status.Known achieved, PartialTxn partialTxn,
PartialDeps partialDeps, long toEpoch, Timestamp executeAt, Writes writes,
Result result)
+ {
+ return new Propagate(txnId, route, saveStatus, maxSaveStatus,
durability, homeKey, progressKey, achieved, partialTxn, partialDeps, toEpoch,
executeAt, writes, result, null);
+ }
+ }
+
+ public final TxnId txnId;
+ public final Route<?> route;
+ public final SaveStatus saveStatus;
+ public final SaveStatus maxSaveStatus;
+ public final Status.Durability durability;
+ @Nullable public final RoutingKey homeKey;
+ @Nullable public final RoutingKey progressKey;
+ // this is a WHOLE NODE measure, so if commit epoch has more ranges we do
not count as committed if we can only commit in coordination epoch
+ public final Status.Known achieved;
+ @Nullable public final PartialTxn partialTxn;
+ @Nullable public final PartialDeps partialDeps;
+ public final long toEpoch;
+ @Nullable public final Timestamp executeAt;
+ @Nullable public final Writes writes;
+ @Nullable public final Result result;
+
+ transient final BiConsumer<Status.Known, Throwable> callback;
+
+ Propagate(
+ TxnId txnId,
+ Route<?> route,
+ SaveStatus saveStatus,
+ SaveStatus maxSaveStatus,
+ Status.Durability durability,
+ @Nullable RoutingKey homeKey,
+ @Nullable RoutingKey progressKey,
+ Status.Known achieved,
+ @Nullable PartialTxn partialTxn,
+ @Nullable PartialDeps partialDeps,
+ long toEpoch,
+ @Nullable Timestamp executeAt,
+ @Nullable Writes writes,
+ @Nullable Result result,
+ BiConsumer<Status.Known, Throwable> callback)
+ {
+ this.txnId = txnId;
+ this.route = route;
+ this.saveStatus = saveStatus;
+ this.maxSaveStatus = maxSaveStatus;
+ this.durability = durability;
+ this.homeKey = homeKey;
+ this.progressKey = progressKey;
+ this.achieved = achieved;
+ this.partialTxn = partialTxn;
+ this.partialDeps = partialDeps;
+ this.toEpoch = toEpoch;
+ this.executeAt = executeAt;
+ this.writes = writes;
+ this.result = result;
+ this.callback = callback;
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public static void propagate(Node node, TxnId txnId, long sourceEpoch,
CheckStatus.WithQuorum withQuorum, Route route, @Nullable Status.Known target,
CheckStatus.CheckStatusOkFull full, BiConsumer<Status.Known, Throwable>
callback)
+ {
+ if (full.saveStatus.status == NotDefined && full.invalidIfNotAtLeast
== NotDefined)
+ {
+ callback.accept(Status.Known.Nothing, null);
+ return;
+ }
+
+ Invariants.checkState(sourceEpoch == txnId.epoch() || (full.executeAt
!= null && sourceEpoch == full.executeAt.epoch()));
+ Route<?> maxRoute = Route.merge(route, full.route);
+
+ // TODO (required): permit individual shards that are behind to catch
up by themselves
+ long toEpoch = sourceEpoch;
+ Ranges sliceRanges =
node.topology().localRangesForEpochs(txnId.epoch(), toEpoch);
+ if (!maxRoute.covers(sliceRanges))
+ {
+ callback.accept(Status.Known.Nothing, null);
+ return;
+ }
+
+ RoutingKey progressKey = node.trySelectProgressKey(txnId, maxRoute);
+
+ Ranges covering = maxRoute.sliceCovering(sliceRanges, Minimal);
+ Participants<?> participatingKeys =
maxRoute.participants().slice(covering, Minimal);
+ Status.Known achieved = full.sufficientFor(participatingKeys,
withQuorum);
+ if (achieved.executeAt.hasDecidedExecuteAt() && full.executeAt.epoch()
> toEpoch)
+ {
+ Ranges acceptRanges;
+ if (!node.topology().hasEpoch(full.executeAt.epoch()) ||
+ (!maxRoute.covers(acceptRanges =
node.topology().localRangesForEpochs(txnId.epoch(), full.executeAt.epoch()))))
+ {
+ // we don't know what the execution epoch requires, so we
cannot be sure we can replicate it locally
+ // we *could* wait until we have the local epoch before
running this
+ Status.Outcome outcome =
achieved.outcome.propagatesBetweenShards() ? achieved.outcome :
Status.Outcome.Unknown;
+ achieved = new Status.Known(achieved.definition,
achieved.executeAt, Status.KnownDeps.DepsUnknown, outcome);
+ }
+ else
+ {
+ // TODO (expected): this should only be the two precise
epochs, not the full range of epochs
+ sliceRanges = acceptRanges;
+ covering = maxRoute.sliceCovering(sliceRanges, Minimal);
+ participatingKeys = maxRoute.participants().slice(covering,
Minimal);
+ Status.Known knownForExecution =
full.sufficientFor(participatingKeys, withQuorum);
+ if ((target != null &&
target.isSatisfiedBy(knownForExecution)) ||
knownForExecution.isSatisfiedBy(achieved))
+ {
+ achieved = knownForExecution;
+ toEpoch = full.executeAt.epoch();
+ }
+ else
+ {
+ Invariants.checkState(sourceEpoch == txnId.epoch(), "%d !=
%d", sourceEpoch, txnId.epoch());
+ achieved = new Status.Known(achieved.definition,
achieved.executeAt, knownForExecution.deps, knownForExecution.outcome);
+ }
+ }
+ }
+
+ PartialTxn partialTxn = null;
+ if (achieved.definition.isKnown())
+ partialTxn = full.partialTxn.slice(sliceRanges,
true).reconstitutePartial(covering);
+
+ PartialDeps partialDeps = null;
+ if (achieved.deps.hasDecidedDeps())
+ partialDeps =
full.committedDeps.slice(sliceRanges).reconstitutePartial(covering);
+
+ Propagate propagate =
+ new Propagate(txnId, maxRoute, full.saveStatus,
full.maxSaveStatus, full.durability, full.homeKey, progressKey, achieved,
partialTxn, partialDeps, toEpoch, full.executeAt, full.writes, full.result,
callback);
+
+ node.localMessage(propagate);
+ }
+
+ @Override
+ public TxnId primaryTxnId()
+ {
+ return txnId;
+ }
+
+ @Override
+ public Seekables<?, ?> keys()
+ {
+ if (achieved.definition.isKnown())
+ return partialTxn.keys();
+ else if (achieved.deps.hasProposedOrDecidedDeps())
+ return partialDeps.keyDeps.keys();
+ else
+ return Keys.EMPTY;
+ }
+
+ @Override
+ public void process(Node node)
+ {
+ node.mapReduceConsumeLocal(this, route, txnId.epoch(), toEpoch, this);
+ }
+
+ @Override
+ public Void apply(SafeCommandStore safeStore)
+ {
+ SafeCommand safeCommand = safeStore.get(txnId, this, route);
+ Command command = safeCommand.current();
+ if (command.saveStatus().phase.compareTo(Status.Phase.Persist) >= 0)
+ return null;
+
+ Status propagate = achieved.propagate();
+ if (command.hasBeen(propagate))
+ {
+ if (maxSaveStatus.phase == Cleanup &&
durability.isDurableOrInvalidated() && Infer.safeToCleanup(safeStore, command,
route, executeAt))
+ Commands.setTruncatedApply(safeStore, safeCommand);
+ return null;
+ }
+
+ switch (propagate)
+ {
+ default: throw new IllegalStateException("Unexpected status: " +
propagate);
+ case Accepted:
+ case AcceptedInvalidate:
+ // we never "propagate" accepted statuses as these are
essentially votes,
+ // and contribute nothing to our local state machine
+ throw new IllegalStateException("Invalid states to propagate:
" + achieved.propagate());
+
+ case Truncated:
+ // if our peers have truncated this command, then either:
+ // 1) we have already applied it locally; 2) the command
doesn't apply locally; 3) we are stale; or 4) the command is invalidated
+ if (command.hasBeen(PreApplied) ||
command.saveStatus().isUninitialised())
+ break;
+
+ if (Infer.safeToCleanup(safeStore, command, route, executeAt))
+ {
+ Commands.setErased(safeStore, safeCommand);
+ break;
+ }
+
+ // TODO (required): check if we are stale
+ // otherwise we are either stale, or the command didn't reach
consensus
+
+ case Invalidated:
+ Commands.commitInvalidate(safeStore, safeCommand, route);
+ break;
+
+ case Applied:
+ case PreApplied:
+ Invariants.checkState(executeAt != null);
+ if (toEpoch >= executeAt.epoch())
+ {
+ confirm(Commands.apply(safeStore, safeCommand, txnId,
route, progressKey, executeAt, partialDeps, partialTxn, writes, result));
+ break;
+ }
+
+ case Committed:
+ case ReadyToExecute:
+ confirm(Commands.commit(safeStore, safeCommand, txnId, route,
progressKey, partialTxn, executeAt, partialDeps));
+ break;
+
+ case PreCommitted:
+ Commands.precommit(safeStore, safeCommand, txnId, executeAt,
route);
+ if (!achieved.definition.isKnown())
+ break;
+
+ case PreAccepted:
+ // only preaccept if we coordinate the transaction
+ if (safeStore.ranges().coordinates(txnId).intersects(route) &&
Route.isFullRoute(route))
+ Commands.preaccept(safeStore, safeCommand, txnId,
txnId.epoch(), partialTxn, Route.castToFullRoute(route), progressKey);
+ break;
+
+ case NotDefined:
+ break;
+ }
+
+
+ if (!durability.isDurable() || homeKey == null)
+ return null;
+
+ if (!safeStore.ranges().coordinates(txnId).contains(homeKey))
+ return null;
+
+ Timestamp executeAt = saveStatus.known.executeAt.hasDecidedExecuteAt()
? this.executeAt : null;
+ Commands.setDurability(safeStore, safeCommand, durability, route,
executeAt);
+ return null;
+ }
+
+ @Override
+ public Void reduce(Void o1, Void o2)
+ {
+ return null;
+ }
+
+ @Override
+ public void accept(Void result, Throwable failure)
+ {
+ if (null != callback)
+ callback.accept(failure == null ? achieved : null, failure);
+ }
+
+ @Override
+ public MessageType type()
+ {
+ switch (achieved.propagate())
+ {
+ case Applied:
+ case PreApplied:
+ if (toEpoch >= executeAt.epoch())
+ return MessageType.PROPAGATE_APPLY_MSG;
+ case Committed:
+ case ReadyToExecute:
+ return MessageType.PROPAGATE_COMMIT_MSG;
+ case PreCommitted:
+ if (!achieved.definition.isKnown())
+ return MessageType.PROPAGATE_OTHER_MSG;
+ case PreAccepted:
+ return MessageType.PROPAGATE_PRE_ACCEPT_MSG;
+ default:
+ return MessageType.PROPAGATE_OTHER_MSG;
+ }
+ }
+
+ @Override
+ public long epoch()
+ {
+ return toEpoch;
+ }
+
+ private static void confirm(Commands.CommitOutcome outcome)
+ {
+ switch (outcome)
+ {
+ default: throw new IllegalStateException("Unknown outcome: " +
outcome);
+ case Redundant:
+ case Success:
+ return;
+ case Insufficient: throw new IllegalStateException("Should have
enough information");
+ }
+ }
+
+ private static void confirm(Commands.ApplyOutcome outcome)
+ {
+ switch (outcome)
+ {
+ default: throw new IllegalStateException("Unknown outcome: " +
outcome);
+ case Redundant:
+ case Success:
+ return;
+ case Insufficient: throw new IllegalStateException("Should have
enough information");
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Propagate{type:" + type() +
+ ", txnId: " + txnId +
+ ", saveStatus: " + saveStatus +
+ ", deps: " + partialDeps +
+ ", txn: " + partialTxn +
+ ", executeAt: " + executeAt +
+ ", writes:" + writes +
+ ", result:" + result +
+ '}';
+ }
+}
diff --git a/accord-core/src/main/java/accord/primitives/Deps.java
b/accord-core/src/main/java/accord/primitives/Deps.java
index 7a899fae..69476ed2 100644
--- a/accord-core/src/main/java/accord/primitives/Deps.java
+++ b/accord-core/src/main/java/accord/primitives/Deps.java
@@ -268,7 +268,7 @@ public class Deps
public boolean equals(Deps that)
{
- return this.keyDeps.equals(that.keyDeps) &&
this.rangeDeps.equals(that.rangeDeps);
+ return that != null && this.keyDeps.equals(that.keyDeps) &&
this.rangeDeps.equals(that.rangeDeps);
}
public @Nullable TxnId maxTxnId()
diff --git a/accord-core/src/main/java/accord/primitives/PartialDeps.java
b/accord-core/src/main/java/accord/primitives/PartialDeps.java
index a626baf2..f3770062 100644
--- a/accord-core/src/main/java/accord/primitives/PartialDeps.java
+++ b/accord-core/src/main/java/accord/primitives/PartialDeps.java
@@ -59,6 +59,13 @@ public class PartialDeps extends Deps
return covering.containsAll(participants);
}
+ public boolean isEqualOrFuller(PartialDeps that)
+ {
+ return covering.containsAll(that.covering)
+ && keyDeps.txnIds().containsAll(that.keyDeps.txnIds())
+ && rangeDeps.txnIds().containsAll(that.rangeDeps.txnIds());
+ }
+
public PartialDeps with(PartialDeps that)
{
Invariants.checkArgument((this.rangeDeps == null) == (that.rangeDeps
== null));
@@ -100,7 +107,7 @@ public class PartialDeps extends Deps
public boolean equals(PartialDeps that)
{
- return this.covering.equals(that.covering) && super.equals(that);
+ return that != null && this.covering.equals(that.covering) &&
super.equals(that);
}
@Override
diff --git a/accord-core/src/main/java/accord/primitives/PartialTxn.java
b/accord-core/src/main/java/accord/primitives/PartialTxn.java
index 36bff360..262b3d6e 100644
--- a/accord-core/src/main/java/accord/primitives/PartialTxn.java
+++ b/accord-core/src/main/java/accord/primitives/PartialTxn.java
@@ -18,12 +18,14 @@
package accord.primitives;
-import javax.annotation.Nullable;
+import java.util.Objects;
import accord.api.Query;
import accord.api.Read;
import accord.api.Update;
+import javax.annotation.Nullable;
+
public interface PartialTxn extends Txn
{
// TODO (expected): we no longer need this if everyone has a FullRoute
@@ -55,6 +57,8 @@ public interface PartialTxn extends Txn
return covering().containsAll(participants);
}
+ boolean isEqualOrFuller(PartialTxn txn);
+
static PartialTxn merge(@Nullable PartialTxn a, @Nullable PartialTxn b)
{
return a == null ? b : b == null ? a : a.with(b);
@@ -106,6 +110,17 @@ public interface PartialTxn extends Txn
return covering.containsAll(ranges);
}
+ @Override
+ public boolean isEqualOrFuller(PartialTxn txn)
+ {
+ return kind() == txn.kind()
+ && covering().containsAll(txn.covering())
+ && keys().containsAll(txn.keys())
+ && read().isEqualOrFuller(txn.read())
+ && Objects.equals(query(), txn.query())
+ && ((update() == null && txn.update() == null) || (update() !=
null && txn.update() != null && update().isEqualOrFuller(txn.update())));
+ }
+
@Override
public Txn reconstitute(FullRoute<?> route)
{
@@ -127,5 +142,4 @@ public interface PartialTxn extends Txn
return new PartialTxn.InMemory(covering, kind(), keys(), read(),
query(), update());
}
}
-
}
diff --git a/accord-core/src/main/java/accord/primitives/Writes.java
b/accord-core/src/main/java/accord/primitives/Writes.java
index fb908abe..08fc9442 100644
--- a/accord-core/src/main/java/accord/primitives/Writes.java
+++ b/accord-core/src/main/java/accord/primitives/Writes.java
@@ -23,6 +23,7 @@ import accord.local.SafeCommandStore;
import accord.utils.async.AsyncChain;
import accord.utils.async.AsyncChains;
+import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
@@ -33,9 +34,9 @@ public class Writes
public final TxnId txnId;
public final Timestamp executeAt;
public final Seekables<?, ?> keys;
- public final Write write;
+ @Nullable public final Write write;
- public Writes(TxnId txnId, Timestamp executeAt, Seekables<?, ?> keys,
Write write)
+ public Writes(TxnId txnId, Timestamp executeAt, Seekables<?, ?> keys,
@Nullable Write write)
{
this.txnId = txnId;
this.executeAt = executeAt;
@@ -49,7 +50,7 @@ public class Writes
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Writes writes = (Writes) o;
- return executeAt.equals(writes.executeAt) && keys.equals(writes.keys)
&& write.equals(writes.write);
+ return executeAt.equals(writes.executeAt) && keys.equals(writes.keys)
&& Objects.equals(write, writes.write);
}
public boolean isEmpty()
diff --git a/accord-core/src/test/java/accord/Utils.java
b/accord-core/src/test/java/accord/Utils.java
index ec1e81e0..77a8bd81 100644
--- a/accord-core/src/test/java/accord/Utils.java
+++ b/accord-core/src/test/java/accord/Utils.java
@@ -38,6 +38,7 @@ import accord.impl.mock.MockStore;
import accord.local.Node;
import accord.local.NodeTimeService;
import accord.local.ShardDistributor;
+import accord.messages.LocalMessage;
import accord.primitives.Keys;
import accord.primitives.Range;
import accord.primitives.Ranges;
@@ -141,6 +142,7 @@ public class Utils
Scheduler scheduler = new ThreadPoolScheduler();
Node node = new Node(nodeId,
messageSink,
+ LocalMessage::process,
new MockConfigurationService(messageSink,
EpochFunction.noop(), topology),
clock,
NodeTimeService.unixWrapper(TimeUnit.MICROSECONDS, clock),
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java
b/accord-core/src/test/java/accord/impl/basic/Cluster.java
index 853dfe71..a7eff1cd 100644
--- a/accord-core/src/test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -55,6 +55,7 @@ import accord.local.Node;
import accord.local.Node.Id;
import accord.local.NodeTimeService;
import accord.local.ShardDistributor;
+import accord.messages.LocalMessage;
import accord.messages.MessageType;
import accord.messages.Message;
import accord.messages.Reply;
@@ -228,7 +229,7 @@ public class Cluster implements Scheduler
MessageSink messageSink = sinks.create(id,
randomSupplier.get());
LongSupplier nowSupplier = nowSupplierSupplier.get();
BurnTestConfigurationService configService = new
BurnTestConfigurationService(id, executor, randomSupplier, topology,
lookup::get, topologyUpdates);
- Node node = new Node(id, messageSink, configService,
nowSupplier, NodeTimeService.unixWrapper(TimeUnit.MILLISECONDS, nowSupplier),
+ Node node = new Node(id, messageSink, LocalMessage::process,
configService, nowSupplier, NodeTimeService.unixWrapper(TimeUnit.MILLISECONDS,
nowSupplier),
() -> new ListStore(id), new
ShardDistributor.EvenSplit<>(8, ignore -> new IntHashKey.Splitter()),
executor.agent(),
randomSupplier.get(), sinks,
SizeOfIntersectionSorter.SUPPLIER,
diff --git a/accord-core/src/test/java/accord/impl/mock/MockCluster.java
b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
index 4d04d512..fdd8a91b 100644
--- a/accord-core/src/test/java/accord/impl/mock/MockCluster.java
+++ b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
@@ -46,6 +46,7 @@ import accord.local.Node.Id;
import accord.local.NodeTimeService;
import accord.local.ShardDistributor;
import accord.messages.Callback;
+import accord.messages.LocalMessage;
import accord.messages.Reply;
import accord.messages.Request;
import accord.messages.SafeCallback;
@@ -120,6 +121,7 @@ public class MockCluster implements Network, AutoCloseable,
Iterable<Node>
MockConfigurationService configurationService = new
MockConfigurationService(messageSink, onFetchTopology, topology);
Node node = new Node(id,
messageSink,
+ LocalMessage::process,
configurationService,
nowSupplier,
NodeTimeService.unixWrapper(TimeUnit.MILLISECONDS, nowSupplier),
diff --git a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
index 365ceec9..15061ea5 100644
--- a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
+++ b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
@@ -109,7 +109,7 @@ public class ImmutableCommandTest
private static Node createNode(Id id, CommandStoreSupport storeSupport)
{
MockCluster.Clock clock = new MockCluster.Clock(100);
- Node node = new Node(id, null, new MockConfigurationService(null,
(epoch, service) -> { }, storeSupport.local.get()),
+ Node node = new Node(id, null, null, new
MockConfigurationService(null, (epoch, service) -> { },
storeSupport.local.get()),
clock,
NodeTimeService.unixWrapper(TimeUnit.MICROSECONDS, clock),
() -> storeSupport.data, new
ShardDistributor.EvenSplit(8, ignore -> new IntKey.Splitter()), new
TestAgent(), new DefaultRandom(), Scheduler.NEVER_RUN_SCHEDULED,
SizeOfIntersectionSorter.SUPPLIER, ignore -> ignore2
-> new NoOpProgressLog(), InMemoryCommandStores.Synchronized::new);
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
index f988703e..1af7d4c2 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
@@ -50,6 +50,7 @@ import accord.local.Node.Id;
import accord.local.NodeTimeService;
import accord.local.ShardDistributor;
import accord.messages.Callback;
+import accord.messages.LocalMessage;
import accord.messages.Reply;
import accord.messages.Reply.FailureReply;
import accord.messages.ReplyContext;
@@ -313,7 +314,7 @@ public class Cluster implements Scheduler
{
MessageSink messageSink = sinks.create(node,
randomSupplier.get());
LongSupplier nowSupplier = nowSupplierSupplier.get();
- lookup.put(node, new Node(node, messageSink, new
SimpleConfigService(topology),
+ lookup.put(node, new Node(node, messageSink,
LocalMessage::process, new SimpleConfigService(topology),
nowSupplier,
NodeTimeService.unixWrapper(TimeUnit.MICROSECONDS, nowSupplier),
MaelstromStore::new, new
ShardDistributor.EvenSplit(8, ignore -> new MaelstromKey.Splitter()),
MaelstromAgent.INSTANCE,
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
index a1919045..871123a3 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
@@ -43,6 +43,7 @@ import accord.local.NodeTimeService;
import accord.local.ShardDistributor;
import accord.maelstrom.Packet.Type;
import accord.messages.Callback;
+import accord.messages.LocalMessage;
import accord.messages.Reply;
import accord.messages.Reply.FailureReply;
import accord.messages.ReplyContext;
@@ -172,7 +173,7 @@ public class Main
MaelstromInit init = (MaelstromInit) packet.body;
topology = topologyFactory.toTopology(init.cluster);
sink = new StdoutSink(System::currentTimeMillis, scheduler, start,
init.self, out, err);
- on = new Node(init.self, sink, new SimpleConfigService(topology),
+ on = new Node(init.self, sink, LocalMessage::process, new
SimpleConfigService(topology),
System::currentTimeMillis,
NodeTimeService.unixWrapper(TimeUnit.MILLISECONDS, System::currentTimeMillis),
MaelstromStore::new, new
ShardDistributor.EvenSplit(8, ignore -> new MaelstromKey.Splitter()),
MaelstromAgent.INSTANCE, new DefaultRandom(),
scheduler, SizeOfIntersectionSorter.SUPPLIER,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]